File: test_client.py

package info (click to toggle)
python-aiovlc 0.4.2-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 488 kB
  • sloc: python: 996; makefile: 16
file content (159 lines) | stat: -rw-r--r-- 4,877 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
"""Test the client."""

from __future__ import annotations

import asyncio
from unittest.mock import AsyncMock, call

import pytest

from aiovlc.client import Client
from aiovlc.exceptions import ConnectError, ConnectReadError


async def test_client_connect_disconnect(transport: AsyncMock, client: Client) -> None:
    """Test the client transport connect and disconnect."""
    await client.connect()

    assert transport.call_count == 1
    assert transport.call_args == call(host="localhost", port=4212)

    await client.disconnect()

    mock_writer: AsyncMock = transport.return_value[1]
    assert mock_writer.close.call_count == 1
    assert mock_writer.wait_closed.call_count == 1

    transport.reset_mock()
    mock_writer.reset_mock()

    async with client:
        assert transport.call_count == 1
        assert transport.call_args == call(host="localhost", port=4212)

    assert mock_writer.close.call_count == 1
    assert mock_writer.wait_closed.call_count == 1


async def test_client_connect_failure(transport: AsyncMock, client: Client) -> None:
    """Test the client transport connect failure."""
    transport.side_effect = OSError("Boom")

    with pytest.raises(ConnectError) as err:
        await client.connect()

    assert str(err.value) == "Failed to connect: Boom"


async def test_client_disconnect_failure(transport: AsyncMock, client: Client) -> None:
    """Test the client transport disconnect failure."""
    mock_writer: AsyncMock = transport.return_value[1]
    mock_writer.wait_closed.side_effect = OSError("Boom")

    await client.connect()
    # Disconnect error should be caught.
    await client.disconnect()

    assert transport.call_count == 1
    assert transport.call_args == call(host="localhost", port=4212)

    assert mock_writer.close.call_count == 1
    assert mock_writer.wait_closed.call_count == 1


async def test_client_read_write(transport: AsyncMock, client: Client) -> None:
    """Test the client transport read and write."""
    mock_reader: AsyncMock = transport.return_value[0]
    mock_writer: AsyncMock = transport.return_value[1]
    bytes_messages = [b"\xff\xfb\x01\xff\xfc\x01\r\n", b"test\n"]
    mock_reader.readuntil.side_effect = bytes_messages

    await client.connect()

    assert transport.call_count == 1
    assert transport.call_args == call(host="localhost", port=4212)

    read = await client.read()
    assert read == "test\n"

    message = "stop\n"

    await client.write("stop\n")
    assert mock_writer.write.call_count == 1
    assert mock_writer.write.call_args == call(message.encode())

    await client.disconnect()

    assert mock_writer.close.call_count == 1
    assert mock_writer.wait_closed.call_count == 1


async def test_client_read_failure(transport: AsyncMock, client: Client) -> None:
    """Test the client transport read failure."""
    mock_reader: AsyncMock = transport.return_value[0]
    mock_writer: AsyncMock = transport.return_value[1]

    await client.connect()

    assert transport.call_count == 1
    assert transport.call_args == call(host="localhost", port=4212)

    mock_reader.readuntil.side_effect = asyncio.LimitOverrunError("Boom", consumed=2)

    with pytest.raises(ConnectReadError) as err:
        await client.read()

    assert str(err.value) == "Failed to read: Boom."

    mock_reader.readuntil.side_effect = asyncio.IncompleteReadError(
        partial=b"partial_test",
        expected=20,
    )

    with pytest.raises(ConnectReadError) as err:
        await client.read()

    assert str(err.value) == (
        "Failed to read: 12 bytes read on a total of 20 expected bytes. "
        "Partial bytes read: b'partial_test'"
    )
    assert err.value.partial_bytes == b"partial_test"

    mock_reader.readuntil.side_effect = OSError("Boom")

    with pytest.raises(ConnectError) as connect_error:
        await client.read()

    assert str(connect_error.value) == "Failed to read: Boom"

    await client.disconnect()

    assert mock_writer.close.call_count == 1
    assert mock_writer.wait_closed.call_count == 1


async def test_client_write_failure(transport: AsyncMock, client: Client) -> None:
    """Test the client transport write failure."""
    mock_reader: AsyncMock = transport.return_value[0]
    mock_writer: AsyncMock = transport.return_value[1]
    bytes_message = b"test\n"
    mock_reader.readuntil.return_value = bytes_message

    await client.connect()

    assert transport.call_count == 1
    assert transport.call_args == call(host="localhost", port=4212)

    read = await client.read()

    mock_writer.write.side_effect = OSError("Boom")

    with pytest.raises(ConnectError) as err:
        await client.write(read)

    assert str(err.value) == "Failed to write: Boom"

    await client.disconnect()

    assert mock_writer.close.call_count == 1
    assert mock_writer.wait_closed.call_count == 1