"""Library to handle connection with Switchbot."""
from __future__ import annotations
import logging
from typing import Any
from ..models import SwitchBotAdvertisement
from .base_cover import COVER_COMMAND, COVER_EXT_SUM_KEY, SwitchbotBaseCover
from .device import REQ_HEADER, update_after_operation
# Every *_KEYS list carries two command variants. The second variant of the
# open/close lists is only a prefix: two more bytes (e.g. "ff00") are appended
# when the command is sent.
#   * first byte  [ff] - speed (00 or ff - normal, 01 - slow) (*)
#   * second byte [00] - command (00 - open, 64 - close)
# (*) Speed applies to Curtain 3 only; for other models use ff.
_COVER_PREFIX = f"{REQ_HEADER}{COVER_COMMAND}"

OPEN_KEYS = [
    f"{_COVER_PREFIX}010100",
    f"{_COVER_PREFIX}05",  # + speed + "00"
]
CLOSE_KEYS = [
    f"{_COVER_PREFIX}010164",
    f"{_COVER_PREFIX}05",  # + speed + "64"
]
# Both position variants are prefixes; the actual position is appended last.
POSITION_KEYS = [
    f"{_COVER_PREFIX}0101",
    f"{_COVER_PREFIX}05",  # + speed
]  # + actual_position
STOP_KEYS = [
    f"{_COVER_PREFIX}0001",
    f"{_COVER_PREFIX}00ff",
]

CURTAIN_EXT_CHAIN_INFO_KEY = f"{REQ_HEADER}468101"

_LOGGER = logging.getLogger(__name__)
class SwitchbotCurtain(SwitchbotBaseCover):
    """Representation of a Switchbot Curtain."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Switchbot Curtain/WoCurtain constructor.

        Accepts an optional ``reverse_mode`` keyword (default ``True``); all
        other positional and keyword arguments are forwarded to the base class.
        """
        # The position of the curtain is reported with 0 = open and
        # 100 = closed. This is independent of the calibration of the curtain
        # bot (open left to right / open right to left / open from the middle).
        # The parameter 'reverse_mode' reverses these values: if
        # 'reverse_mode' = True, position = 0 equals closed and position = 100
        # equals open. It defaults to True so that the definition of position
        # is the same as in Home Assistant.
        self._reverse: bool = kwargs.pop("reverse_mode", True)
        super().__init__(self._reverse, *args, **kwargs)
        self._settings: dict[str, Any] = {}
        # Filled by get_extended_info_summary(); keyed "device0"/"device1".
        self.ext_info_sum: dict[str, Any] = {}
        self.ext_info_adv: dict[str, Any] = {}

    def _set_parsed_data(
        self, advertisement: SwitchBotAdvertisement, data: dict[str, Any]
    ) -> None:
        """Set data.

        Updates the opening/closing flags from the change between the cached
        position and the newly parsed one, then delegates to the base class.
        ``data`` must contain the keys ``"inMotion"`` and ``"position"``.
        """
        in_motion = data["inMotion"]
        # Read the cached position *before* the base class stores the new data.
        previous_position = self._get_adv_value("position")
        new_position = data["position"]
        self._update_motion_direction(in_motion, previous_position, new_position)
        super()._set_parsed_data(advertisement, data)

    @update_after_operation
    async def open(self, speed: int = 255) -> bool:
        """Send open command. Speed 255 - normal, 1 - slow"""
        self._is_opening = True
        self._is_closing = False
        return await self._send_multiple_commands(
            [OPEN_KEYS[0], f"{OPEN_KEYS[1]}{speed:02X}00"]
        )

    @update_after_operation
    async def close(self, speed: int = 255) -> bool:
        """Send close command. Speed 255 - normal, 1 - slow"""
        self._is_closing = True
        self._is_opening = False
        return await self._send_multiple_commands(
            [CLOSE_KEYS[0], f"{CLOSE_KEYS[1]}{speed:02X}64"]
        )

    @update_after_operation
    async def stop(self) -> bool:
        """Send stop command to device."""
        self._is_opening = self._is_closing = False
        return await super().stop()

    @update_after_operation
    async def set_position(self, position: int, speed: int = 255) -> bool:
        """Send position command (0-100) to device. Speed 255 - normal, 1 - slow"""
        # NOTE(review): the target passed to _update_motion_direction is
        # reverse-adjusted while the cached advertisement position is used
        # as-is - confirm both values are expressed in the same orientation.
        direction_adjusted_position = (100 - position) if self._reverse else position
        self._update_motion_direction(
            True, self._get_adv_value("position"), direction_adjusted_position
        )
        return await super().set_position(position, speed)

    def get_position(self) -> Any:
        """Return cached position (0-100) of Curtain."""
        # To get actual position call update() first.
        return self._get_adv_value("position")

    async def get_basic_info(self) -> dict[str, Any] | None:
        """Get device basic settings.

        Returns ``None`` when the base-class query yields no data; otherwise a
        dict decoded from bytes 1-7 of the response. As a side effect the
        opening/closing flags are refreshed from the decoded position.
        """
        if not (_data := await self._get_basic_info()):
            return None

        # Clamp the reported position into the 0-100 range.
        _position = max(min(_data[6], 100), 0)
        _direction_adjusted_position = (100 - _position) if self._reverse else _position
        _previous_position = self._get_adv_value("position")
        _in_motion = bool(_data[5] & 0b01000011)
        self._update_motion_direction(
            _in_motion, _previous_position, _direction_adjusted_position
        )

        return {
            "battery": _data[1],
            "firmware": _data[2] / 10.0,
            "chainLength": _data[3],
            "openDirection": (
                "right_to_left" if _data[4] & 0b10000000 == 128 else "left_to_right"
            ),
            "touchToOpen": bool(_data[4] & 0b01000000),
            "light": bool(_data[4] & 0b00100000),
            "fault": bool(_data[4] & 0b00001000),
            "solarPanel": bool(_data[5] & 0b00001000),
            # "calibration" and "calibrated" deliberately read the same bit;
            # both keys are kept so existing consumers of either keep working.
            "calibration": bool(_data[5] & 0b00000100),
            "calibrated": bool(_data[5] & 0b00000100),
            "inMotion": _in_motion,
            "position": _direction_adjusted_position,
            "timers": _data[7],
        }

    def _update_motion_direction(
        self, in_motion: bool, previous_position: int | None, new_position: int
    ) -> None:
        """Update opening/closing status based on movement.

        Nothing is changed when no previous position is known. When the device
        is not in motion both flags are cleared. Otherwise a higher new
        position counts as opening and a lower one as closing; an unchanged
        position leaves the flags as they were.
        """
        if previous_position is None:
            return
        if in_motion is False:
            self._is_closing = self._is_opening = False
            return

        if new_position != previous_position:
            self._is_opening = new_position > previous_position
            self._is_closing = new_position < previous_position

    @staticmethod
    def _parse_ext_info_flags(flags: int) -> dict[str, Any]:
        """Decode one per-device flag byte of the extended info summary."""
        return {
            "openDirectionDefault": not bool(flags & 0b10000000),
            "touchToOpen": bool(flags & 0b01000000),
            "light": bool(flags & 0b00100000),
            "openDirection": (
                "left_to_right" if flags & 0b00010000 else "right_to_left"
            ),
        }

    async def get_extended_info_summary(self) -> dict[str, Any] | None:
        """Get extended info for all devices in chain.

        Returns ``self.ext_info_sum`` with a ``"device0"`` entry and, when the
        third response byte is non-zero, a ``"device1"`` entry. Returns
        ``None`` (after logging an error) when the device gives no result,
        answers with a single ``0x07``/``0x00`` byte, or the response is too
        short to hold both flag bytes.
        """
        _data = await self._send_command(key=COVER_EXT_SUM_KEY)

        if not _data:
            _LOGGER.error("%s: Unsuccessful, no result from device", self.name)
            return None

        if _data in (b"\x07", b"\x00"):
            _LOGGER.error("%s: Unsuccessful, please try again", self.name)
            return None

        # Bytes 1 and 2 are indexed below; reject shorter replies instead of
        # failing with an IndexError.
        if len(_data) < 3:
            _LOGGER.error(
                "%s: Unsuccessful, unexpected response length %d",
                self.name,
                len(_data),
            )
            return None

        self.ext_info_sum["device0"] = self._parse_ext_info_flags(_data[1])

        # if grouped curtain device present.
        if _data[2] != 0:
            self.ext_info_sum["device1"] = self._parse_ext_info_flags(_data[2])
        else:
            # Drop a stale entry left over from an earlier grouped response.
            self.ext_info_sum.pop("device1", None)

        return self.ext_info_sum
|