File: channel_fixtures.py

package info (click to toggle)
python-roborock 4.10.0-1
  • links: PTS, VCS
  • area: main
  • in suites:
  • size: 2,476 kB
  • sloc: python: 16,570; makefile: 17; sh: 6
file content (53 lines) | stat: -rw-r--r-- 2,154 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from collections.abc import Callable
from unittest.mock import AsyncMock, MagicMock

from roborock.mqtt.health_manager import HealthManager
from roborock.protocols.v1_protocol import LocalProtocolVersion
from roborock.roborock_message import RoborockMessage


class FakeChannel:
    """A fake channel that handles publish and subscribe calls."""

    def __init__(self):
        """Initialize the fake channel."""
        self.subscribers: list[Callable[[RoborockMessage], None]] = []
        self.published_messages: list[RoborockMessage] = []
        self.response_queue: list[RoborockMessage] = []
        self._is_connected = False
        self.publish_side_effect: Exception | None = None
        self.publish = AsyncMock(side_effect=self._publish)
        self.subscribe = AsyncMock(side_effect=self._subscribe)
        self.connect = AsyncMock(side_effect=self._connect)
        self.close = MagicMock(side_effect=self._close)
        self.protocol_version = LocalProtocolVersion.V1
        self.restart = AsyncMock()
        self.health_manager = HealthManager(self.restart)

    async def _connect(self) -> None:
        self._is_connected = True

    def _close(self) -> None:
        self._is_connected = False

    @property
    def is_connected(self) -> bool:
        """Return true if connected."""
        return self._is_connected

    async def _publish(self, message: RoborockMessage) -> None:
        """Simulate publishing a message and triggering a response."""
        self.published_messages.append(message)
        if self.publish_side_effect:
            raise self.publish_side_effect
        # When a message is published, simulate a response
        if self.response_queue:
            response = self.response_queue.pop(0)
            # Give a chance for the subscriber to be registered
            for subscriber in list(self.subscribers):
                subscriber(response)

    async def _subscribe(self, callback: Callable[[RoborockMessage], None]) -> Callable[[], None]:
        """Simulate subscribing to messages."""
        self.subscribers.append(callback)
        return lambda: self.subscribers.remove(callback)