File: core_dealer_router.py

package info (click to toggle)
aiozmq 1.0.0-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 688 kB
  • sloc: python: 7,225; makefile: 162
file content (71 lines) | stat: -rw-r--r-- 1,579 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import asyncio
import aiozmq
import zmq


class ZmqDealerProtocol(aiozmq.ZmqProtocol):

    transport = None

    def __init__(self, queue, on_close):
        self.queue = queue
        self.on_close = on_close

    def connection_made(self, transport):
        self.transport = transport

    def msg_received(self, msg):
        self.queue.put_nowait(msg)

    def connection_lost(self, exc):
        self.on_close.set_result(exc)


class ZmqRouterProtocol(aiozmq.ZmqProtocol):

    transport = None

    def __init__(self, on_close):
        self.on_close = on_close

    def connection_made(self, transport):
        self.transport = transport

    def msg_received(self, msg):
        self.transport.write(msg)

    def connection_lost(self, exc):
        self.on_close.set_result(exc)


async def go():
    router_closed = asyncio.Future()
    dealer_closed = asyncio.Future()
    router, _ = await aiozmq.create_zmq_connection(
        lambda: ZmqRouterProtocol(router_closed), zmq.ROUTER, bind="tcp://127.0.0.1:*"
    )

    addr = list(router.bindings())[0]
    queue = asyncio.Queue()
    dealer, _ = await aiozmq.create_zmq_connection(
        lambda: ZmqDealerProtocol(queue, dealer_closed), zmq.DEALER, connect=addr
    )

    for i in range(10):
        msg = (b"data", b"ask", str(i).encode("utf-8"))
        dealer.write(msg)
        answer = await queue.get()
        print(answer)
    dealer.close()
    await dealer_closed
    router.close()
    await router_closed


def main():
    asyncio.run(go())
    print("DONE")


if __name__ == "__main__":
    main()