File: test_reader.py

package info (click to toggle)
aiortsp 1.4.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 296 kB
  • sloc: python: 2,338; makefile: 7; sh: 5
file content (50 lines) | stat: -rw-r--r-- 1,363 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import asyncio
import logging
import sys

import pytest
from dpkt.rtp import RTP

from aiortsp.rtsp.reader import RTSPReader
from tests.test_session import handle_client_auth


@pytest.mark.asyncio
async def test_reader():
    count = 0
    server = await asyncio.start_server(handle_client_auth, '127.0.0.1', 5554)
    try:
        async with RTSPReader('rtspt://127.0.0.1:5554/media.sdp', timeout=2) as reader:
            async for pkt in reader.iter_packets():
                assert isinstance(pkt, RTP)
                count += 1

                if count >= 2:
                    server.close()

        assert count == 2
    finally:
        server.close()


@pytest.mark.asyncio
async def test_reader_reconnect():
    logging.basicConfig()
    count = 0

    server = await asyncio.start_server(handle_client_auth, '127.0.0.1', 5554)
    try:
        async with RTSPReader('rtspt://127.0.0.1:5554/media.sdp', run_loop=True, timeout=2, log_level=10) as reader:
            async for pkt in reader.iter_packets():
                print('PKT', len(pkt))
                assert isinstance(pkt, RTP)
                count += 1

                if count == 2:
                    # closing connection to test reconnect
                    reader.connection.close()

                if count == 4:
                    break
    finally:
        server.close()