1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
|
import asyncio
import tornado.ioloop
import tornado.web
from aio_pika import Message, connect_robust
class Base:
QUEUE: asyncio.Queue
class SubscriberHandler(tornado.web.RequestHandler, Base):
async def get(self) -> None:
message = await self.QUEUE.get()
await self.finish(message.body)
class PublisherHandler(tornado.web.RequestHandler):
async def post(self) -> None:
connection = self.application.settings["amqp_connection"]
channel = await connection.channel()
try:
await channel.default_exchange.publish(
Message(body=self.request.body), routing_key="test",
)
finally:
await channel.close()
await self.finish("OK")
async def make_app() -> tornado.web.Application:
amqp_connection = await connect_robust()
channel = await amqp_connection.channel()
queue = await channel.declare_queue("test", auto_delete=True)
Base.QUEUE = asyncio.Queue()
await queue.consume(Base.QUEUE.put, no_ack=True)
return tornado.web.Application(
[(r"/publish", PublisherHandler), (r"/subscribe", SubscriberHandler)],
amqp_connection=amqp_connection,
)
async def main() -> None:
app = await make_app()
app.listen(8888)
await asyncio.Future()
if __name__ == "__main__":
asyncio.run(main())
|