1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
|
from __future__ import annotations
import base64
from collections.abc import Awaitable, Callable
from typing import Any, cast
import aiohttp
import pytest
from aiohttp import ClientSession, web
from aiohttp.test_utils import TestClient, TestServer
from rtsp_to_webrtc.client import Client
from rtsp_to_webrtc.exceptions import ResponseError
# Placeholder endpoints; SERVER_URL is not referenced by the tests visible here.
SERVER_URL = "https://example.com"
RTSP_URL = "rtsps://example"

# Truncated SDP bodies used as the offer sent and the answer expected back.
OFFER_SDP = "v=0\r\no=carol 28908764872 28908764872 IN IP4 100.3.6.6\r\n..."
ANSWER_SDP = "v=0\r\no=bob 2890844730 2890844730 IN IP4 h.example.com\r\n..."

# Base64 text of ANSWER_SDP; the tests place it in the "sdp64" response field.
ANSWER_PAYLOAD = str(base64.b64encode(bytes(ANSWER_SDP, "utf-8")), "utf-8")
@pytest.fixture(autouse=True)
def setup_handler(
    app: web.Application,
    request_handler: Callable[[aiohttp.web.Request], Awaitable[aiohttp.web.Response]],
) -> None:
    """Route GET /static and POST /stream to the shared request handler.

    Runs automatically for every test (autouse). Both ``app`` and
    ``request_handler`` are fixtures supplied from outside this module.
    """
    router = app.router
    router.add_get("/static", request_handler)
    router.add_post("/stream", request_handler)
@pytest.fixture
def cli(
    loop: Any,
    app: web.Application,
    aiohttp_client: Callable[[web.Application], Awaitable[TestClient]],
) -> TestClient:
    """Build an aiohttp test client bound to the fake application.

    The ``aiohttp_client`` factory is awaited synchronously on ``loop`` so the
    fixture itself can stay a plain (non-async) function.
    """
    pending_client = aiohttp_client(app)
    return cast(TestClient, loop.run_until_complete(pending_client))
async def test_offer(cli: TestClient) -> None:
    """Check that offer() yields the plain SDP answer for a valid reply."""
    server = cli.server
    assert isinstance(server, TestServer)

    # Queue a reply whose "sdp64" field carries the base64-encoded answer.
    canned_reply = aiohttp.web.json_response({"sdp64": ANSWER_PAYLOAD})
    server.app["response"].append(canned_reply)

    rtsp_client = Client(cast(ClientSession, cli))
    received = await rtsp_client.offer(OFFER_SDP, RTSP_URL)
    assert received == ANSWER_SDP
async def test_response_missing_answer(cli: TestClient) -> None:
    """Check that a reply without an SDP answer raises ResponseError."""
    server = cli.server
    assert isinstance(server, TestServer)

    # An empty JSON object: no "sdp64" field for the client to read.
    server.app["response"].append(aiohttp.web.json_response({}))

    rtsp_client = Client(cast(ClientSession, cli))
    expected_failure = pytest.raises(ResponseError, match=r".*missing SDP Answer.*")
    with expected_failure:
        await rtsp_client.offer(OFFER_SDP, RTSP_URL)
async def test_server_failure(cli: TestClient) -> None:
    """Check that an HTTP 502 with no body surfaces as ResponseError."""
    server = cli.server
    assert isinstance(server, TestServer)

    bad_gateway = aiohttp.web.Response(status=502)
    server.app["response"].append(bad_gateway)

    rtsp_client = Client(cast(ClientSession, cli))
    expected_failure = pytest.raises(ResponseError, match=r"server failure.*")
    with expected_failure:
        await rtsp_client.offer(OFFER_SDP, RTSP_URL)
async def test_server_failure_with_error(cli: TestClient) -> None:
    """Check that the server's JSON error text appears in the ResponseError."""
    server = cli.server
    assert isinstance(server, TestServer)

    # HTTP 502 carrying a JSON body with an "error" message.
    error_reply = aiohttp.web.json_response({"error": "a message"}, status=502)
    server.app["response"].append(error_reply)

    rtsp_client = Client(cast(ClientSession, cli))
    expected_failure = pytest.raises(
        ResponseError, match=r"server failure:.*a message.*"
    )
    with expected_failure:
        await rtsp_client.offer(OFFER_SDP, RTSP_URL)
async def test_heartbeat(cli: TestClient) -> None:
    """Check heartbeat() against a success, two failures, then a success.

    Four replies are queued with statuses 200, 502, 404 and 200. The test
    expects the first and last heartbeat calls to pass and the middle two to
    raise ResponseError, which presumes the shared handler (defined outside
    this module) serves queued replies in order.
    """
    server = cli.server
    assert isinstance(server, TestServer)

    queued_statuses = (200, 502, 404, 200)
    server.app["response"].extend(
        [aiohttp.web.Response(status=code) for code in queued_statuses]
    )

    rtsp_client = Client(cast(ClientSession, cli))

    await rtsp_client.heartbeat()
    # The 502 and the 404 replies must each be reported as a failure.
    for _ in range(2):
        with pytest.raises(ResponseError):
            await rtsp_client.heartbeat()
    await rtsp_client.heartbeat()
async def test_offer_stream_id(cli: TestClient) -> None:
    """Check offer_stream_id() returns the answer and posts the expected form.

    Besides the returned SDP, the test inspects what the fake server recorded
    under the "request" and "request-post" application keys (populated outside
    this module, presumably by the shared request handler).
    """
    server = cli.server
    assert isinstance(server, TestServer)

    canned_reply = aiohttp.web.json_response({"sdp64": ANSWER_PAYLOAD})
    server.app["response"].append(canned_reply)

    rtsp_client = Client(cast(ClientSession, cli))
    received = await rtsp_client.offer_stream_id("stream_id", OFFER_SDP, RTSP_URL)
    assert received == ANSWER_SDP

    # Exactly one request, sent to the stream endpoint.
    seen_paths = server.app["request"]
    assert seen_paths == ["/stream"]

    # The posted fields: the RTSP URL and the base64-encoded offer.
    expected_post = {
        "url": "rtsps://example",
        "sdp64": "dj0wDQpvPWNhcm9sIDI4OTA4NzY0ODcyIDI4OTA4NzY0ODcyIElOIElQNCAxMDAuMy42LjYNCi4uLg==",
    }
    assert server.app["request-post"] == [expected_post]
async def test_offer_with_channel_data(cli: TestClient) -> None:
    """Check that channel_data entries are sent along with the offer.

    The extra ``insecure_skip_verify`` entry is expected to show up in the
    recorded post data as the string "True", not as a boolean.
    """
    server = cli.server
    assert isinstance(server, TestServer)

    canned_reply = aiohttp.web.json_response({"sdp64": ANSWER_PAYLOAD})
    server.app["response"].append(canned_reply)

    rtsp_client = Client(cast(ClientSession, cli))
    extra_fields = {"insecure_skip_verify": True}
    received = await rtsp_client.offer_stream_id(
        "stream_id", OFFER_SDP, RTSP_URL, channel_data=extra_fields
    )
    assert received == ANSWER_SDP

    # Exactly one request, sent to the stream endpoint.
    seen_paths = server.app["request"]
    assert seen_paths == ["/stream"]

    expected_post = {
        "url": "rtsps://example",
        "sdp64": "dj0wDQpvPWNhcm9sIDI4OTA4NzY0ODcyIDI4OTA4NzY0ODcyIElOIElQNCAxMDAuMy42LjYNCi4uLg==",
        "insecure_skip_verify": "True",
    }
    assert server.app["request-post"] == [expected_post]
|