1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
|
"""Discover switchbot devices."""
from __future__ import annotations
import asyncio
import logging
from collections.abc import Callable
import bleak
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
from .adv_parser import parse_advertisement_data
from .const import DEFAULT_RETRY_COUNT, DEFAULT_RETRY_TIMEOUT, DEFAULT_SCAN_TIMEOUT
from .models import SwitchBotAdvertisement
_LOGGER = logging.getLogger(__name__)
CONNECT_LOCK = asyncio.Lock()
class GetSwitchbotDevices:
"""Scan for all Switchbot devices and return by type."""
def __init__(
self,
interface: int = 0,
callback: Callable[[SwitchBotAdvertisement], None] | None = None,
) -> None:
"""Get switchbot devices class constructor."""
self._interface = f"hci{interface}"
self._adv_data: dict[str, SwitchBotAdvertisement] = {}
self._callback = callback
def detection_callback(
self,
device: BLEDevice,
advertisement_data: AdvertisementData,
) -> None:
"""Callback for device detection."""
discovery = parse_advertisement_data(device, advertisement_data)
if discovery:
self._adv_data[discovery.address] = discovery
if self._callback is not None:
try:
self._callback(discovery)
except Exception:
_LOGGER.exception("Error in discovery callback")
async def discover(
self, retry: int = DEFAULT_RETRY_COUNT, scan_timeout: int = DEFAULT_SCAN_TIMEOUT
) -> dict:
"""Find switchbot devices and their advertisement data."""
devices = None
devices = bleak.BleakScanner(
detection_callback=self.detection_callback,
# TODO: Find new UUIDs to filter on. For example, see
# https://github.com/OpenWonderLabs/SwitchBotAPI-BLE/blob/4ad138bb09f0fbbfa41b152ca327a78c1d0b6ba9/devicetypes/meter.md
adapter=self._interface,
)
async with CONNECT_LOCK:
await devices.start()
await asyncio.sleep(scan_timeout)
await devices.stop()
if devices is None:
if retry < 1:
_LOGGER.error(
"Scanning for Switchbot devices failed. Stop trying", exc_info=True
)
return self._adv_data
_LOGGER.warning(
"Error scanning for Switchbot devices. Retrying (remaining: %d)",
retry,
)
await asyncio.sleep(DEFAULT_RETRY_TIMEOUT)
return await self.discover(retry - 1, scan_timeout)
return self._adv_data
async def _get_devices_by_model(
self,
model: str,
) -> dict:
"""Get switchbot devices by type."""
if not self._adv_data:
await self.discover()
return {
address: adv
for address, adv in self._adv_data.items()
if adv.data.get("model") == model
}
async def get_blind_tilts(self) -> dict[str, SwitchBotAdvertisement]:
"""Return all WoBlindTilt/BlindTilts devices with services data."""
regular_blinds = await self._get_devices_by_model("x")
pairing_blinds = await self._get_devices_by_model("X")
return {**regular_blinds, **pairing_blinds}
async def get_curtains(self) -> dict[str, SwitchBotAdvertisement]:
"""Return all WoCurtain/Curtains devices with services data."""
regular_curtains = await self._get_devices_by_model("c")
pairing_curtains = await self._get_devices_by_model("C")
regular_curtains3 = await self._get_devices_by_model("{")
pairing_curtains3 = await self._get_devices_by_model("[")
return {
**regular_curtains,
**pairing_curtains,
**regular_curtains3,
**pairing_curtains3,
}
async def get_bots(self) -> dict[str, SwitchBotAdvertisement]:
"""Return all WoHand/Bot devices with services data."""
return await self._get_devices_by_model("H")
async def get_tempsensors(self) -> dict[str, SwitchBotAdvertisement]:
"""Return all WoSensorTH/Temp sensor devices with services data."""
base_meters = await self._get_devices_by_model("T")
plus_meters = await self._get_devices_by_model("i")
io_meters = await self._get_devices_by_model("w")
hub2_meters = await self._get_devices_by_model("v")
hubmini_matter_meters = await self._get_devices_by_model("%")
return {
**base_meters,
**plus_meters,
**io_meters,
**hub2_meters,
**hubmini_matter_meters,
}
async def get_contactsensors(self) -> dict[str, SwitchBotAdvertisement]:
"""Return all WoContact/Contact sensor devices with services data."""
return await self._get_devices_by_model("d")
async def get_leakdetectors(self) -> dict[str, SwitchBotAdvertisement]:
"""Return all Leak Detectors with services data."""
return await self._get_devices_by_model("&")
async def get_locks(self) -> dict[str, SwitchBotAdvertisement]:
"""Return all WoLock/Locks devices with services data."""
locks = await self._get_devices_by_model("o")
lock_pros = await self._get_devices_by_model("$")
return {**locks, **lock_pros}
async def get_keypads(self) -> dict[str, SwitchBotAdvertisement]:
"""Return all WoKeypad/Keypad devices with services data."""
return await self._get_devices_by_model("y")
async def get_humidifiers(self) -> dict[str, SwitchBotAdvertisement]:
"""Return all humidifier devices with services data."""
humidifiers = await self._get_devices_by_model("e")
evaporative_humidifiers = await self._get_devices_by_model("#")
return {**humidifiers, **evaporative_humidifiers}
async def get_device_data(
self, address: str
) -> dict[str, SwitchBotAdvertisement] | None:
"""Return data for specific device."""
if not self._adv_data:
await self.discover()
return {
device: adv
for device, adv in self._adv_data.items()
# MacOS uses UUIDs instead of MAC addresses
if adv.data.get("address") == address
}
|