File: api.py

package info (click to toggle)
python-roborock 2.38.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky
  • size: 1,092 kB
  • sloc: python: 9,722; makefile: 17
file content (113 lines) | stat: -rw-r--r-- 3,800 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
"""The Roborock api."""

from __future__ import annotations

import asyncio
import logging
import time
from abc import ABC, abstractmethod
from typing import Any

from .containers import (
    DeviceData,
)
from .exceptions import (
    RoborockTimeout,
    UnknownMethodError,
)
from .roborock_future import RoborockFuture
from .roborock_message import (
    RoborockMessage,
    RoborockMessageProtocol,
)
from .util import get_next_int

_LOGGER = logging.getLogger(__name__)
KEEPALIVE = 60


class RoborockClient(ABC):
    """Roborock client base class."""

    _logger: logging.LoggerAdapter
    queue_timeout: int

    def __init__(self, device_info: DeviceData) -> None:
        """Initialize RoborockClient."""
        self.device_info = device_info
        self._waiting_queue: dict[int, RoborockFuture] = {}
        self._last_device_msg_in = time.monotonic()
        self._last_disconnection = time.monotonic()
        self.keep_alive = KEEPALIVE
        self._diagnostic_data: dict[str, dict[str, Any]] = {}
        self.is_available: bool = True

    async def async_release(self) -> None:
        await self.async_disconnect()

    @property
    def diagnostic_data(self) -> dict:
        return self._diagnostic_data

    @abstractmethod
    async def async_connect(self):
        """Connect to the Roborock device."""

    @abstractmethod
    async def async_disconnect(self) -> Any:
        """Disconnect from the Roborock device."""

    @abstractmethod
    def is_connected(self) -> bool:
        """Return True if the client is connected to the device."""

    @abstractmethod
    def on_message_received(self, messages: list[RoborockMessage]) -> None:
        """Handle received incoming messages from the device."""

    def on_connection_lost(self, exc: Exception | None) -> None:
        self._last_disconnection = time.monotonic()
        self._logger.info("Roborock client disconnected")
        if exc is not None:
            self._logger.warning(exc)

    def should_keepalive(self) -> bool:
        now = time.monotonic()
        # noinspection PyUnresolvedReferences
        if now - self._last_disconnection > self.keep_alive**2 and now - self._last_device_msg_in > self.keep_alive:
            return False
        return True

    async def validate_connection(self) -> None:
        if not self.should_keepalive():
            self._logger.info("Resetting Roborock connection due to keepalive timeout")
            await self.async_disconnect()
        await self.async_connect()

    async def _wait_response(self, request_id: int, queue: RoborockFuture) -> Any:
        try:
            response = await queue.async_get(self.queue_timeout)
            if response == "unknown_method":
                raise UnknownMethodError("Unknown method")
            return response
        except (asyncio.TimeoutError, asyncio.CancelledError):
            raise RoborockTimeout(f"id={request_id} Timeout after {self.queue_timeout} seconds") from None
        finally:
            self._waiting_queue.pop(request_id, None)

    def _async_response(self, request_id: int, protocol_id: int = 0) -> Any:
        queue = RoborockFuture(protocol_id)
        if request_id in self._waiting_queue and not (
            request_id == 2 and protocol_id == RoborockMessageProtocol.PING_REQUEST
        ):
            new_id = get_next_int(10000, 32767)
            self._logger.warning(
                "Attempting to create a future with an existing id %s (%s)... New id is %s. "
                "Code may not function properly.",
                request_id,
                protocol_id,
                new_id,
            )
            request_id = new_id
        self._waiting_queue[request_id] = queue
        return asyncio.ensure_future(self._wait_response(request_id, queue))