"""Test that losing the underlying transport closes an aioamqp connection."""
import asynctest
import asynctest.mock
import asyncio
from aioamqp.protocol import OPEN, CLOSED
from . import testcase
class ConnectionLostTestCase(testcase.RabbitTestCaseMixin, asynctest.TestCase):
    """Checks how an open AMQP connection reacts to its transport being closed."""

    # NOTE(review): presumably the nose multiprocess-plugin opt-in flag — confirm.
    _multiprocess_can_split_ = True

    async def test_connection_lost(self):
        """Close the transport under an open connection and verify the fallout.

        Once the protocol's worker has finished, the protocol state must be
        ``CLOSED``, the channel must no longer report itself as open, and the
        error callback installed on the protocol must have been invoked.
        """
        self.callback_called = False

        def record_error(*args, **kwargs):
            # The arguments are ignored; only the fact of the call is recorded.
            self.callback_called = True

        protocol = self.amqp
        protocol._on_error_callback = record_error
        open_channel = self.channel

        # Preconditions: both the connection and the channel start out open.
        self.assertEqual(protocol.state, OPEN)
        self.assertTrue(open_channel.is_open)

        # this should have the same effect as the tcp connection being lost
        transport = protocol._stream_reader._transport
        transport.close()

        # Give the worker at most one second to finish after the close.
        await asyncio.wait_for(protocol.worker, timeout=1)

        self.assertEqual(protocol.state, CLOSED)
        self.assertFalse(open_channel.is_open)
        self.assertTrue(self.callback_called)
|