File: test_queue.py

package info (click to toggle)
python-aioamqp 0.15.0-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 456 kB
  • sloc: python: 2,741; makefile: 187
file content (242 lines) | stat: -rw-r--r-- 9,455 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
"""
    Amqp queue class tests
"""

import asyncio
import asynctest

from . import testcase
from .. import exceptions


class QueueDeclareTestCase(testcase.RabbitTestCaseMixin, asynctest.TestCase):

    def setUp(self):
        super().setUp()
        self.consume_future = asyncio.Future()

    async def callback(self, body, envelope, properties):
        self.consume_future.set_result((body, envelope, properties))

    async def get_callback_result(self):
        await self.consume_future
        result = self.consume_future.result()
        self.consume_future = asyncio.Future()
        return result

    async def test_queue_declare_no_name(self):
        result = await self.channel.queue_declare()
        self.assertIsNotNone(result['queue'])

    async def test_queue_declare(self):
        queue_name = 'queue_name'
        result = await self.channel.queue_declare('queue_name')
        self.assertEqual(result['message_count'], 0)
        self.assertEqual(result['consumer_count'], 0)
        self.assertEqual(result['queue'].split('.')[-1], queue_name)
        self.assertTrue(result)

    async def test_queue_declare_passive(self):
        queue_name = 'queue_name'
        await self.channel.queue_declare('queue_name')
        result = await self.channel.queue_declare(queue_name, passive=True)
        self.assertEqual(result['message_count'], 0)
        self.assertEqual(result['consumer_count'], 0)
        self.assertEqual(result['queue'].split('.')[-1], queue_name)

    async def test_queue_declare_custom_x_message_ttl_32_bits(self):
        queue_name = 'queue_name'
        # 2147483648 == 10000000000000000000000000000000
        # in binary, meaning it is 32 bit long
        x_message_ttl = 2147483648
        result = await self.channel.queue_declare('queue_name', arguments={
            'x-message-ttl': x_message_ttl
        })
        self.assertEqual(result['message_count'], 0)
        self.assertEqual(result['consumer_count'], 0)
        self.assertEqual(result['queue'].split('.')[-1], queue_name)
        self.assertTrue(result)

    async def test_queue_declare_passive_nonexistant_queue(self):
        queue_name = 'queue_name'
        with self.assertRaises(exceptions.ChannelClosed) as cm:
            await self.channel.queue_declare(queue_name, passive=True)

        self.assertEqual(cm.exception.code, 404)

    async def test_wrong_parameter_queue(self):
        queue_name = 'queue_name'
        await self.channel.queue_declare(queue_name, exclusive=False, auto_delete=False)

        with self.assertRaises(exceptions.ChannelClosed) as cm:
            await self.channel.queue_declare(queue_name,
                passive=False, exclusive=True, auto_delete=True)

        self.assertIn(cm.exception.code, [405, 406])

    async def test_multiple_channel_same_queue(self):
        queue_name = 'queue_name'

        channel1 = await self.amqp.channel()
        channel2 = await self.amqp.channel()

        result = await channel1.queue_declare(queue_name, passive=False)
        self.assertEqual(result['message_count'], 0)
        self.assertEqual(result['consumer_count'], 0)
        self.assertEqual(result['queue'].split('.')[-1], queue_name)

        result = await channel2.queue_declare(queue_name, passive=False)
        self.assertEqual(result['message_count'], 0)
        self.assertEqual(result['consumer_count'], 0)
        self.assertEqual(result['queue'].split('.')[-1], queue_name)

    async def _test_queue_declare(self, queue_name, exclusive=False, durable=False, auto_delete=False):
        # declare queue
        result = await self.channel.queue_declare(
            queue_name, no_wait=False, exclusive=exclusive, durable=durable,
            auto_delete=auto_delete)

        # assert returned results has the good arguments
        # in test the channel declared queues with prefixed names, to get the full name of the
        # declared queue we have to use self.full_name function
        self.assertEqual(self.full_name(queue_name), result['queue'])

        queues = self.list_queues()
        queue = queues[queue_name]

        # assert queue has been declared witht the good arguments
        self.assertEqual(queue_name, queue['name'])
        self.assertEqual(0, queue['consumers'])
        self.assertEqual(0, queue['messages_ready'])
        self.assertEqual(auto_delete, queue['auto_delete'])
        self.assertEqual(durable, queue['durable'])

        # delete queue
        await self.safe_queue_delete(queue_name)

    def test_durable_and_auto_deleted(self):
        self.loop.run_until_complete(
            self._test_queue_declare('q', exclusive=False, durable=True, auto_delete=True))

    def test_durable_and_not_auto_deleted(self):
        self.loop.run_until_complete(
            self._test_queue_declare('q', exclusive=False, durable=True, auto_delete=False))

    def test_not_durable_and_auto_deleted(self):
        self.loop.run_until_complete(
            self._test_queue_declare('q', exclusive=False, durable=False, auto_delete=True))

    def test_not_durable_and_not_auto_deleted(self):
        self.loop.run_until_complete(
            self._test_queue_declare('q', exclusive=False, durable=False, auto_delete=False))

    async def test_exclusive(self):
        # create an exclusive queue
        await self.channel.queue_declare("q", exclusive=True)
        # consume it
        await self.channel.basic_consume(self.callback, queue_name="q", no_wait=False)
        # create an other amqp connection

        _transport, protocol = await self.create_amqp()
        channel = await self.create_channel(amqp=protocol)
        # assert that this connection cannot connect to the queue
        with self.assertRaises(exceptions.ChannelClosed):
            await channel.basic_consume(self.callback, queue_name="q", no_wait=False)
        # amqp and channels are auto deleted by test case

    async def test_not_exclusive(self):
        # create a non-exclusive queue
        await self.channel.queue_declare('q', exclusive=False)
        # consume it
        await self.channel.basic_consume(self.callback, queue_name='q', no_wait=False)
        # create an other amqp connection
        _transport, protocol = await self.create_amqp()
        channel = await self.create_channel(amqp=protocol)
        # assert that this connection can connect to the queue
        await channel.basic_consume(self.callback, queue_name='q', no_wait=False)


class QueueDeleteTestCase(testcase.RabbitTestCaseMixin, asynctest.TestCase):


    async def test_delete_queue(self):
        queue_name = 'queue_name'
        await self.channel.queue_declare(queue_name)
        result = await self.channel.queue_delete(queue_name)
        self.assertTrue(result)

    async def test_delete_inexistant_queue(self):
        queue_name = 'queue_name'
        if self.server_version() < (3, 3, 5):
            with self.assertRaises(exceptions.ChannelClosed) as cm:
                result = await self.channel.queue_delete(queue_name)

            self.assertEqual(cm.exception.code, 404)

        else:
            result = await self.channel.queue_delete(queue_name)
            self.assertTrue(result)

class QueueBindTestCase(testcase.RabbitTestCaseMixin, asynctest.TestCase):


    async def test_bind_queue(self):
        queue_name = 'queue_name'
        exchange_name = 'exchange_name'

        await self.channel.queue_declare(queue_name)
        await self.channel.exchange_declare(exchange_name, type_name='direct')

        result = await self.channel.queue_bind(queue_name, exchange_name, routing_key='')
        self.assertTrue(result)

    async def test_bind_unexistant_exchange(self):
        queue_name = 'queue_name'
        exchange_name = 'exchange_name'


        await self.channel.queue_declare(queue_name)
        with self.assertRaises(exceptions.ChannelClosed) as cm:
            await self.channel.queue_bind(queue_name, exchange_name, routing_key='')
        self.assertEqual(cm.exception.code, 404)

    async def test_bind_unexistant_queue(self):
        queue_name = 'queue_name'
        exchange_name = 'exchange_name'


        await self.channel.exchange_declare(exchange_name, type_name='direct')

        with self.assertRaises(exceptions.ChannelClosed) as cm:
            await self.channel.queue_bind(queue_name, exchange_name, routing_key='')
        self.assertEqual(cm.exception.code, 404)

    async def test_unbind_queue(self):
        queue_name = 'queue_name'
        exchange_name = 'exchange_name'

        await self.channel.queue_declare(queue_name)
        await self.channel.exchange_declare(exchange_name, type_name='direct')

        await self.channel.queue_bind(queue_name, exchange_name, routing_key='')

        result = await self.channel.queue_unbind(queue_name, exchange_name, routing_key='')
        self.assertTrue(result)


class QueuePurgeTestCase(testcase.RabbitTestCaseMixin, asynctest.TestCase):


    async def test_purge_queue(self):
        queue_name = 'queue_name'

        await self.channel.queue_declare(queue_name)
        result = await self.channel.queue_purge(queue_name)
        self.assertEqual(result['message_count'], 0)

    async def test_purge_queue_inexistant_queue(self):
        queue_name = 'queue_name'

        with self.assertRaises(exceptions.ChannelClosed) as cm:
            await self.channel.queue_purge(queue_name)
        self.assertEqual(cm.exception.code, 404)