File: ice-client.py

package info (click to toggle)
python-aioice 0.10.1-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 424 kB
  • sloc: python: 3,832; makefile: 20; sh: 1
file content (118 lines) | stat: -rw-r--r-- 3,286 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#!/usr/bin/env python

import argparse
import asyncio
import json
import logging

import websockets

import aioice

STUN_SERVER = ("stun.l.google.com", 19302)
WEBSOCKET_URI = "ws://127.0.0.1:8765"


async def offer(components: int) -> None:
    connection = aioice.Connection(
        ice_controlling=True, components=components, stun_server=STUN_SERVER
    )
    await connection.gather_candidates()

    websocket = await websockets.connect(WEBSOCKET_URI)

    # send offer
    await websocket.send(
        json.dumps(
            {
                "candidates": [c.to_sdp() for c in connection.local_candidates],
                "password": connection.local_password,
                "username": connection.local_username,
            }
        )
    )

    # await answer
    message = json.loads(await websocket.recv())
    print("received answer", message)
    for c in message["candidates"]:
        await connection.add_remote_candidate(aioice.Candidate.from_sdp(c))
    await connection.add_remote_candidate(None)
    connection.remote_username = message["username"]
    connection.remote_password = message["password"]

    await websocket.close()

    await connection.connect()
    print("connected")

    # send data
    data = b"hello"
    component = 1
    print("sending %s on component %d" % (repr(data), component))
    await connection.sendto(data, component)
    data, component = await connection.recvfrom()
    print("received %s on component %d" % (repr(data), component))

    await asyncio.sleep(5)
    await connection.close()


async def answer(components: int) -> None:
    connection = aioice.Connection(
        ice_controlling=False, components=components, stun_server=STUN_SERVER
    )
    await connection.gather_candidates()

    websocket = await websockets.connect(WEBSOCKET_URI)

    # await offer
    message = json.loads(await websocket.recv())
    print("received offer", message)
    for c in message["candidates"]:
        await connection.add_remote_candidate(aioice.Candidate.from_sdp(c))
    await connection.add_remote_candidate(None)
    connection.remote_username = message["username"]
    connection.remote_password = message["password"]

    # send answer
    await websocket.send(
        json.dumps(
            {
                "candidates": [c.to_sdp() for c in connection.local_candidates],
                "password": connection.local_password,
                "username": connection.local_username,
            }
        )
    )

    await websocket.close()

    await connection.connect()
    print("connected")

    # echo data back
    data, component = await connection.recvfrom()
    print("echoing %s on component %d" % (repr(data), component))
    await connection.sendto(data, component)

    await asyncio.sleep(5)
    await connection.close()


async def main() -> None:
    parser = argparse.ArgumentParser(description="ICE tester")
    parser.add_argument("action", choices=["offer", "answer"])
    parser.add_argument("--components", type=int, default=1)
    options = parser.parse_args()

    logging.basicConfig(level=logging.DEBUG)

    if options.action == "offer":
        await offer(options.components)
    else:
        await answer(options.components)


if __name__ == "__main__":
    asyncio.run(main())