File: test_protocol.py

package info (click to toggle)
python-aioamqp 0.15.0-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 456 kB
  • sloc: python: 2,741; makefile: 187
file content (107 lines) | stat: -rw-r--r-- 3,912 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
"""
    Test our Protocol class
"""
import asynctest
from unittest import mock

from . import testcase
from .. import exceptions
from .. import connect as amqp_connect
from .. import from_url as amqp_from_url
from ..protocol import AmqpProtocol, OPEN


class ProtocolTestCase(testcase.RabbitTestCaseMixin, asynctest.TestCase):
    """Connection-level tests for ``connect`` and ``from_url``.

    The ``test_connect*`` and ``test_connection_*`` cases call the real
    ``connect`` with ``self.host`` / ``self.port`` / ``self.vhost``
    (presumably supplied by the mixin -- confirm there).  The ``*_from_url``
    cases patch ``aioamqp.connect`` and only verify the keyword arguments
    that ``from_url`` passes to it.
    """

    @staticmethod
    async def _fake_connect(*_args, **_kwargs):
        """Replace ``aioamqp.connect`` in the URL tests.

        Accepts and ignores any arguments and resolves to a placeholder pair
        standing in for the ``(transport, protocol)`` result.
        """
        return 1, 2

    async def test_connect(self):
        """A successful connect leaves the protocol in the ``OPEN`` state."""
        _transport, protocol = await amqp_connect(
            host=self.host, port=self.port, virtualhost=self.vhost
        )
        try:
            self.assertEqual(protocol.state, OPEN)
        finally:
            # Close even when the assertion fails so the connection is not leaked.
            await protocol.close()

    async def test_connect_products_info(self):
        """``client_properties`` given to connect are kept on the protocol."""
        client_properties = {
            'program': 'aioamqp-tests',
            'program_version': '0.1.1',
        }
        _transport, protocol = await amqp_connect(
            host=self.host,
            port=self.port,
            virtualhost=self.vhost,
            client_properties=client_properties,
        )
        try:
            self.assertEqual(protocol.client_properties, client_properties)
        finally:
            # Close even when the assertion fails so the connection is not leaked.
            await protocol.close()

    async def test_connection_unexistant_vhost(self):
        """Connecting to the vhost ``/unexistant`` raises ``AmqpClosedConnection``."""
        with self.assertRaises(exceptions.AmqpClosedConnection):
            await amqp_connect(host=self.host, port=self.port, virtualhost='/unexistant')

    async def test_connection_wrong_login_password(self):
        """Connecting with wrong credentials raises ``AmqpClosedConnection``."""
        with self.assertRaises(exceptions.AmqpClosedConnection):
            await amqp_connect(host=self.host, port=self.port, login='wrong', password='wrong')

    async def test_connection_from_url(self):
        """An ``amqp://`` URL is split into the expected connect keyword arguments."""
        with mock.patch('aioamqp.connect') as connect:
            connect.side_effect = self._fake_connect
            await amqp_from_url('amqp://tom:pass@example.com:7777/myvhost')
            connect.assert_called_once_with(
                insist=False,
                password='pass',
                login_method='PLAIN',
                login='tom',
                host='example.com',
                protocol_factory=AmqpProtocol,
                virtualhost='myvhost',
                port=7777,
            )

    async def test_ssl_context_connection_from_url(self):
        """An explicit ``ssl`` argument is forwarded to connect as-is for ``amqps://``."""
        ssl_context = mock.Mock()
        with mock.patch('aioamqp.connect') as connect:
            connect.side_effect = self._fake_connect
            await amqp_from_url('amqps://tom:pass@example.com:7777/myvhost', ssl=ssl_context)
            connect.assert_called_once_with(
                insist=False,
                password='pass',
                login_method='PLAIN',
                ssl=ssl_context,
                login='tom',
                host='example.com',
                protocol_factory=AmqpProtocol,
                virtualhost='myvhost',
                port=7777,
            )

    async def test_amqps_connection_from_url(self):
        """Without an explicit ``ssl`` argument, ``amqps://`` uses ``ssl.create_default_context()``."""
        ssl_context = mock.Mock()
        with mock.patch('ssl.create_default_context', return_value=ssl_context), \
                mock.patch('aioamqp.connect') as connect:
            connect.side_effect = self._fake_connect
            await amqp_from_url('amqps://tom:pass@example.com:7777/myvhost')
            connect.assert_called_once_with(
                insist=False,
                password='pass',
                login_method='PLAIN',
                ssl=ssl_context,
                login='tom',
                host='example.com',
                protocol_factory=AmqpProtocol,
                virtualhost='myvhost',
                port=7777,
            )

    async def test_from_url_raises_on_wrong_scheme(self):
        """``from_url`` raises ``ValueError`` for the URL ``invalid://``."""
        with self.assertRaises(ValueError):
            await amqp_from_url('invalid://')