File: health_manager.py

package info (click to toggle)
python-roborock 4.12.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,480 kB
  • sloc: python: 16,602; makefile: 17; sh: 6
file content (60 lines) | stat: -rw-r--r-- 2,208 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
"""A health manager for monitoring MQTT connections to Roborock devices.

We observe a problem where sometimes the MQTT connection appears to be alive but
no messages are being received. To mitigate this, we track consecutive timeouts
and restart the connection if too many timeouts occur in succession.
"""

import datetime
import logging
from collections.abc import Awaitable, Callable

_LOGGER = logging.getLogger(__name__)

# Number of consecutive timeouts before considering the connection unhealthy.
TIMEOUT_THRESHOLD = 3

# We won't restart the session more often than this interval.
RESTART_COOLDOWN = datetime.timedelta(minutes=30)


class HealthManager:
    """Manager for monitoring the health of MQTT connections.

    This tracks communication timeouts and can trigger restarts of the MQTT
    session if too many timeouts occur in succession.
    """

    def __init__(self, restart: Callable[[], Awaitable[None]]) -> None:
        """Initialize the health manager.

        Args:
            restart: A callable to restart the MQTT session.
        """
        self._consecutive_timeouts = 0
        self._restart = restart
        self._last_restart: datetime.datetime | None = None

    async def on_success(self) -> None:
        """Record a successful communication event."""
        self._consecutive_timeouts = 0

    async def on_timeout(self) -> None:
        """Record a timeout event.

        This may trigger a restart of the MQTT session if too many timeouts
        have occurred in succession.
        """
        self._consecutive_timeouts += 1
        if self._consecutive_timeouts >= TIMEOUT_THRESHOLD:
            now = datetime.datetime.now(datetime.UTC)
            since_last = (now - self._last_restart) if self._last_restart else None
            if since_last is None or since_last >= RESTART_COOLDOWN:
                _LOGGER.debug(
                    "Restarting MQTT session after %d consecutive timeouts (duration since last restart %s)",
                    self._consecutive_timeouts,
                    since_last,
                )
                await self._restart()
                self._last_restart = now
                self._consecutive_timeouts = 0