1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
|
"""Support for discovering myStrom devices."""
import asyncio
import logging
from typing import Optional, List
_LOGGER = logging.getLogger(__name__)
DEVICE_MAPPING = {
"101": "myStrom Switch v1",
"102": "myStrom Bulb",
"103": "myStrom Button+",
"104": "myStrom Button",
"105": "myStrom LED strip",
"106": "myStzrom Switch v2",
"107": "myStrom Switch EU",
"110": "myStrom Motion sensor",
"120": "myStrom Switch Zero",
}
class DiscoveredDevice(object):
"""Representation of discovered device."""
mac: str
type: int
is_child: bool
mystrom_registered: bool
mystrom_online: bool
restarted: bool
@staticmethod
def create_from_announce_msg(raw_addr, announce_msg):
"""Create announce message."""
_LOGGER.debug("Received announce message '%s' from %s ", announce_msg, raw_addr)
if len(announce_msg) != 8:
raise RuntimeError("Unexpected announcement, '%s'" % announce_msg)
device = DiscoveredDevice(host=raw_addr[0], mac=announce_msg[0:6].hex(":"))
device.type = announce_msg[6]
if device.type == "102":
device.hardware = DEVICE_MAPPING[str(announce_msg[6])]
else:
device.hardware = "non_mystrom"
status = announce_msg[7]
# Parse status field
device.is_child = status & 1 != 0
device.mystrom_registered = status & 2 != 0
device.mystrom_online = status & 4 != 0
device.restarted = status & 8 != 0
return device
def __init__(self, host, mac):
"""Initialize the discovery."""
self.host = host
self.mac = mac
class DeviceRegistry(object):
"""Representation of the device registry."""
def __init__(self):
"""Initialize the device registry."""
self.devices_by_mac = {}
def register(self, device):
"""Register a device."""
self.devices_by_mac[device.mac] = device
def devices(self):
"""Get all present devices."""
return list(self.devices_by_mac.values())
class DiscoveryProtocol(asyncio.DatagramProtocol):
"""Representation of the discovery protocol."""
def __init__(self, registry: DeviceRegistry):
""" "Initialize the discovery protocol."""
super().__init__()
self.registry = registry
def connection_made(self, transport):
"""Create an UDP listener."""
_LOGGER.debug("Starting up UDP listener")
self.transport = transport
def datagram_received(self, data, addr):
"""Handle a datagram."""
device = DiscoveredDevice.create_from_announce_msg(addr, data)
self.registry.register(device)
def connection_lost(self, exc: Optional[Exception]) -> None:
"""Stop if connection is lost."""
_LOGGER.debug("Shutting down UDP listener")
super().connection_lost(exc)
async def discover_devices(timeout: int = 7) -> List[DiscoveredDevice]:
"""Discover local myStrom devices.
Some myStrom devices report their presence every ~5 seconds in an UDP
broadcast to port 7979.
"""
registry = DeviceRegistry()
loop = asyncio.get_event_loop()
(transport, protocol) = await loop.create_datagram_endpoint(
lambda: DiscoveryProtocol(registry), local_addr=("0.0.0.0", 7979)
)
# Server runs in the background, meanwhile wait until timeout expires
await asyncio.sleep(timeout)
# Shutdown server
transport.close()
devices = registry.devices()
for device in devices:
_LOGGER.debug(
"Discovered myStrom device %s (%s) (MAC addresse: %s)",
device.host,
device.type,
device.mac,
)
return devices
|