1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248
|
import asyncio
import uuid
from os import urandom
import pytest
from aiomisc_pytest import TCPProxy
import aiormq
from aiormq.abc import DeliveredMessage
async def test_simple(amqp_channel: aiormq.Channel):
    """Deliver one message via a consumer and another via basic_get."""
    await amqp_channel.basic_qos(prefetch_count=1)
    assert amqp_channel.number

    # Part one: a consumer pushes every delivery into an asyncio queue.
    inbox = asyncio.Queue()
    declared = await amqp_channel.queue_declare(auto_delete=True)
    consumer = await amqp_channel.basic_consume(declared.queue, inbox.put)

    props = aiormq.spec.Basic.Properties(message_id="123")
    await amqp_channel.basic_publish(
        b"foo", routing_key=declared.queue, properties=props,
    )

    delivered: DeliveredMessage = await inbox.get()
    assert delivered.body == b"foo"

    # Cancelling must echo the tag back and drop it from the channel.
    cancelled = await amqp_channel.basic_cancel(consumer.consumer_tag)
    assert cancelled.consumer_tag == consumer.consumer_tag
    assert cancelled.consumer_tag not in amqp_channel.consumers
    await amqp_channel.queue_delete(declared.queue)

    # Part two: fetch synchronously from a freshly declared queue.
    declared = await amqp_channel.queue_declare(auto_delete=True)
    await amqp_channel.basic_publish(b"foo bar", routing_key=declared.queue)
    fetched = await amqp_channel.basic_get(declared.queue, no_ack=True)
    assert fetched.body == b"foo bar"
async def test_blank_body(amqp_channel: aiormq.Channel):
    """Check that a message with an empty body reaches the consumer.

    Afterwards a non-empty message is published to a new queue and read
    back with ``basic_get``.
    """
    await amqp_channel.basic_qos(prefetch_count=1)
    assert amqp_channel.number

    received = asyncio.Queue()
    declaration = await amqp_channel.queue_declare(auto_delete=True)
    subscription = await amqp_channel.basic_consume(
        declaration.queue, received.put,
    )

    empty_props = aiormq.spec.Basic.Properties(message_id="123")
    await amqp_channel.basic_publish(
        b"", routing_key=declaration.queue, properties=empty_props,
    )

    blank: DeliveredMessage = await received.get()
    assert blank.body == b""

    # Stop consuming and verify the channel forgot the consumer tag.
    tag = subscription.consumer_tag
    cancellation = await amqp_channel.basic_cancel(tag)
    assert cancellation.consumer_tag == subscription.consumer_tag
    assert cancellation.consumer_tag not in amqp_channel.consumers
    await amqp_channel.queue_delete(declaration.queue)

    declaration = await amqp_channel.queue_declare(auto_delete=True)
    await amqp_channel.basic_publish(
        b"foo bar", routing_key=declaration.queue,
    )
    polled = await amqp_channel.basic_get(declaration.queue, no_ack=True)
    assert polled.body == b"foo bar"
async def test_bad_consumer(amqp_channel: aiormq.Channel, loop):
    """Requeue a message whose first consumer raised, then consume it again.

    The first consumer cancels its own subscription, passes the message
    to the test through a future and then raises.  The test rejects the
    message with ``requeue=True`` and expects a second consumer to get
    it with the ``redelivered`` flag set.
    """
    await amqp_channel.basic_qos(prefetch_count=1)
    declared = await amqp_channel.queue_declare()

    received = loop.create_future()

    # Publish before subscribing so the message is already waiting.
    await amqp_channel.basic_publish(b"urgent", routing_key=declared.queue)

    # The callback needs its own tag, which is only known after
    # basic_consume returns; a future bridges that gap.
    own_tag = loop.create_future()

    async def bad_consumer(message):
        await amqp_channel.basic_cancel(await own_tag)
        received.set_result(message)
        raise Exception

    subscription = await amqp_channel.basic_consume(
        declared.queue, bad_consumer, no_ack=False,
    )
    own_tag.set_result(subscription.consumer_tag)

    message = await received
    await amqp_channel.basic_reject(
        message.delivery.delivery_tag, requeue=True,
    )
    assert message.body == b"urgent"

    # Second round: a plain callback resolves a new future directly.
    received = loop.create_future()
    await amqp_channel.basic_consume(
        declared.queue, received.set_result, no_ack=True,
    )

    message = await received
    assert message.body == b"urgent"
    assert message.delivery_tag is not None
    assert message.exchange is not None
    assert message.redelivered
async def test_ack_nack_reject(amqp_channel: aiormq.Channel):
    """Settle three consecutive deliveries with reject, nack and ack."""
    await amqp_channel.basic_qos(prefetch_count=1)
    declared = await amqp_channel.queue_declare(auto_delete=True)
    deliveries = asyncio.Queue()
    await amqp_channel.basic_consume(
        declared.queue, deliveries.put, no_ack=False,
    )

    async def roundtrip(body):
        """Publish *body* to the test queue and return what was consumed."""
        await amqp_channel.basic_publish(body, routing_key=declared.queue)
        delivery = await deliveries.get()
        assert delivery.body == body
        return delivery

    rejected = await roundtrip(b"rejected")
    assert rejected.delivery_tag is not None
    assert rejected.exchange is not None
    assert not rejected.redelivered
    await amqp_channel.basic_reject(
        rejected.delivery.delivery_tag, requeue=False,
    )

    nacked = await roundtrip(b"nacked")
    await amqp_channel.basic_nack(
        nacked.delivery.delivery_tag, requeue=False,
    )

    acked = await roundtrip(b"acked")
    await amqp_channel.basic_ack(acked.delivery.delivery_tag)
async def test_confirm_multiple(amqp_channel: aiormq.Channel):
    """
    Regression test for https://github.com/mosquito/aiormq/issues/10

    When many messages are published at once and only some of them are
    routed to a queue, RabbitMQ has been observed to confirm them in an
    odd order, e.g. acks 1 2 4 and then 5 with the "multiple" flag,
    which implicitly confirms 3 as well.

    Without publisher_confirms this test is probably inconsequential.
    """
    exchange = uuid.uuid4().hex
    await amqp_channel.exchange_declare(exchange, exchange_type="topic")

    try:
        declared = await amqp_channel.queue_declare(exclusive=True)
        # Only the "test.5" key is bound; the other nine are unroutable.
        await amqp_channel.queue_bind(
            declared.queue, exchange, routing_key="test.5",
        )

        for _ in range(10):
            publishes = []
            for suffix in range(10):
                publish = amqp_channel.basic_publish(
                    b"test",
                    exchange=exchange,
                    routing_key="test.{}".format(suffix),
                )
                publishes.append(asyncio.ensure_future(publish))

            _done, unfinished = await asyncio.wait(publishes, timeout=0.2)
            assert not unfinished, (
                "not all publishes were completed (confirmed)"
            )
            await asyncio.sleep(0.05)
    finally:
        await amqp_channel.exchange_delete(exchange)
async def test_exclusive_queue_locked(amqp_connection):
    """A second channel must get ChannelLockedResource on an exclusive queue.

    One channel declares and exclusively consumes the queue; the other
    channel then tries to declare and consume the same name.
    """
    owner = await amqp_connection.channel()
    intruder = await amqp_connection.channel()
    queue_name = str(uuid.uuid4())
    locked_error = aiormq.exceptions.ChannelLockedResource

    await owner.queue_declare(queue_name, exclusive=True)
    try:
        await owner.basic_consume(queue_name, print, exclusive=True)

        # Both calls stay inside the raises block, as either may fail.
        with pytest.raises(locked_error):
            await intruder.queue_declare(queue_name)
            await intruder.basic_consume(queue_name, print, exclusive=True)
    finally:
        await owner.queue_delete(queue_name)
async def test_remove_writer_when_closed(amqp_channel: aiormq.Channel):
    """Expect ChannelClosed first, then ChannelInvalidStateError on reuse.

    Declaring ``amq.forbidden_queue_name`` must raise ``ChannelClosed``;
    a following ``queue_delete`` on the same channel must raise
    ``ChannelInvalidStateError``.
    """
    forbidden_name = "amq.forbidden_queue_name"
    errors = aiormq.exceptions

    with pytest.raises(errors.ChannelClosed):
        await amqp_channel.queue_declare(forbidden_name, auto_delete=True)

    with pytest.raises(errors.ChannelInvalidStateError):
        await amqp_channel.queue_delete(forbidden_name)
async def test_proxy_connection(proxy_connection, proxy: TCPProxy):
    """Open a channel on ``proxy_connection`` and declare a queue on it."""
    proxied_channel: aiormq.Channel = await proxy_connection.channel()
    await proxied_channel.queue_declare(auto_delete=True)
async def test_declare_queue_timeout(proxy_connection, proxy: TCPProxy):
    """``queue_declare`` must time out while the proxy delays reads.

    Repeated three times, each on a new channel with a new queue name:
    with ``read_delay=5`` on the proxy, a declare limited to 0.5 seconds
    is expected to raise ``asyncio.TimeoutError``.
    """
    attempts = 3
    for _ in range(attempts):
        slow_channel: aiormq.Channel = await proxy_connection.channel()
        queue_name = str(uuid.uuid4())

        slowdown = proxy.slowdown(read_delay=5, write_delay=0)
        with slowdown, pytest.raises(asyncio.TimeoutError):
            await slow_channel.queue_declare(
                queue_name, auto_delete=True, timeout=0.5,
            )
async def test_big_message(amqp_channel: aiormq.Channel):
    """Publish 20 MiB of random bytes with default publish arguments."""
    mebibyte = 1024 * 1024
    payload = urandom(20 * mebibyte)
    await amqp_channel.basic_publish(payload)
async def test_routing_key_too_large(amqp_channel: aiormq.Channel):
    """A 256-character routing key must be rejected with ``ValueError``.

    Covers ``basic_publish``, ``exchange_bind``, ``exchange_unbind``,
    ``queue_bind`` and ``queue_unbind``.  The temporary exchange is
    removed in a ``finally`` clause so that a failing assertion does not
    leave it behind on the broker.
    """
    routing_key = "x" * 256

    # Publishing needs no exchange of its own, so check it up front.
    with pytest.raises(ValueError):
        await amqp_channel.basic_publish(b"foo bar", routing_key=routing_key)

    exchange = uuid.uuid4().hex
    await amqp_channel.exchange_declare(exchange, exchange_type="topic")

    try:
        with pytest.raises(ValueError):
            await amqp_channel.exchange_bind(exchange, exchange, routing_key)

        with pytest.raises(ValueError):
            await amqp_channel.exchange_unbind(
                exchange, exchange, routing_key,
            )

        queue = await amqp_channel.queue_declare(exclusive=True)

        with pytest.raises(ValueError):
            await amqp_channel.queue_bind(queue.queue, exchange, routing_key)

        with pytest.raises(ValueError):
            await amqp_channel.queue_unbind(
                queue.queue, exchange, routing_key,
            )
    finally:
        # Clean up even when one of the checks above fails.
        await amqp_channel.exchange_delete(exchange)
|