File: test_close.py

package info (click to toggle)
python-aioamqp 0.15.0-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 456 kB
  • sloc: python: 2,741; makefile: 187
file content (53 lines) | stat: -rw-r--r-- 1,749 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import asyncio
import asynctest

from . import testcase
from .. import exceptions


class CloseTestCase(testcase.RabbitTestCaseMixin, asynctest.TestCase):

    def setUp(self):
        super().setUp()
        self.consume_future = asyncio.Future()

    async def callback(self, body, envelope, properties):
        self.consume_future.set_result((body, envelope, properties))

    async def get_callback_result(self):
        await self.consume_future
        result = self.consume_future.result()
        self.consume_future = asyncio.Future()
        return result

    async def test_close(self):
        channel = await self.create_channel()
        self.assertTrue(channel.is_open)
        await channel.close()
        self.assertFalse(channel.is_open)

    async def test_multiple_close(self):
        channel = await self.create_channel()
        await channel.close()
        self.assertFalse(channel.is_open)
        with self.assertRaises(exceptions.ChannelClosed):
            await channel.close()

    async def test_cannot_publish_after_close(self):
        channel = self.channel
        await channel.close()
        with self.assertRaises(exceptions.ChannelClosed):
            await self.channel.publish("coucou", "my_e", "")

    async def test_cannot_declare_queue_after_close(self):
        channel = self.channel
        await channel.close()
        with self.assertRaises(exceptions.ChannelClosed):
            await self.channel.queue_declare("qq")

    async def test_cannot_consume_after_close(self):
        channel = self.channel
        await self.channel.queue_declare("q")
        await channel.close()
        with self.assertRaises(exceptions.ChannelClosed):
            await channel.basic_consume(self.callback)