1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
|
import asyncio
import aio_pika
async def main() -> None:
connection = await aio_pika.connect_robust(
"amqp://guest:guest@127.0.0.1/",
)
async with connection:
routing_key = "test_queue"
# Transactions conflicts with `publisher_confirms`
channel = await connection.channel(publisher_confirms=False)
# Use transactions with async context manager
async with channel.transaction():
# Publishing messages but delivery will not be done
# before committing this transaction
for i in range(10):
message = aio_pika.Message(body="Hello #{}".format(i).encode())
await channel.default_exchange.publish(
message, routing_key=routing_key,
)
# Using transactions manually
tx = channel.transaction()
# start transaction manually
await tx.select()
await channel.default_exchange.publish(
aio_pika.Message(body="Hello {}".format(routing_key).encode()),
routing_key=routing_key,
)
await tx.commit()
# Using transactions manually
tx = channel.transaction()
# start transaction manually
await tx.select()
await channel.default_exchange.publish(
aio_pika.Message(body="Should be rejected".encode()),
routing_key=routing_key,
)
await tx.rollback()
if __name__ == "__main__":
asyncio.run(main())
|