1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
|
"""Traits for Q7 B01 devices.
Potentially other devices may fall into this category in the future."""
from typing import Any
from roborock import B01Props
from roborock.data.b01_q7.b01_q7_code_mappings import (
CleanPathPreferenceMapping,
CleanRepeatMapping,
CleanTaskTypeMapping,
CleanTypeMapping,
SCDeviceCleanParam,
SCWindMapping,
WaterLevelMapping,
)
from roborock.devices.rpc.b01_q7_channel import send_decoded_command
from roborock.devices.traits import Trait
from roborock.devices.transport.mqtt_channel import MqttChannel
from roborock.protocols.b01_q7_protocol import CommandType, ParamsType, Q7RequestMessage
from roborock.roborock_message import RoborockB01Props
from roborock.roborock_typing import RoborockB01Q7Methods
from .clean_summary import CleanSummaryTrait
__all__ = [
"Q7PropertiesApi",
"CleanSummaryTrait",
]
class Q7PropertiesApi(Trait):
"""API for interacting with B01 devices."""
clean_summary: CleanSummaryTrait
"""Trait for clean records / clean summary (Q7 `service.get_record_list`)."""
def __init__(self, channel: MqttChannel) -> None:
"""Initialize the B01Props API."""
self._channel = channel
self.clean_summary = CleanSummaryTrait(channel)
async def query_values(self, props: list[RoborockB01Props]) -> B01Props | None:
"""Query the device for the values of the given Q7 properties."""
result = await self.send(
RoborockB01Q7Methods.GET_PROP,
{"property": props},
)
if not isinstance(result, dict):
raise TypeError(f"Unexpected response type for GET_PROP: {type(result).__name__}: {result!r}")
return B01Props.from_dict(result)
async def set_prop(self, prop: RoborockB01Props, value: Any) -> None:
"""Set a property on the device."""
await self.send(
command=RoborockB01Q7Methods.SET_PROP,
params={prop: value},
)
async def set_fan_speed(self, fan_speed: SCWindMapping) -> None:
"""Set the fan speed (wind)."""
await self.set_prop(RoborockB01Props.WIND, fan_speed.code)
async def set_water_level(self, water_level: WaterLevelMapping) -> None:
"""Set the water level (water)."""
await self.set_prop(RoborockB01Props.WATER, water_level.code)
async def set_mode(self, mode: CleanTypeMapping) -> None:
"""Set the cleaning mode (vacuum, mop, or vacuum and mop)."""
await self.set_prop(RoborockB01Props.MODE, mode.code)
async def set_clean_path_preference(self, preference: CleanPathPreferenceMapping) -> None:
"""Set the cleaning path preference (route)."""
await self.set_prop(RoborockB01Props.CLEAN_PATH_PREFERENCE, preference.code)
async def set_repeat_state(self, repeat: CleanRepeatMapping) -> None:
"""Set the cleaning repeat state (cycles)."""
await self.set_prop(RoborockB01Props.REPEAT_STATE, repeat.code)
async def start_clean(self) -> None:
"""Start cleaning."""
await self.send(
command=RoborockB01Q7Methods.SET_ROOM_CLEAN,
params={
"clean_type": CleanTaskTypeMapping.ALL.code,
"ctrl_value": SCDeviceCleanParam.START.code,
"room_ids": [],
},
)
async def pause_clean(self) -> None:
"""Pause cleaning."""
await self.send(
command=RoborockB01Q7Methods.SET_ROOM_CLEAN,
params={
"clean_type": CleanTaskTypeMapping.ALL.code,
"ctrl_value": SCDeviceCleanParam.PAUSE.code,
"room_ids": [],
},
)
async def stop_clean(self) -> None:
"""Stop cleaning."""
await self.send(
command=RoborockB01Q7Methods.SET_ROOM_CLEAN,
params={
"clean_type": CleanTaskTypeMapping.ALL.code,
"ctrl_value": SCDeviceCleanParam.STOP.code,
"room_ids": [],
},
)
async def return_to_dock(self) -> None:
"""Return to dock."""
await self.send(
command=RoborockB01Q7Methods.START_RECHARGE,
params={},
)
async def find_me(self) -> None:
"""Locate the robot."""
await self.send(
command=RoborockB01Q7Methods.FIND_DEVICE,
params={},
)
async def send(self, command: CommandType, params: ParamsType) -> Any:
"""Send a command to the device."""
return await send_decoded_command(
self._channel,
Q7RequestMessage(dps=10000, command=command, params=params),
)
def create(channel: MqttChannel) -> Q7PropertiesApi:
"""Create traits for B01 devices."""
return Q7PropertiesApi(channel)
|