File: example.py

package info (click to toggle)
asyncio-dgram 2.2.0-3
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 160 kB
  • sloc: python: 598; makefile: 59; sh: 32
file content (35 lines) | stat: -rw-r--r-- 804 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import asyncio

import asyncio_dgram


async def udp_echo_client() -> None:
    stream = await asyncio_dgram.connect(("127.0.0.1", 8888))

    await stream.send(b"Hello World!")
    data, remote_addr = await stream.recv()
    print(f"Client received: {data.decode()!r}")

    stream.close()


async def udp_echo_server() -> None:
    stream = await asyncio_dgram.bind(("127.0.0.1", 8888))

    print(f"Serving on {stream.sockname}")

    data, remote_addr = await stream.recv()
    print(f"Echoing {data.decode()!r}")
    await stream.send(data, remote_addr)

    await asyncio.sleep(0.5)
    print(f"Shutting down server")


def main() -> None:
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(udp_echo_server(), udp_echo_client()))


if __name__ == "__main__":
    main()