File: rpc_custom_translator.py

package info (click to toggle)
aiozmq 1.0.0-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 688 kB
  • sloc: python: 7,225; makefile: 162
file content (57 lines) | stat: -rw-r--r-- 1,160 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import asyncio
import aiozmq.rpc
import msgpack


class Point:
    """A 2-D point; the custom type sent over RPC in this example.

    Two points compare equal when both coordinates are equal. Because
    ``__eq__`` is defined without ``__hash__``, instances are unhashable.
    """

    def __init__(self, x, y):
        """Store the coordinates *x* and *y* as attributes."""
        self.x = x
        self.y = y

    def __repr__(self):
        """Return ``Point(x, y)`` using the ``repr`` of each coordinate."""
        # A readable repr makes a failed equality assertion show the actual
        # coordinates instead of the default ``<Point object at 0x...>``.
        return f"{type(self).__name__}({self.x!r}, {self.y!r})"

    def __eq__(self, other):
        """Compare coordinate-wise against another ``Point``."""
        if isinstance(other, Point):
            return (self.x, self.y) == (other.x, other.y)
        # Defer to the other operand's comparison for unrelated types.
        return NotImplemented


def _pack_point(point):
    """Serialize a Point into msgpack bytes holding its ``(x, y)`` pair."""
    return msgpack.packb((point.x, point.y))


def _unpack_point(payload):
    """Rebuild a Point from msgpack bytes holding its coordinates."""
    coords = msgpack.unpackb(payload)
    return Point(*coords)


# Each entry maps an integer code to a (class, packer, unpacker) triple;
# the same table is handed to both the RPC server and the RPC client.
translation_table = {
    0: (Point, _pack_point, _unpack_point),
}


class ServerHandler(aiozmq.rpc.AttrHandler):
    """Handler object passed to ``aiozmq.rpc.serve_rpc`` in ``go``."""
    @aiozmq.rpc.method
    def remote(self, val):
        """Echo method: return *val* exactly as it was received."""
        return val


async def go():
    """Round-trip a ``Point`` through an RPC server and client.

    Starts an RPC server on a wildcard TCP endpoint, connects a client to
    the address the server actually bound, calls ``remote`` with
    ``Point(1, 2)`` and asserts that an equal ``Point`` comes back.

    Both endpoints are closed even when the connect, the call or the
    assertion fails, so a failing run does not leak open transports.

    Raises:
        AssertionError: if the value returned by the server differs from
            the value that was sent.
    """
    server = await aiozmq.rpc.serve_rpc(
        ServerHandler(), bind="tcp://*:*", translation_table=translation_table
    )
    try:
        # The bind address uses wildcards, so ask the transport which
        # concrete endpoint it ended up on.
        server_addr = list(server.transport.bindings())[0]

        client = await aiozmq.rpc.connect_rpc(
            connect=server_addr, translation_table=translation_table
        )
        try:
            ret = await client.call.remote(Point(1, 2))
            assert ret == Point(1, 2)
        finally:
            client.close()
            await client.wait_closed()
    finally:
        server.close()
        await server.wait_closed()


def main():
    """Run the ``go`` coroutine to completion, then print ``DONE``."""
    example = go()
    asyncio.run(example)
    print("DONE")


# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    main()