1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159
|
import logging
from abc import ABC, abstractmethod
from collections.abc import Callable
from datetime import time
from typing import Any
from roborock import DeviceData
from roborock.api import RoborockClient
from roborock.code_mappings import (
DyadBrushSpeed,
DyadCleanMode,
DyadError,
DyadSelfCleanLevel,
DyadSelfCleanMode,
DyadSuction,
DyadWarmLevel,
DyadWaterLevel,
RoborockDyadStateCode,
ZeoDetergentType,
ZeoDryingMode,
ZeoError,
ZeoMode,
ZeoProgram,
ZeoRinse,
ZeoSoftenerType,
ZeoSpin,
ZeoState,
ZeoTemperature,
)
from roborock.containers import DyadProductInfo, DyadSndState, RoborockCategory
from roborock.exceptions import RoborockException
from roborock.protocols.a01_protocol import decode_rpc_response
from roborock.roborock_message import (
RoborockDyadDataProtocol,
RoborockMessage,
RoborockMessageProtocol,
RoborockZeoProtocol,
)
_LOGGER = logging.getLogger(__name__)


def _enum_name(enum_cls: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """Return a converter that builds ``enum_cls(val)`` and yields its ``name``."""

    def _convert(val: Any) -> Any:
        return enum_cls(val).name

    return _convert


def _minutes_to_time(val: Any) -> time:
    """Turn a count of minutes since 00:00 into a ``datetime.time``."""
    return time(hour=int(val / 60), minute=val % 60)


def _countdown_from_360000(val: Any) -> int:
    """Compute ``360000 - val * 60`` and coerce the result to ``int``.

    NOTE(review): presumably the remaining life of a consumable; the units
    are not stated in this file -- confirm before relying on them.
    """
    return int(360000 - val * 60)


def _split_ints(val: Any) -> list[int]:
    """Parse a comma-separated string into a list of ints."""
    return [int(part) for part in val.split(",")]


# Maps each dyad data point to the callable that converts its raw value.
DYAD_PROTOCOL_ENTRIES: dict[RoborockDyadDataProtocol, Callable] = {
    RoborockDyadDataProtocol.STATUS: _enum_name(RoborockDyadStateCode),
    RoborockDyadDataProtocol.SELF_CLEAN_MODE: _enum_name(DyadSelfCleanMode),
    RoborockDyadDataProtocol.SELF_CLEAN_LEVEL: _enum_name(DyadSelfCleanLevel),
    RoborockDyadDataProtocol.WARM_LEVEL: _enum_name(DyadWarmLevel),
    RoborockDyadDataProtocol.CLEAN_MODE: _enum_name(DyadCleanMode),
    RoborockDyadDataProtocol.SUCTION: _enum_name(DyadSuction),
    RoborockDyadDataProtocol.WATER_LEVEL: _enum_name(DyadWaterLevel),
    RoborockDyadDataProtocol.BRUSH_SPEED: _enum_name(DyadBrushSpeed),
    RoborockDyadDataProtocol.POWER: int,
    RoborockDyadDataProtocol.AUTO_DRY: bool,
    RoborockDyadDataProtocol.MESH_LEFT: _countdown_from_360000,
    RoborockDyadDataProtocol.BRUSH_LEFT: _countdown_from_360000,
    RoborockDyadDataProtocol.ERROR: _enum_name(DyadError),
    RoborockDyadDataProtocol.VOLUME_SET: int,
    RoborockDyadDataProtocol.STAND_LOCK_AUTO_RUN: bool,
    RoborockDyadDataProtocol.AUTO_DRY_MODE: bool,
    # Duration in minutes.
    RoborockDyadDataProtocol.SILENT_DRY_DURATION: int,
    RoborockDyadDataProtocol.SILENT_MODE: bool,
    # Start/end are reported in minutes since 00:00.
    RoborockDyadDataProtocol.SILENT_MODE_START_TIME: _minutes_to_time,
    RoborockDyadDataProtocol.SILENT_MODE_END_TIME: _minutes_to_time,
    # Minutes of cleaning in the past few days.
    RoborockDyadDataProtocol.RECENT_RUN_TIME: _split_ints,
    RoborockDyadDataProtocol.TOTAL_RUN_TIME: int,
    RoborockDyadDataProtocol.SND_STATE: DyadSndState.from_dict,
    RoborockDyadDataProtocol.PRODUCT_INFO: DyadProductInfo.from_dict,
}

# Maps each zeo data point to the callable that converts its raw value.
ZEO_PROTOCOL_ENTRIES: dict[RoborockZeoProtocol, Callable] = {
    # ro -- presumably read-only data points
    RoborockZeoProtocol.STATE: _enum_name(ZeoState),
    RoborockZeoProtocol.COUNTDOWN: int,
    RoborockZeoProtocol.WASHING_LEFT: int,
    RoborockZeoProtocol.ERROR: _enum_name(ZeoError),
    RoborockZeoProtocol.TIMES_AFTER_CLEAN: int,
    RoborockZeoProtocol.DETERGENT_EMPTY: bool,
    RoborockZeoProtocol.SOFTENER_EMPTY: bool,
    # rw -- presumably read-write data points
    RoborockZeoProtocol.MODE: _enum_name(ZeoMode),
    RoborockZeoProtocol.PROGRAM: _enum_name(ZeoProgram),
    RoborockZeoProtocol.TEMP: _enum_name(ZeoTemperature),
    RoborockZeoProtocol.RINSE_TIMES: _enum_name(ZeoRinse),
    RoborockZeoProtocol.SPIN_LEVEL: _enum_name(ZeoSpin),
    RoborockZeoProtocol.DRYING_MODE: _enum_name(ZeoDryingMode),
    RoborockZeoProtocol.DETERGENT_TYPE: _enum_name(ZeoDetergentType),
    RoborockZeoProtocol.SOFTENER_TYPE: _enum_name(ZeoSoftenerType),
    RoborockZeoProtocol.SOUND_SET: bool,
}
def convert_dyad_value(protocol: int, value: Any) -> Any:
    """Convert a raw dyad data point value using its registered converter.

    Args:
        protocol: Numeric data point id, looked up in ``RoborockDyadDataProtocol``.
        value: Raw value reported for that data point.

    Returns:
        The converted value, or ``None`` when ``DYAD_PROTOCOL_ENTRIES`` has no
        converter for the data point.
    """
    handler = DYAD_PROTOCOL_ENTRIES.get(RoborockDyadDataProtocol(protocol))
    return None if handler is None else handler(value)
def convert_zeo_value(protocol: int, value: Any) -> Any:
    """Convert a raw zeo data point value using its registered converter.

    Args:
        protocol: Numeric data point id, looked up in ``RoborockZeoProtocol``.
        value: Raw value reported for that data point.

    Returns:
        The converted value, or ``None`` when ``ZEO_PROTOCOL_ENTRIES`` has no
        converter for the data point.
    """
    handler = ZEO_PROTOCOL_ENTRIES.get(RoborockZeoProtocol(protocol))
    if handler is None:
        return None
    return handler(value)
class RoborockClientA01(RoborockClient, ABC):
    """Roborock client base class for A01 devices.

    The constructor picks a value converter from the device category;
    ``on_message_received`` uses it to convert incoming data points and hand
    each result to the request waiting on that data point, if there is one.
    """

    # Converter for the device category; ``None`` when the category is unsupported.
    value_converter: Callable[[int, Any], Any] | None = None

    def __init__(self, device_info: DeviceData, category: RoborockCategory):
        """Initialize the Roborock client.

        Args:
            device_info: Forwarded to ``RoborockClient.__init__``.
            category: Device category used to select the value converter.
        """
        super().__init__(device_info)
        if category == RoborockCategory.WET_DRY_VAC:
            self.value_converter = convert_dyad_value
        elif category == RoborockCategory.WASHING_MACHINE:
            self.value_converter = convert_zeo_value
        else:
            _LOGGER.debug("Device category %s is not (yet) supported", category)
            self.value_converter = None

    def on_message_received(self, messages: list[RoborockMessage]) -> None:
        """Convert the data points of incoming messages and resolve waiters.

        Only messages that carry a payload and use the ``RPC_RESPONSE`` or
        ``GENERAL_REQUEST`` protocol are processed. Messages that fail to
        decode are logged and skipped. Does nothing when no value converter
        is configured.

        Args:
            messages: Messages received from the device.
        """
        if self.value_converter is None:
            return
        handled_protocols = (
            RoborockMessageProtocol.RPC_RESPONSE,
            RoborockMessageProtocol.GENERAL_REQUEST,
        )
        for message in messages:
            protocol = message.protocol
            if not message.payload or protocol not in handled_protocols:
                continue
            try:
                data_points = decode_rpc_response(message)
            except RoborockException as err:
                self._logger.debug("Failed to decode message: %s", err)
                continue
            for data_point_number, data_point in data_points.items():
                self._logger.debug("received msg with dps, protocol: %s, %s", data_point_number, protocol)
                converted_response = self.value_converter(data_point_number, data_point)
                # The converters return None only when no conversion is
                # registered. Test for None explicitly so that valid falsy
                # results (0, False, empty list) are still delivered instead
                # of being reported as unknown and dropped.
                if converted_response is None:
                    self._logger.debug(
                        "Received unknown data point %s for protocol %s, ignoring", data_point_number, protocol
                    )
                    continue
                queue = self._waiting_queue.get(int(data_point_number))
                if queue and queue.protocol == protocol:
                    queue.set_result(converted_response)

    @abstractmethod
    async def update_values(
        self, dyad_data_protocols: list[RoborockDyadDataProtocol | RoborockZeoProtocol]
    ) -> dict[RoborockDyadDataProtocol | RoborockZeoProtocol, Any]:
        """This should handle updating for each given protocol."""
|