1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
|
import asynctest
import asyncio
from . import testcase
class PublishTestCase(testcase.RabbitTestCaseMixin, asynctest.TestCase):
_multiprocess_can_split_ = True
async def test_publish(self):
# declare
await self.channel.queue_declare("q", exclusive=True, no_wait=False)
await self.channel.exchange_declare("e", "fanout")
await self.channel.queue_bind("q", "e", routing_key='')
# publish
await self.channel.publish("coucou", "e", routing_key='')
queues = self.list_queues()
self.assertIn("q", queues)
self.assertEqual(1, queues["q"]['messages'])
async def test_empty_publish(self):
# declare
await self.channel.queue_declare("q", exclusive=True, no_wait=False)
await self.channel.exchange_declare("e", "fanout")
await self.channel.queue_bind("q", "e", routing_key='')
# publish
await self.channel.publish("", "e", routing_key='')
queues = self.list_queues()
self.assertIn("q", queues)
self.assertEqual(1, queues["q"]["messages"])
self.assertEqual(0, queues["q"]["message_bytes"])
async def test_big_publish(self):
# declare
await self.channel.queue_declare("q", exclusive=True, no_wait=False)
await self.channel.exchange_declare("e", "fanout")
await self.channel.queue_bind("q", "e", routing_key='')
# publish
await self.channel.publish("a"*1000000, "e", routing_key='')
queues = self.list_queues()
self.assertIn("q", queues)
self.assertEqual(1, queues["q"]['messages'])
async def test_big_unicode_publish(self):
# declare
await self.channel.queue_declare("q", exclusive=True, no_wait=False)
await self.channel.exchange_declare("e", "fanout")
await self.channel.queue_bind("q", "e", routing_key='')
# publish
await self.channel.publish("Ы"*1000000, "e", routing_key='')
await self.channel.publish("Ы"*1000000, "e", routing_key='')
queues = self.list_queues()
self.assertIn("q", queues)
self.assertEqual(2, queues["q"]['messages'])
async def test_confirmed_publish(self):
# declare
await self.channel.confirm_select()
self.assertTrue(self.channel.publisher_confirms)
await self.channel.queue_declare("q", exclusive=True, no_wait=False)
await self.channel.exchange_declare("e", "fanout")
await self.channel.queue_bind("q", "e", routing_key='')
# publish
await self.channel.publish("coucou", "e", routing_key='')
queues = self.list_queues()
self.assertIn("q", queues)
self.assertEqual(1, queues["q"]['messages'])
async def test_return_from_publish(self):
called = False
async def callback(channel, body, envelope, properties):
nonlocal called
called = True
channel = await self.amqp.channel(return_callback=callback)
# declare
await channel.exchange_declare("e", "topic")
# publish
await channel.publish("coucou", "e", routing_key="not.found",
mandatory=True)
for _i in range(10):
if called:
break
await asyncio.sleep(0.1)
self.assertTrue(called)
|