File: test_connection_lost.py

package info (click to toggle)
python-aioamqp 0.15.0-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 456 kB
  • sloc: python: 2,741; makefile: 187
file content (30 lines) | stat: -rw-r--r-- 881 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import asynctest
import asynctest.mock
import asyncio

from aioamqp.protocol import OPEN, CLOSED

from . import testcase


class ConnectionLostTestCase(testcase.RabbitTestCaseMixin, asynctest.TestCase):
    """Check how the protocol reacts when its transport goes away."""

    _multiprocess_can_split_ = True

    async def test_connection_lost(self):
        # Flag flipped by the error callback installed below; the final
        # assertion checks that the callback was actually invoked.
        self.callback_called = False

        def record_error(*_args, **_kwargs):
            # Whatever is passed in is ignored: only the call itself matters.
            self.callback_called = True

        protocol = self.amqp
        protocol._on_error_callback = record_error
        open_channel = self.channel

        # Preconditions: connection and channel are both up.
        self.assertEqual(protocol.state, OPEN)
        self.assertTrue(open_channel.is_open)

        # Closing the underlying transport is meant to have the same effect
        # as the tcp connection being lost.
        transport = protocol._stream_reader._transport
        transport.close()

        # Give the protocol's worker at most one second to finish.
        await asyncio.wait_for(protocol.worker, 1)

        # Afterwards everything must be reported as closed and the error
        # callback must have fired.
        self.assertEqual(protocol.state, CLOSED)
        self.assertFalse(open_channel.is_open)
        self.assertTrue(self.callback_called)