1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159
|
"""Test the client."""
from __future__ import annotations
import asyncio
from unittest.mock import AsyncMock, call
import pytest
from aiovlc.client import Client
from aiovlc.exceptions import ConnectError, ConnectReadError
async def test_client_connect_disconnect(transport: AsyncMock, client: Client) -> None:
    """Check connect/disconnect, both called explicitly and via ``async with``."""
    expected_open = call(host="localhost", port=4212)

    # Round one: explicit connect() followed by explicit disconnect().
    await client.connect()
    assert transport.call_count == 1
    assert transport.call_args == expected_open

    await client.disconnect()
    writer: AsyncMock = transport.return_value[1]
    assert writer.close.call_count == 1
    assert writer.wait_closed.call_count == 1

    # Forget the recorded calls so round two is counted from zero.
    transport.reset_mock()
    writer.reset_mock()

    # Round two: the client used as an async context manager.
    async with client:
        assert transport.call_count == 1
        assert transport.call_args == expected_open

    # Leaving the context must have closed the writer exactly once.
    assert writer.close.call_count == 1
    assert writer.wait_closed.call_count == 1
async def test_client_connect_failure(transport: AsyncMock, client: Client) -> None:
    """Check that an OSError from the transport is reported as ConnectError."""
    # Make opening the transport fail.
    transport.side_effect = OSError("Boom")

    with pytest.raises(ConnectError) as exc_info:
        await client.connect()

    # Checked after the block: code following the raising call would not run.
    assert str(exc_info.value) == "Failed to connect: Boom"
async def test_client_disconnect_failure(transport: AsyncMock, client: Client) -> None:
    """Check that disconnect() completes even when ``wait_closed`` raises."""
    writer: AsyncMock = transport.return_value[1]
    writer.wait_closed.side_effect = OSError("Boom")

    await client.connect()

    # No pytest.raises here on purpose: the OSError must not propagate.
    await client.disconnect()

    assert transport.call_count == 1
    assert transport.call_args == call(host="localhost", port=4212)

    # Both close steps were still attempted exactly once.
    assert writer.close.call_count == 1
    assert writer.wait_closed.call_count == 1
async def test_client_read_write(transport: AsyncMock, client: Client) -> None:
    """Check that read() yields text and write() sends the encoded command."""
    streams = transport.return_value
    reader: AsyncMock = streams[0]
    writer: AsyncMock = streams[1]

    # Two chunks are queued; read() below is expected to return the second.
    # NOTE(review): the first chunk looks like a telnet negotiation preamble
    # consumed before the payload -- confirm against the Client implementation.
    reader.readuntil.side_effect = [b"\xff\xfb\x01\xff\xfc\x01\r\n", b"test\n"]

    await client.connect()
    assert transport.call_count == 1
    assert transport.call_args == call(host="localhost", port=4212)

    received = await client.read()
    assert received == "test\n"

    command = "stop\n"
    await client.write(command)
    assert writer.write.call_count == 1
    # The writer must have been handed the encoded form of the command.
    assert writer.write.call_args == call(command.encode())

    await client.disconnect()
    assert writer.close.call_count == 1
    assert writer.wait_closed.call_count == 1
async def test_client_read_failure(transport: AsyncMock, client: Client) -> None:
    """Check how read() reports each kind of reader failure."""
    reader: AsyncMock = transport.return_value[0]
    writer: AsyncMock = transport.return_value[1]

    await client.connect()
    assert transport.call_count == 1
    assert transport.call_args == call(host="localhost", port=4212)

    # Scenario 1: the reader raises LimitOverrunError.
    reader.readuntil.side_effect = asyncio.LimitOverrunError("Boom", consumed=2)
    with pytest.raises(ConnectReadError) as overrun_info:
        await client.read()
    assert str(overrun_info.value) == "Failed to read: Boom."

    # Scenario 2: the reader raises IncompleteReadError; the partial bytes
    # must be available on the raised exception.
    reader.readuntil.side_effect = asyncio.IncompleteReadError(
        partial=b"partial_test",
        expected=20,
    )
    with pytest.raises(ConnectReadError) as incomplete_info:
        await client.read()
    incomplete_error = incomplete_info.value
    assert str(incomplete_error) == (
        "Failed to read: 12 bytes read on a total of 20 expected bytes. "
        "Partial bytes read: b'partial_test'"
    )
    assert incomplete_error.partial_bytes == b"partial_test"

    # Scenario 3: a plain OSError is reported as ConnectError.
    reader.readuntil.side_effect = OSError("Boom")
    with pytest.raises(ConnectError) as os_error_info:
        await client.read()
    assert str(os_error_info.value) == "Failed to read: Boom"

    # The client can still be disconnected after the read failures.
    await client.disconnect()
    assert writer.close.call_count == 1
    assert writer.wait_closed.call_count == 1
async def test_client_write_failure(transport: AsyncMock, client: Client) -> None:
    """Check that an OSError from the writer is reported as ConnectError."""
    reader: AsyncMock = transport.return_value[0]
    writer: AsyncMock = transport.return_value[1]
    reader.readuntil.return_value = b"test\n"

    await client.connect()
    assert transport.call_count == 1
    assert transport.call_args == call(host="localhost", port=4212)

    # Read something first, then write it back once the writer is set to fail.
    payload = await client.read()
    writer.write.side_effect = OSError("Boom")

    with pytest.raises(ConnectError) as exc_info:
        await client.write(payload)
    assert str(exc_info.value) == "Failed to write: Boom"

    # The client can still be disconnected after the write failure.
    await client.disconnect()
    assert writer.close.call_count == 1
    assert writer.wait_closed.call_count == 1
|