1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
|
"""Library to handle connection with Switchbot."""
from __future__ import annotations
import logging
import struct
from typing import Any
from bleak.backends.device import BLEDevice
from ..adv_parsers.air_purifier import get_air_purifier_mode
from ..const import SwitchbotModel
from ..const.air_purifier import AirPurifierMode, AirQualityLevel
from .device import (
SwitchbotEncryptedDevice,
SwitchbotSequenceDevice,
update_after_operation,
)
_LOGGER = logging.getLogger(__name__)
COMMAND_HEAD = "570f4c"
COMMAND_SET_MODE = {
AirPurifierMode.LEVEL_1.name.lower(): f"{COMMAND_HEAD}01010100",
AirPurifierMode.LEVEL_2.name.lower(): f"{COMMAND_HEAD}01010132",
AirPurifierMode.LEVEL_3.name.lower(): f"{COMMAND_HEAD}01010164",
AirPurifierMode.AUTO.name.lower(): f"{COMMAND_HEAD}01010200",
AirPurifierMode.SLEEP.name.lower(): f"{COMMAND_HEAD}01010300",
AirPurifierMode.PET.name.lower(): f"{COMMAND_HEAD}01010400",
}
DEVICE_GET_BASIC_SETTINGS_KEY = "570f4d81"
class SwitchbotAirPurifier(SwitchbotSequenceDevice, SwitchbotEncryptedDevice):
"""Representation of a Switchbot Air Purifier."""
_turn_on_command = f"{COMMAND_HEAD}010100"
_turn_off_command = f"{COMMAND_HEAD}010000"
def __init__(
self,
device: BLEDevice,
key_id: str,
encryption_key: str,
interface: int = 0,
model: SwitchbotModel = SwitchbotModel.AIR_PURIFIER,
**kwargs: Any,
) -> None:
super().__init__(device, key_id, encryption_key, model, interface, **kwargs)
@classmethod
async def verify_encryption_key(
cls,
device: BLEDevice,
key_id: str,
encryption_key: str,
model: SwitchbotModel = SwitchbotModel.AIR_PURIFIER,
**kwargs: Any,
) -> bool:
return await super().verify_encryption_key(
device, key_id, encryption_key, model, **kwargs
)
async def get_basic_info(self) -> dict[str, Any] | None:
"""Get device basic settings."""
if not (_data := await self._get_basic_info()):
return None
_LOGGER.debug("data: %s", _data)
isOn = bool(_data[2] & 0b10000000)
version_info = (_data[2] & 0b00110000) >> 4
_mode = _data[2] & 0b00000111
isAqiValid = bool(_data[3] & 0b00000100)
child_lock = bool(_data[3] & 0b00000010)
_aqi_level = (_data[4] & 0b00000110) >> 1
aqi_level = AirQualityLevel(_aqi_level).name.lower()
speed = _data[6] & 0b01111111
pm25 = struct.unpack("<H", _data[12:14])[0] & 0xFFF
firmware = _data[15] / 10.0
mode = get_air_purifier_mode(_mode, speed)
return {
"isOn": isOn,
"version_info": version_info,
"mode": mode,
"isAqiValid": isAqiValid,
"child_lock": child_lock,
"aqi_level": aqi_level,
"speed": speed,
"pm25": pm25,
"firmware": firmware,
}
async def _get_basic_info(self) -> bytes | None:
"""Return basic info of device."""
_data = await self._send_command(
key=DEVICE_GET_BASIC_SETTINGS_KEY, retry=self._retry_count
)
if _data in (b"\x07", b"\x00"):
_LOGGER.error("Unsuccessful, please try again")
return None
return _data
@update_after_operation
async def set_preset_mode(self, preset_mode: str) -> bool:
"""Send command to set air purifier preset_mode."""
result = await self._send_command(COMMAND_SET_MODE[preset_mode])
return self._check_command_result(result, 0, {1})
def get_current_percentage(self) -> Any:
"""Return cached percentage."""
return self._get_adv_value("speed")
def is_on(self) -> bool | None:
"""Return air purifier state from cache."""
return self._get_adv_value("isOn")
def get_current_aqi_level(self) -> Any:
"""Return cached aqi level."""
return self._get_adv_value("aqi_level")
def get_current_pm25(self) -> Any:
"""Return cached pm25."""
return self._get_adv_value("pm25")
def get_current_mode(self) -> Any:
"""Return cached mode."""
return self._get_adv_value("mode")
|