1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
|
import argparse
import asyncio
import os
from unittest import TestCase
from aiortc import RTCIceCandidate, RTCSessionDescription
from aiortc.contrib.signaling import (
BYE,
add_signaling_arguments,
create_signaling,
object_from_string,
object_to_string,
)
from .utils import asynctest
async def delay(coro):
await asyncio.sleep(0.1)
return await coro()
offer = RTCSessionDescription(sdp="some-offer", type="offer")
answer = RTCSessionDescription(sdp="some-answer", type="answer")
class SignalingTest(TestCase):
def setUp(self):
def mock_print(*args, **kwargs):
pass
# hijack print()
self.original_print = __builtins__["print"]
__builtins__["print"] = mock_print
def tearDown(self):
# restore print()
__builtins__["print"] = self.original_print
@asynctest
async def test_copy_and_paste(self):
parser = argparse.ArgumentParser()
add_signaling_arguments(parser)
args = parser.parse_args(["-s", "copy-and-paste"])
sig_server = create_signaling(args)
sig_client = create_signaling(args)
def make_pipes():
r, w = os.pipe()
return os.fdopen(r, "r"), os.fdopen(w, "w")
# mock out read / write pipes
sig_server._read_pipe, sig_client._write_pipe = make_pipes()
sig_client._read_pipe, sig_server._write_pipe = make_pipes()
# connect
await sig_server.connect()
await sig_client.connect()
res = await asyncio.gather(sig_server.send(offer), delay(sig_client.receive))
self.assertEqual(res[1], offer)
res = await asyncio.gather(sig_client.send(answer), delay(sig_server.receive))
self.assertEqual(res[1], answer)
await asyncio.gather(sig_server.close(), sig_client.close())
# cleanup mocks
sig_client._write_pipe.close()
sig_server._write_pipe.close()
@asynctest
async def test_tcp_socket(self):
parser = argparse.ArgumentParser()
add_signaling_arguments(parser)
args = parser.parse_args(["-s", "tcp-socket"])
sig_server = create_signaling(args)
sig_client = create_signaling(args)
# connect
await sig_server.connect()
await sig_client.connect()
res = await asyncio.gather(sig_server.send(offer), delay(sig_client.receive))
self.assertEqual(res[1], offer)
res = await asyncio.gather(sig_client.send(answer), delay(sig_server.receive))
self.assertEqual(res[1], answer)
await asyncio.gather(sig_server.close(), sig_client.close())
@asynctest
async def test_tcp_socket_abrupt_disconnect(self):
parser = argparse.ArgumentParser()
add_signaling_arguments(parser)
args = parser.parse_args(["-s", "tcp-socket"])
sig_server = create_signaling(args)
sig_client = create_signaling(args)
# connect
await sig_server.connect()
await sig_client.connect()
res = await asyncio.gather(sig_server.send(offer), delay(sig_client.receive))
self.assertEqual(res[1], offer)
# break connection
sig_client._writer.close()
sig_server._writer.close()
res = await sig_server.receive()
self.assertIsNone(res)
res = await sig_client.receive()
self.assertIsNone(res)
await asyncio.gather(sig_server.close(), sig_client.close())
@asynctest
async def test_unix_socket(self):
parser = argparse.ArgumentParser()
add_signaling_arguments(parser)
args = parser.parse_args(["-s", "unix-socket"])
sig_server = create_signaling(args)
sig_client = create_signaling(args)
# connect
await sig_server.connect()
await sig_client.connect()
res = await asyncio.gather(sig_server.send(offer), delay(sig_client.receive))
self.assertEqual(res[1], offer)
res = await asyncio.gather(sig_client.send(answer), delay(sig_server.receive))
self.assertEqual(res[1], answer)
await asyncio.gather(sig_server.close(), sig_client.close())
@asynctest
async def test_unix_socket_abrupt_disconnect(self):
parser = argparse.ArgumentParser()
add_signaling_arguments(parser)
args = parser.parse_args(["-s", "unix-socket"])
sig_server = create_signaling(args)
sig_client = create_signaling(args)
# connect
await sig_server.connect()
await sig_client.connect()
res = await asyncio.gather(sig_server.send(offer), delay(sig_client.receive))
self.assertEqual(res[1], offer)
# break connection
sig_client._writer.close()
sig_server._writer.close()
res = await sig_server.receive()
self.assertIsNone(res)
res = await sig_client.receive()
self.assertIsNone(res)
await asyncio.gather(sig_server.close(), sig_client.close())
class SignalingUtilsTest(TestCase):
def test_bye_from_string(self):
self.assertEqual(object_from_string('{"type": "bye"}'), BYE)
def test_bye_to_string(self):
self.assertEqual(object_to_string(BYE), '{"type": "bye"}')
def test_candidate_from_string(self):
candidate = object_from_string(
'{"candidate": "candidate:0 1 UDP 2122252543 192.168.99.7 33543 typ host", '
'"id": "audio", "label": 0, "type": "candidate"}'
)
self.assertEqual(candidate.component, 1)
self.assertEqual(candidate.foundation, "0")
self.assertEqual(candidate.ip, "192.168.99.7")
self.assertEqual(candidate.port, 33543)
self.assertEqual(candidate.priority, 2122252543)
self.assertEqual(candidate.protocol, "UDP")
self.assertEqual(candidate.sdpMid, "audio")
self.assertEqual(candidate.sdpMLineIndex, 0)
self.assertEqual(candidate.type, "host")
def test_candidate_to_string(self):
candidate = RTCIceCandidate(
component=1,
foundation="0",
ip="192.168.99.7",
port=33543,
priority=2122252543,
protocol="UDP",
type="host",
)
candidate.sdpMid = "audio"
candidate.sdpMLineIndex = 0
self.assertEqual(
object_to_string(candidate),
'{"candidate": "candidate:0 1 UDP 2122252543 192.168.99.7 33543 typ host", '
'"id": "audio", "label": 0, "type": "candidate"}',
)
|