File: test_contrib_signaling.py

package info (click to toggle)
python-aiortc 1.11.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 3,052 kB
  • sloc: python: 20,294; javascript: 321; makefile: 21; sh: 1
file content (210 lines) | stat: -rw-r--r-- 6,533 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
import argparse
import asyncio
import os
from unittest import TestCase

from aiortc import RTCIceCandidate, RTCSessionDescription
from aiortc.contrib.signaling import (
    BYE,
    add_signaling_arguments,
    create_signaling,
    object_from_string,
    object_to_string,
)

from .utils import asynctest


async def delay(coro):
    await asyncio.sleep(0.1)
    return await coro()


offer = RTCSessionDescription(sdp="some-offer", type="offer")
answer = RTCSessionDescription(sdp="some-answer", type="answer")


class SignalingTest(TestCase):
    def setUp(self):
        def mock_print(*args, **kwargs):
            pass

        # hijack print()
        self.original_print = __builtins__["print"]
        __builtins__["print"] = mock_print

    def tearDown(self):
        # restore print()
        __builtins__["print"] = self.original_print

    @asynctest
    async def test_copy_and_paste(self):
        parser = argparse.ArgumentParser()
        add_signaling_arguments(parser)
        args = parser.parse_args(["-s", "copy-and-paste"])

        sig_server = create_signaling(args)
        sig_client = create_signaling(args)

        def make_pipes():
            r, w = os.pipe()
            return os.fdopen(r, "r"), os.fdopen(w, "w")

        # mock out read / write pipes
        sig_server._read_pipe, sig_client._write_pipe = make_pipes()
        sig_client._read_pipe, sig_server._write_pipe = make_pipes()

        # connect
        await sig_server.connect()
        await sig_client.connect()

        res = await asyncio.gather(sig_server.send(offer), delay(sig_client.receive))
        self.assertEqual(res[1], offer)

        res = await asyncio.gather(sig_client.send(answer), delay(sig_server.receive))
        self.assertEqual(res[1], answer)

        await asyncio.gather(sig_server.close(), sig_client.close())

        # cleanup mocks
        sig_client._write_pipe.close()
        sig_server._write_pipe.close()

    @asynctest
    async def test_tcp_socket(self):
        parser = argparse.ArgumentParser()
        add_signaling_arguments(parser)
        args = parser.parse_args(["-s", "tcp-socket"])

        sig_server = create_signaling(args)
        sig_client = create_signaling(args)

        # connect
        await sig_server.connect()
        await sig_client.connect()

        res = await asyncio.gather(sig_server.send(offer), delay(sig_client.receive))
        self.assertEqual(res[1], offer)

        res = await asyncio.gather(sig_client.send(answer), delay(sig_server.receive))
        self.assertEqual(res[1], answer)

        await asyncio.gather(sig_server.close(), sig_client.close())

    @asynctest
    async def test_tcp_socket_abrupt_disconnect(self):
        parser = argparse.ArgumentParser()
        add_signaling_arguments(parser)
        args = parser.parse_args(["-s", "tcp-socket"])

        sig_server = create_signaling(args)
        sig_client = create_signaling(args)

        # connect
        await sig_server.connect()
        await sig_client.connect()

        res = await asyncio.gather(sig_server.send(offer), delay(sig_client.receive))
        self.assertEqual(res[1], offer)

        # break connection
        sig_client._writer.close()
        sig_server._writer.close()

        res = await sig_server.receive()
        self.assertIsNone(res)

        res = await sig_client.receive()
        self.assertIsNone(res)

        await asyncio.gather(sig_server.close(), sig_client.close())

    @asynctest
    async def test_unix_socket(self):
        parser = argparse.ArgumentParser()
        add_signaling_arguments(parser)
        args = parser.parse_args(["-s", "unix-socket"])

        sig_server = create_signaling(args)
        sig_client = create_signaling(args)

        # connect
        await sig_server.connect()
        await sig_client.connect()

        res = await asyncio.gather(sig_server.send(offer), delay(sig_client.receive))
        self.assertEqual(res[1], offer)

        res = await asyncio.gather(sig_client.send(answer), delay(sig_server.receive))
        self.assertEqual(res[1], answer)

        await asyncio.gather(sig_server.close(), sig_client.close())

    @asynctest
    async def test_unix_socket_abrupt_disconnect(self):
        parser = argparse.ArgumentParser()
        add_signaling_arguments(parser)
        args = parser.parse_args(["-s", "unix-socket"])

        sig_server = create_signaling(args)
        sig_client = create_signaling(args)

        # connect
        await sig_server.connect()
        await sig_client.connect()

        res = await asyncio.gather(sig_server.send(offer), delay(sig_client.receive))
        self.assertEqual(res[1], offer)

        # break connection
        sig_client._writer.close()
        sig_server._writer.close()

        res = await sig_server.receive()
        self.assertIsNone(res)

        res = await sig_client.receive()
        self.assertIsNone(res)

        await asyncio.gather(sig_server.close(), sig_client.close())


class SignalingUtilsTest(TestCase):
    def test_bye_from_string(self):
        self.assertEqual(object_from_string('{"type": "bye"}'), BYE)

    def test_bye_to_string(self):
        self.assertEqual(object_to_string(BYE), '{"type": "bye"}')

    def test_candidate_from_string(self):
        candidate = object_from_string(
            '{"candidate": "candidate:0 1 UDP 2122252543 192.168.99.7 33543 typ host", '
            '"id": "audio", "label": 0, "type": "candidate"}'
        )
        self.assertEqual(candidate.component, 1)
        self.assertEqual(candidate.foundation, "0")
        self.assertEqual(candidate.ip, "192.168.99.7")
        self.assertEqual(candidate.port, 33543)
        self.assertEqual(candidate.priority, 2122252543)
        self.assertEqual(candidate.protocol, "UDP")
        self.assertEqual(candidate.sdpMid, "audio")
        self.assertEqual(candidate.sdpMLineIndex, 0)
        self.assertEqual(candidate.type, "host")

    def test_candidate_to_string(self):
        candidate = RTCIceCandidate(
            component=1,
            foundation="0",
            ip="192.168.99.7",
            port=33543,
            priority=2122252543,
            protocol="UDP",
            type="host",
        )
        candidate.sdpMid = "audio"
        candidate.sdpMLineIndex = 0
        self.assertEqual(
            object_to_string(candidate),
            '{"candidate": "candidate:0 1 UDP 2122252543 192.168.99.7 33543 typ host", '
            '"id": "audio", "label": 0, "type": "candidate"}',
        )