1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430
|
"""Define tests for the websocket client, websocket events, and the watchdog."""
from __future__ import annotations
import asyncio
import logging
from collections import deque
from datetime import datetime, timedelta, timezone
from time import time
from typing import Any
from unittest.mock import AsyncMock, Mock
import pytest
from aiohttp.client_exceptions import (
ClientError,
ServerDisconnectedError,
WSServerHandshakeError,
)
from aiohttp.client_reqrep import ClientResponse, RequestInfo
from aiohttp.http_websocket import WSMsgType
from simplipy.const import LOGGER
from simplipy.device import DeviceTypes
from simplipy.errors import (
CannotConnectError,
ConnectionFailedError,
InvalidMessageError,
WebsocketError,
)
from simplipy.websocket import (
EVENT_DISARMED_BY_KEYPAD,
Watchdog,
WebsocketClient,
websocket_event_from_payload,
)
from .common import create_ws_message
@pytest.mark.asyncio
async def test_callbacks(
    caplog: Mock, mock_api: Mock, ws_message_event: dict[str, Any], ws_messages: deque
) -> None:
    """Verify that registered connect and event callbacks are invoked.

    Args:
        caplog: A mocked logging utility.
        mock_api: A mocked API client.
        ws_message_event: A websocket event payload.
        ws_messages: A queue that websocket messages are appended to.
    """
    caplog.set_level(logging.INFO)

    async def log_on_connect() -> None:
        """Log a marker message so the test can tell this coroutine ran."""
        LOGGER.info("We are connected!")

    on_connect = Mock()
    on_disconnect = Mock()
    on_event = Mock()

    websocket = WebsocketClient(mock_api)
    websocket.add_connect_callback(on_connect)
    websocket.add_connect_callback(log_on_connect)
    websocket.add_disconnect_callback(on_disconnect)
    websocket.add_event_callback(on_event)

    # Merely registering the callbacks must not invoke any of them:
    for callback in (on_connect, on_disconnect, on_event):
        assert callback.call_count == 0

    await websocket.async_connect()
    assert websocket.connected

    # Wait before checking that both connect callbacks (sync and async) have run:
    await asyncio.sleep(1)
    assert on_connect.call_count == 1
    assert any("We are connected!" in record.message for record in caplog.records)

    # Queue a single event message and listen for it:
    ws_messages.append(create_ws_message(ws_message_event))
    await websocket.async_listen()
    await asyncio.sleep(1)
    on_event.assert_called_once_with(websocket_event_from_payload(ws_message_event))

    await websocket.async_disconnect()
    assert not websocket.connected
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "error",
    [
        ClientError,
        ServerDisconnectedError,
        WSServerHandshakeError(Mock(RequestInfo), (Mock(ClientResponse),)),
    ],
)
async def test_cannot_connect(
    error: BaseException, mock_api: Mock, ws_client_session: AsyncMock
) -> None:
    """Verify that a failed websocket connection raises CannotConnectError.

    Args:
        error: The error that the mocked session raises on connect.
        mock_api: A mocked API client.
        ws_client_session: A mocked websocket client session.
    """
    # Make the session's connection attempt fail with the parametrized error:
    ws_client_session.ws_connect.side_effect = error

    websocket = WebsocketClient(mock_api)
    with pytest.raises(CannotConnectError):
        await websocket.async_connect()

    assert not websocket.connected
@pytest.mark.asyncio
async def test_connect_disconnect(mock_api: Mock) -> None:
    """Verify the connected state across a connect/disconnect cycle.

    Args:
        mock_api: A mocked API client.
    """
    websocket = WebsocketClient(mock_api)

    await websocket.async_connect()
    assert websocket.connected

    # Connecting while already connected should simply return:
    await websocket.async_connect()

    await websocket.async_disconnect()
    assert not websocket.connected
def test_create_event(ws_message_event: dict[str, Any]) -> None:
    """Verify the attributes of an event built from a websocket payload.

    Args:
        ws_message_event: A websocket event payload.
    """
    event = websocket_event_from_payload(ws_message_event)

    # Attribute name -> value expected for the fixture payload:
    expected_attributes = {
        "event_type": EVENT_DISARMED_BY_KEYPAD,
        "info": "System Disarmed by Master PIN",
        "system_id": 12345,
        "timestamp": datetime(2021, 9, 29, 23, 14, 46, tzinfo=timezone.utc),
        "changed_by": "Master PIN",
        "sensor_name": "",
        "sensor_serial": "abcdef12",
        "sensor_type": DeviceTypes.KEYPAD,
    }
    for attribute, expected_value in expected_attributes.items():
        assert getattr(event, attribute) == expected_value

    assert event.media_urls is None
def test_create_motion_event(ws_motion_event: dict[str, Any]) -> None:
    """Verify that a motion event exposes the media URLs from its payload.

    Args:
        ws_motion_event: A websocket motion event payload with media urls.
    """
    event = websocket_event_from_payload(ws_motion_event)
    assert event.media_urls is not None

    # Media URL key -> value expected for the fixture payload:
    expected_urls = {
        "image_url": "https://image-url{&width}",
        "clip_url": "https://clip-url",
        "hls_url": "https://hls-url",
    }
    for key, expected_url in expected_urls.items():
        assert event.media_urls[key] == expected_url
@pytest.mark.asyncio
async def test_listen_invalid_message_data(
    mock_api: Mock, ws_message_event: dict[str, Any], ws_messages: deque
) -> None:
    """Verify that a message whose JSON parsing fails raises on listen.

    Args:
        mock_api: A mocked API client.
        ws_message_event: A websocket event payload.
        ws_messages: A queue that websocket messages are appended to.
    """
    websocket = WebsocketClient(mock_api)
    await websocket.async_connect()
    assert websocket.connected

    # Build a message whose json() call raises ValueError:
    bad_message = create_ws_message(ws_message_event)
    bad_message.json.side_effect = ValueError("Boom")
    ws_messages.append(bad_message)

    with pytest.raises(InvalidMessageError):
        await websocket.async_listen()
@pytest.mark.asyncio
async def test_listen(mock_api: Mock) -> None:
    """Test listening to the websocket server.

    Args:
        mock_api: A mocked API client.
    """
    client = WebsocketClient(mock_api)
    await client.async_connect()
    assert client.connected

    # Keep a reference to the listen task: the event loop only holds weak references
    # to tasks, so a discarded task may be garbage collected before it finishes and
    # any exception it raised would never be observed by this test.
    listen_task = asyncio.create_task(client.async_listen())
    try:
        await client.async_disconnect()
        assert not client.connected
    finally:
        # Stop the listener if it is still pending, then await it so that an error
        # raised while listening fails the test instead of being silently dropped
        # (and so that no pending task is left behind when the loop shuts down).
        listen_task.cancel()
        try:
            await listen_task
        except asyncio.CancelledError:
            pass
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "message_type", [WSMsgType.CLOSE, WSMsgType.CLOSED, WSMsgType.CLOSING]
)
async def test_listen_disconnect_message_types(
    message_type: WSMsgType,
    mock_api: Mock,
    ws_client: AsyncMock,
    ws_message_event: dict[str, Any],
    ws_messages: deque,
) -> None:
    """Verify that close-type websocket messages end listening without error.

    Args:
        message_type: The message type from the websocket.
        mock_api: A mocked API client.
        ws_client: A mocked websocket client.
        ws_message_event: A websocket event payload.
        ws_messages: A queue that websocket messages are appended to.
    """
    websocket = WebsocketClient(mock_api)
    await websocket.async_connect()
    assert websocket.connected

    closing_message = create_ws_message(ws_message_event)
    closing_message.type = message_type
    ws_messages.append(closing_message)

    # The listen loop is expected to exit before it tries to handle this message;
    # handling it would produce an error:
    await websocket.async_listen()

    # Confirm that a message was actually received from the websocket:
    ws_client.receive.assert_awaited()
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "message_type, exception",
    [
        (WSMsgType.BINARY, InvalidMessageError),
        (WSMsgType.ERROR, ConnectionFailedError),
    ],
)
async def test_listen_error_message_types(
    exception: type[WebsocketError],
    message_type: WSMsgType,
    mock_api: Mock,
    ws_message_event: dict[str, Any],
    ws_messages: deque,
) -> None:
    """Test different websocket message types that should raise on listen.

    Args:
        exception: The exception class expected to be raised.
        message_type: The message type from the websocket.
        mock_api: A mocked API client.
        ws_message_event: A websocket event payload.
        ws_messages: A queue that websocket messages are appended to.
    """
    client = WebsocketClient(mock_api)
    await client.async_connect()
    assert client.connected

    ws_message = create_ws_message(ws_message_event)
    ws_message.type = message_type
    ws_messages.append(ws_message)

    # The parametrization supplies exception classes (not instances), which is what
    # pytest.raises expects; annotating the parameter as a class removes the need
    # for a type-checker suppression here.
    with pytest.raises(exception):
        await client.async_listen()
@pytest.mark.asyncio
async def test_reconnect(mock_api: Mock) -> None:
    """Verify that a connected client can be asked to reconnect without raising.

    Args:
        mock_api: A mocked API client.
    """
    websocket = WebsocketClient(mock_api)

    await websocket.async_connect()
    assert websocket.connected

    # No assertion follows; the test passes as long as this call does not raise:
    await websocket.async_reconnect()
@pytest.mark.asyncio
async def test_remove_callback_callback(mock_api: Mock) -> None:
    """Verify that a connect callback removed before connecting is never called.

    Args:
        mock_api: A mocked API client.
    """
    connect_callback = Mock()
    websocket = WebsocketClient(mock_api)

    # Registering returns a callable; invoking it removes the callback again:
    unsubscribe = websocket.add_connect_callback(connect_callback)
    unsubscribe()

    await websocket.async_connect()
    assert websocket.connected
    connect_callback.assert_not_called()

    await websocket.async_disconnect()
    assert not websocket.connected
def test_unknown_event(caplog: Mock, ws_message_event: dict[str, Any]) -> None:
    """Verify that an unrecognized event CID yields no event type and is logged.

    Args:
        caplog: A mocked logging utility.
        ws_message_event: A websocket event payload.
    """
    # Overwrite the payload's event CID with a value expected to be unknown:
    ws_message_event["data"]["eventCid"] = 9999

    event = websocket_event_from_payload(ws_message_event)
    assert event.event_type is None

    logged = [record.message for record in caplog.records]
    assert any("Encountered unknown websocket event type" in text for text in logged)
def test_unknown_sensor_type_in_event(
    caplog: Mock, ws_message_event: dict[str, Any]
) -> None:
    """Verify that an unrecognized sensor type yields no sensor type and is logged.

    Args:
        caplog: A mocked logging utility.
        ws_message_event: A websocket event payload.
    """
    # Overwrite the payload's sensor type with a value expected to be unknown:
    ws_message_event["data"]["sensorType"] = 999

    event = websocket_event_from_payload(ws_message_event)
    assert event.sensor_type is None

    logged = [record.message for record in caplog.records]
    assert any("Encountered unknown device type" in text for text in logged)
@pytest.mark.asyncio
async def test_watchdog_async_trigger(caplog: Mock) -> None:
    """Verify that the watchdog accepts a coroutine function as its trigger.

    Args:
        caplog: A mocked logging utility.
    """
    caplog.set_level(logging.INFO)

    async def mock_trigger() -> None:
        """Log a marker message when the watchdog invokes this trigger."""
        LOGGER.info("Triggered mock_trigger")

    def was_logged(fragment: str) -> bool:
        """Return whether any captured log message contains the fragment."""
        return any(fragment in record.message for record in caplog.records)

    watchdog = Watchdog(mock_trigger, timeout=timedelta(seconds=0))
    watchdog.trigger()
    assert was_logged("Websocket watchdog triggered")

    # Wait before checking for the expiry message and the trigger's own message:
    await asyncio.sleep(1)
    assert was_logged("Websocket watchdog expired")
    assert was_logged("Triggered mock_trigger")
@pytest.mark.asyncio
async def test_watchdog_cancel(caplog: Mock) -> None:
    """Verify that a canceled watchdog has not invoked its trigger.

    Args:
        caplog: A mocked logging utility.
    """
    caplog.set_level(logging.INFO)

    async def mock_trigger() -> None:
        """Log a marker message when the watchdog invokes this trigger."""
        LOGGER.info("Triggered mock_trigger")

    # The watchdog is given a 5-second timeout but is canceled after about one
    # second, so the trigger should not have run by the time we check:
    started_at = time()
    watchdog = Watchdog(mock_trigger, timeout=timedelta(seconds=5))
    watchdog.trigger()
    await asyncio.sleep(1)
    watchdog.cancel()
    elapsed = time() - started_at

    assert elapsed < 5
    assert all(
        "Triggered mock_trigger" not in record.message for record in caplog.records
    )
@pytest.mark.asyncio
async def test_watchdog_quick_trigger(caplog: Mock) -> None:
    """Verify the trigger call count after triggering the watchdog twice.

    Args:
        caplog: A mocked logging utility.
    """
    caplog.set_level(logging.INFO)

    action = Mock()
    watchdog = Watchdog(action, timeout=timedelta(seconds=1))

    # Trigger the watchdog twice, waiting one second after each trigger:
    for _ in range(2):
        watchdog.trigger()
        await asyncio.sleep(1)

    assert action.call_count == 2
@pytest.mark.asyncio
async def test_watchdog_sync_trigger(caplog: Mock) -> None:
    """Verify that the watchdog accepts a plain callable as its trigger.

    Args:
        caplog: A mocked logging utility.
    """
    caplog.set_level(logging.INFO)

    def was_logged(fragment: str) -> bool:
        """Return whether any captured log message contains the fragment."""
        return any(fragment in record.message for record in caplog.records)

    action = Mock()
    watchdog = Watchdog(action, timeout=timedelta(seconds=0))
    watchdog.trigger()
    assert was_logged("Websocket watchdog triggered")

    # Wait before checking for the expiry message and the trigger call:
    await asyncio.sleep(1)
    assert was_logged("Websocket watchdog expired")
    assert action.call_count == 1
|