"""An MQTT session for sending and receiving messages."""
from abc import ABC, abstractmethod
from collections.abc import Callable
from dataclasses import dataclass, field

from roborock.exceptions import RoborockException
# Default number of seconds to allow for communications with the broker.
DEFAULT_TIMEOUT = 30.0


@dataclass
class MqttParams:
    """MQTT parameters for the connection.

    The password is left out of the generated ``repr`` so that logging or
    printing an instance does not expose the credential. It is still a
    required constructor argument and still takes part in equality checks.
    """

    host: str
    """MQTT host to connect to."""

    port: int
    """MQTT port to connect to."""

    tls: bool
    """Use TLS for the connection."""

    username: str
    """MQTT username to use for authentication."""

    # repr=False keeps the secret out of log lines and tracebacks.
    password: str = field(repr=False)
    """MQTT password to use for authentication (hidden from ``repr``)."""

    timeout: float = DEFAULT_TIMEOUT
    """Timeout for communications with the broker in seconds."""
class MqttSession(ABC):
    """Abstract interface for an MQTT session that sends and receives messages.

    Every member below is abstract; a concrete session has to implement all
    of them before it can be instantiated.
    """

    @property
    @abstractmethod
    def connected(self) -> bool:
        """Whether the session is currently connected to the broker."""
        ...

    @abstractmethod
    async def subscribe(self, device_id: str, callback: Callable[[bytes], None]) -> Callable[[], None]:
        """Register a callback for messages received on the topic.

        Args:
            device_id: Identifies what to subscribe to.
                NOTE(review): the contract speaks of a "topic" while the
                parameter is named ``device_id`` -- confirm how
                implementations map one onto the other.
            callback: Invoked with the bytes of each received message.

        Returns:
            A callable that unsubscribes from the topic when called.
        """
        ...

    @abstractmethod
    async def publish(self, topic: str, message: bytes) -> None:
        """Send a message on the given topic.

        Args:
            topic: Topic to publish on.
            message: Payload to publish.

        Raises:
            An exception if the message could not be sent.
        """
        ...

    @abstractmethod
    async def close(self) -> None:
        """Close the session by cancelling the MQTT loop."""
        ...
class MqttSessionException(RoborockException):
    """Raised when there is an error communicating with MQTT."""
|