File: test_connection.py

package info (click to toggle)
aiortsp 1.4.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 296 kB
  • sloc: python: 2,338; makefile: 7; sh: 5
file content (128 lines) | stat: -rw-r--r-- 4,541 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import asyncio
import functools
import sys

import pytest

from aiortsp.rtsp.auth import DigestAuth
from aiortsp.rtsp.connection import RTSPConnection
from aiortsp.rtsp.errors import RTSPResponseError
from aiortsp.rtsp.parser import RTSPParser


async def handle_client(queue, client_reader, client_writer):
    """Fake RTSP peer used by ``test_client_connection``.

    Every message parsed from the connection is answered with a ``200 OK``
    carrying the message's CSeq. The reply to the very first message also
    carries an ``ANNOUNCE`` request (CSeq + 1) right behind it. Each parsed
    message is then pushed onto *queue* so the test can inspect what this
    side of the connection received.

    :param queue: queue receiving every parsed message (``put_nowait``).
    :param client_reader: stream reader of the accepted connection.
    :param client_writer: stream writer of the accepted connection.
    """
    count = 0
    parser = RTSPParser()

    try:
        while True:
            data = await client_reader.read(10000)
            if not data:
                # read() returns b'' once the peer has closed, and keeps
                # returning it immediately: without this check the loop
                # would spin forever without yielding to the event loop.
                break

            for msg in parser.parse(data):
                if count == 0:
                    # The first message must be the request with a body
                    # sent by the test.
                    assert msg.content == 'Dumb ass!'
                    assert msg.headers['content-length'] == str(len(b'Dumb ass!'))

                resp = f"RTSP/1.0 200 OK\r\nCSeq: {msg.cseq}\r\n\r\n"
                if count == 0:
                    # Piggy-back a server-initiated request on the first reply.
                    resp += (
                        "ANNOUNCE rtsp://foo/bar.avi RTSP/1.0\r\n"
                        f"CSeq: {msg.cseq + 1}\r\n"
                        "\r\n"
                    )
                client_writer.write(resp.encode())
                count += 1
                queue.put_nowait(msg)
    finally:
        # Release the server side of the connection whatever made us stop.
        client_writer.close()


@pytest.mark.asyncio
async def test_client_connection():
    """Exchange a request with a body, then answer a server-side ANNOUNCE.

    The test sends ``OPTIONS`` with a body and expects a 200 reply. It then
    checks, through the queue filled by ``handle_client``, that the server
    saw that request and afterwards received a 551 response to the
    ``ANNOUNCE`` it sent.
    """
    queue = asyncio.Queue()

    # Bind to port 0 so the OS picks a free port: a fixed port makes the
    # test fail whenever something else on the machine already uses it.
    server = await asyncio.start_server(functools.partial(handle_client, queue), '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]
    try:
        async with RTSPConnection('127.0.0.1', port, 'foo', 'bar') as conn:

            # Send an OPTIONS request
            resp = await conn.send_request('OPTIONS', '*', timeout=2, body=b'Dumb ass!')
            assert resp
            assert resp.status == 200

            # Ensure the other side has seen the request
            req = await asyncio.wait_for(queue.get(), 2)
            assert req.type == 'request'
            assert req.method == 'OPTIONS'
            assert req.cseq == resp.cseq

            # Then it should have sent an ANNOUNCE, and got a 551 reply
            req = await asyncio.wait_for(queue.get(), 2)
            assert req.type == 'response'
            assert req.status == 551
            assert req.cseq == resp.cseq + 1
    finally:
        server.close()


async def handle_client_auth(client_reader, client_writer):
    """Fake RTSP peer that only accepts one specific Digest authorization.

    For every parsed message, the ``authorization`` header (if any) is parsed
    and compared against a fixed set of expected Digest parameters. A match
    is answered with ``200 OK``; anything else gets ``401 Unauthorized``
    together with the ``WWW-Authenticate`` challenge.

    :param client_reader: stream reader of the accepted connection.
    :param client_writer: stream writer of the accepted connection.
    """
    # Digest parameters a request must carry to be accepted.
    expected = {
        'nonce': '0024e47aY398109708de9ccd8056c58a068a59540a99d3',
        'realm': 'AXIS_ACCC8E000AA9',
        'opaque': '123soleil',
        'uri': 'rtsp://cam/axis-media/media.amp',
        'username': 'root',
        'response': '7daaf0f4e40fdff42cff28260f37914d',
    }
    parser = RTSPParser()

    try:
        while True:
            data = await client_reader.read(10000)
            if not data:
                # read() returns b'' once the peer has closed, and keeps
                # returning it immediately: without this check the loop
                # would spin forever without yielding to the event loop.
                break

            for msg in parser.parse(data):
                authorized = False
                if 'authorization' in msg.headers:
                    # Drop the leading scheme word, keep the parameter list.
                    params = DigestAuth._parse_digest_header(msg.headers['authorization'].split(' ', 1)[-1])
                    print("PARAMS", params)
                    authorized = all(params.get(key) == value for key, value in expected.items())

                cseq = msg.headers.get('cseq', 0)
                if authorized:
                    reply = f"RTSP/1.0 200 OK\r\nCSeq: {cseq}\r\n\r\n"
                else:
                    reply = (
                        'RTSP/1.0 401 Unauthorized\r\n'
                        f'CSeq: {cseq}\r\n'
                        'WWW-Authenticate: Digest realm="AXIS_ACCC8E000AA9", '
                        'nonce="0024e47aY398109708de9ccd8056c58a068a59540a99d3", '
                        'opaque="123soleil"\r\n'
                        '\r\n'
                    )
                client_writer.write(reply.encode())
    finally:
        # Release the server side of the connection whatever made us stop.
        client_writer.close()


@pytest.mark.asyncio
async def test_client_auth_no_credentials():
    """A request sent without any credentials must raise RTSPResponseError."""
    # Bind to port 0 so the OS picks a free port: a fixed port makes the
    # test fail whenever something else on the machine already uses it.
    server = await asyncio.start_server(handle_client_auth, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]
    try:
        async with RTSPConnection('127.0.0.1', port) as conn:
            with pytest.raises(RTSPResponseError):
                await conn.send_request('DESCRIBE', 'rtsp://cam/axis-media/media.amp', timeout=2)
    finally:
        server.close()


@pytest.mark.asyncio
async def test_client_auth_invalid_credentials():
    """A request sent with credentials the server rejects must raise RTSPResponseError."""
    # Bind to port 0 so the OS picks a free port: a fixed port makes the
    # test fail whenever something else on the machine already uses it.
    server = await asyncio.start_server(handle_client_auth, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]
    try:
        async with RTSPConnection('127.0.0.1', port, username='toto', password='hello') as conn:
            with pytest.raises(RTSPResponseError):
                await conn.send_request('DESCRIBE', 'rtsp://cam/axis-media/media.amp', timeout=2)
    finally:
        server.close()


@pytest.mark.asyncio
async def test_client_auth():
    """A request sent with the credentials the server accepts must get a 200 reply."""
    # Bind to port 0 so the OS picks a free port: a fixed port makes the
    # test fail whenever something else on the machine already uses it.
    server = await asyncio.start_server(handle_client_auth, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]
    try:
        async with RTSPConnection('127.0.0.1', port, username='root', password='admin123') as conn:
            resp = await conn.send_request('DESCRIBE', 'rtsp://cam/axis-media/media.amp', timeout=2)
            assert resp
            assert resp.status == 200
    finally:
        server.close()