File: test_auth.py

package info (click to toggle)
aiortsp 1.4.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 296 kB
  • sloc: python: 2,338; makefile: 7; sh: 5
file content (107 lines) | stat: -rw-r--r-- 3,318 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import pytest

from aiortsp.rtsp.auth import DigestAuth, BasicAuth
from aiortsp.rtsp.parser import RTSPResponse


@pytest.mark.parametrize('user, passwd, method, url, req, repl', [

    ('root', 'admin123', 'DESCRIBE', 'rtsp://cam/axis-media/media.amp', {
        # Challenge
        'realm': "AXIS_ACCC8E000AA9",
        'nonce': "0024e47aY398109708de9ccd8056c58a068a59540a99d3"
    }, {
        # Response
        "username": "root",
        'realm': "AXIS_ACCC8E000AA9",
        'nonce': "0024e47aY398109708de9ccd8056c58a068a59540a99d3",
        'uri': "rtsp://cam/axis-media/media.amp",
        'response': "7daaf0f4e40fdff42cff28260f37914d",
    }),

    ('test', 'test123', 'DESCRIBE', 'rtsp://recorder:654/00000001-0000-babe-0000-accc8e000aa7/live', {
        # Challenge
        'realm': "media@genetec.com",
        'nonce': "900fa9ee25fb4d5e919fa17c2cd032f7",
        'opque': "0bd45fa6a89e4873a4c80ecf6287611f",
        'qop': "auth",
        'stale': "FALSE",
        'algorithm': "MD5"
    }, {
        # Response
        "username": "test",
        'realm': "media@genetec.com",
        'nonce': "900fa9ee25fb4d5e919fa17c2cd032f7",
        'uri': "rtsp://recorder:654/00000001-0000-babe-0000-accc8e000aa7/live",
        'response': "85ab8f66f2930845b1ed05742a2ad8b4",
    }),
])
def test_digest(user, passwd, method, url, req, repl):
    auth = DigestAuth(username=user, password=passwd)

    resp = RTSPResponse()

    auth_header = ', '.join(f'{k}="{v}"' for k, v in req.items())
    data, done = resp.feed(
        """RTSP/1.0 401 Unauthorized\r\nCSeq: 0\r\nWWW-Authenticate: Digest {}\r\n\r\n""".format(auth_header).encode())

    assert not data
    assert done

    assert resp.status == 401

    retry = auth.handle_401(resp.headers)
    assert retry is True

    # Force the case where auth is to be redone
    auth.handle_ok(resp.headers)
    retry = auth.handle_401(resp.headers)
    assert retry is True

    auth_args = auth._prepare_digest_header(method=method, url=url)

    assert auth_args == repl

    # Force the case where auth is to be redone
    auth.handle_ok({
        'authentication-info': 'qop="auth", nextnonce="deadb00b"'
    })
    assert auth.info['nonce'] == 'deadb00b'


@pytest.mark.parametrize('user, passwd, method, url, req, repl', [

    # Against Axis
    ('root', 'admin123', 'DESCRIBE', 'rtsp://cam/axis-media/media.amp', {
        # Challenge
        'realm': "AXIS_ACCC8E000AA9",
        'nonce': "0024e47aY398109708de9ccd8056c58a068a59540a99d3"
    }, 'Basic cm9vdDphZG1pbjEyMw=='),
])
def test_basic(user, passwd, method, url, req, repl):
    auth = BasicAuth(username=user, password=passwd)

    resp = RTSPResponse()

    auth_header = ', '.join(f'{k}="{v}"' for k, v in req.items())
    data, done = resp.feed(
        """RTSP/1.0 401 Unauthorized\r\nCSeq: 0\r\nWWW-Authenticate: Basic {}\r\n\r\n""".format(auth_header).encode())

    assert not data
    assert done

    assert resp.status == 401

    retry = auth.handle_401(resp.headers)
    assert retry is True

    # Force the case where auth is to be redone
    auth.handle_ok(resp.headers)
    retry = auth.handle_401(resp.headers)
    assert retry is True

    headers = {}
    auth.make_auth(method, url, headers)

    assert 'Authorization' in headers
    assert headers['Authorization'] == repl