File: pahomqtt_fixtures.py

package info (click to toggle)
python-roborock 4.17.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,528 kB
  • sloc: python: 17,280; makefile: 18; sh: 6
file content (102 lines) | stat: -rw-r--r-- 3,919 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
"""Common code for MQTT tests."""

import logging
import warnings
from collections.abc import Callable, Generator
from queue import Queue
from typing import Any
from unittest.mock import Mock, patch

import pytest

from .logging import CapturedRequestLog
from .mqtt import FakeMqttSocketHandler

pytest_plugins = [
    "tests.fixtures.logging_fixtures",
]

_LOGGER = logging.getLogger(__name__)

# Used by fixtures to handle incoming requests and prepare responses
MqttRequestHandler = Callable[[bytes], bytes | None]


@pytest.fixture(name="mock_paho_mqtt_create_connection")
def create_connection_fixture(mock_sock: Mock) -> Generator[None, None, None]:
    """Fixture that overrides the MQTT socket creation to wire it up to the mock socket."""
    with patch("paho.mqtt.client.socket.create_connection", return_value=mock_sock):
        yield


@pytest.fixture(name="mock_paho_mqtt_select")
def select_fixture(mock_sock: Mock, fake_mqtt_socket_handler: FakeMqttSocketHandler) -> Generator[None, None, None]:
    """Fixture that overrides the MQTT client select calls to make select work on the mock socket.

    This patch select to activate our mock socket when ready with data. Internal mqtt sockets are
    always ready since they are used internally to wake the select loop. Ours is ready if there
    is data in the buffer.
    """

    def is_ready(sock: Any) -> bool:
        return sock is not mock_sock or (fake_mqtt_socket_handler.pending() > 0)

    def handle_select(rlist: list, wlist: list, *args: Any) -> list:
        return [list(filter(is_ready, rlist)), list(filter(is_ready, wlist))]

    with patch("paho.mqtt.client.select.select", side_effect=handle_select):
        yield


@pytest.fixture(name="fake_mqtt_socket_handler")
def fake_mqtt_socket_handler_fixture(
    mqtt_request_handler: MqttRequestHandler, mqtt_response_queue: Queue[bytes], log: CapturedRequestLog
) -> Generator[FakeMqttSocketHandler, None, None]:
    """Fixture that creates a fake MQTT broker."""
    socket_handler = FakeMqttSocketHandler(mqtt_request_handler, mqtt_response_queue, log)
    yield socket_handler
    if len(socket_handler.response_buf.getvalue()) > 0:
        warnings.warn("Some enqueued MQTT responses were not consumed during the test")


@pytest.fixture(name="mock_sock")
def mock_sock_fixture(fake_mqtt_socket_handler: FakeMqttSocketHandler) -> Mock:
    """Fixture that creates a mock socket connection and wires it to the handler."""
    mock_sock = Mock()
    mock_sock.recv = fake_mqtt_socket_handler.handle_socket_recv
    mock_sock.send = fake_mqtt_socket_handler.handle_socket_send
    mock_sock.pending = fake_mqtt_socket_handler.pending
    return mock_sock


@pytest.fixture(name="mqtt_received_requests")
def received_requests_fixture() -> Queue[bytes]:
    """Fixture that provides access to the received requests."""
    return Queue()


@pytest.fixture(name="mqtt_response_queue")
def response_queue_fixture() -> Generator[Queue[bytes], None, None]:
    """Fixture that provides a queue for enqueueing responses to be sent to the client under test."""
    response_queue: Queue[bytes] = Queue()
    yield response_queue
    if not response_queue.empty():
        warnings.warn("Some enqueued MQTT responses were not consumed during the test")


@pytest.fixture(name="mqtt_request_handler")
def mqtt_request_handler_fixture(
    mqtt_received_requests: Queue[bytes], mqtt_response_queue: Queue[bytes]
) -> MqttRequestHandler:
    """Fixture records incoming requests and replies with responses from the queue."""

    def handle_request(client_request: bytes) -> bytes | None:
        """Handle an incoming request from the client."""
        mqtt_received_requests.put(client_request)

        # Insert a prepared response into the response buffer
        if not mqtt_response_queue.empty():
            return mqtt_response_queue.get()
        return None

    return handle_request