# Example: an aiozmq DEALER socket sends ten messages to a ROUTER socket,
# which writes each message back; the replies are printed as they arrive.
import asyncio
import aiozmq
import zmq
class ZmqDealerProtocol(aiozmq.ZmqProtocol):
    """Protocol that hands every received message to a queue.

    Args:
        queue: Object exposing ``put_nowait``; each received message is
            pushed onto it unchanged.
        on_close: Future that is resolved with the ``exc`` argument passed
            to ``connection_lost``.
    """

    # Replaced by the real transport in connection_made; None until then.
    transport = None

    def __init__(self, queue, on_close):
        self.queue = queue
        self.on_close = on_close

    def connection_made(self, transport):
        """Remember the transport this protocol is attached to."""
        self.transport = transport

    def msg_received(self, msg):
        """Push *msg* onto the queue without waiting."""
        self.queue.put_nowait(msg)

    def connection_lost(self, exc):
        """Resolve ``on_close`` with *exc* unless it is already done."""
        # set_result() on a finished future (e.g. one cancelled together
        # with its waiter) raises InvalidStateError, so skip it in that case.
        if not self.on_close.done():
            self.on_close.set_result(exc)
class ZmqRouterProtocol(aiozmq.ZmqProtocol):
    """Protocol that writes every received message straight back.

    Args:
        on_close: Future that is resolved with the ``exc`` argument passed
            to ``connection_lost``.
    """

    # Replaced by the real transport in connection_made; None until then.
    transport = None

    def __init__(self, on_close):
        self.on_close = on_close

    def connection_made(self, transport):
        """Remember the transport this protocol is attached to."""
        self.transport = transport

    def msg_received(self, msg):
        """Send *msg* back out through the stored transport, unmodified."""
        self.transport.write(msg)

    def connection_lost(self, exc):
        """Resolve ``on_close`` with *exc* unless it is already done."""
        # set_result() on a finished future (e.g. one cancelled together
        # with its waiter) raises InvalidStateError, so skip it in that case.
        if not self.on_close.done():
            self.on_close.set_result(exc)
async def go():
    """Run ten request/reply round trips between a DEALER and a ROUTER.

    A ROUTER transport is bound to a wildcard port on 127.0.0.1 and a DEALER
    transport is connected to the first endpoint the router reports. For each
    ``i`` in ``range(10)`` the dealer writes ``(b"data", b"ask", <i as bytes>)``
    and the next message that lands in the dealer's queue is printed.

    Both transports are closed, and their close futures awaited, even when
    the exchange fails or the task is cancelled part-way through.
    """
    loop = asyncio.get_running_loop()
    router_closed = loop.create_future()
    dealer_closed = loop.create_future()

    router, _ = await aiozmq.create_zmq_connection(
        lambda: ZmqRouterProtocol(router_closed),
        zmq.ROUTER,
        bind="tcp://127.0.0.1:*",
    )
    try:
        # The bind address uses a wildcard port, so ask the transport which
        # endpoint it actually got.
        addr = next(iter(router.bindings()))
        queue = asyncio.Queue()

        dealer, _ = await aiozmq.create_zmq_connection(
            lambda: ZmqDealerProtocol(queue, dealer_closed),
            zmq.DEALER,
            connect=addr,
        )
        try:
            for i in range(10):
                msg = (b"data", b"ask", str(i).encode("utf-8"))
                dealer.write(msg)
                # Wait for the reply before sending the next request.
                answer = await queue.get()
                print(answer)
        finally:
            dealer.close()
            await dealer_closed
    finally:
        router.close()
        await router_closed
def main():
    """Drive the ``go`` coroutine to completion, then print ``DONE``."""
    demo = go()
    asyncio.run(demo)
    print("DONE")


# Only start the demo when this file is executed as a script.
if __name__ == "__main__":
    main()
|