1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
|
"""
Test our Protocol class
"""
import asynctest
from unittest import mock
from . import testcase
from .. import exceptions
from .. import connect as amqp_connect
from .. import from_url as amqp_from_url
from ..protocol import AmqpProtocol, OPEN
class ProtocolTestCase(testcase.RabbitTestCaseMixin, asynctest.TestCase):
async def test_connect(self):
_transport, protocol = await amqp_connect(
host=self.host, port=self.port, virtualhost=self.vhost
)
self.assertEqual(protocol.state, OPEN)
await protocol.close()
async def test_connect_products_info(self):
client_properties = {
'program': 'aioamqp-tests',
'program_version': '0.1.1',
}
_transport, protocol = await amqp_connect(
host=self.host,
port=self.port,
virtualhost=self.vhost,
client_properties=client_properties,
)
self.assertEqual(protocol.client_properties, client_properties)
await protocol.close()
async def test_connection_unexistant_vhost(self):
with self.assertRaises(exceptions.AmqpClosedConnection):
await amqp_connect(host=self.host, port=self.port, virtualhost='/unexistant')
def test_connection_wrong_login_password(self):
with self.assertRaises(exceptions.AmqpClosedConnection):
self.loop.run_until_complete(
amqp_connect(host=self.host, port=self.port, login='wrong', password='wrong')
)
async def test_connection_from_url(self):
with mock.patch('aioamqp.connect') as connect:
async def func(*x, **y):
return 1, 2
connect.side_effect = func
await amqp_from_url('amqp://tom:pass@example.com:7777/myvhost')
connect.assert_called_once_with(
insist=False,
password='pass',
login_method='PLAIN',
login='tom',
host='example.com',
protocol_factory=AmqpProtocol,
virtualhost='myvhost',
port=7777,
)
async def test_ssl_context_connection_from_url(self):
ssl_context = mock.Mock()
with mock.patch('aioamqp.connect') as connect:
async def func(*x, **y):
return 1, 2
connect.side_effect = func
await amqp_from_url('amqps://tom:pass@example.com:7777/myvhost', ssl=ssl_context)
connect.assert_called_once_with(
insist=False,
password='pass',
login_method='PLAIN',
ssl=ssl_context,
login='tom',
host='example.com',
protocol_factory=AmqpProtocol,
virtualhost='myvhost',
port=7777,
)
async def test_amqps_connection_from_url(self):
ssl_context = mock.Mock()
with mock.patch('ssl.create_default_context') as create_default_context:
with mock.patch('aioamqp.connect') as connect:
create_default_context.return_value = ssl_context
async def func(*x, **y):
return 1, 2
connect.side_effect = func
await amqp_from_url('amqps://tom:pass@example.com:7777/myvhost')
connect.assert_called_once_with(
insist=False,
password='pass',
login_method='PLAIN',
ssl=ssl_context,
login='tom',
host='example.com',
protocol_factory=AmqpProtocol,
virtualhost='myvhost',
port=7777,
)
async def test_from_url_raises_on_wrong_scheme(self):
with self.assertRaises(ValueError):
await amqp_from_url('invalid://')
|