File: discovery.py

package info (click to toggle)
python-mystrom 2.2.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, trixie
  • size: 188 kB
  • sloc: python: 855; makefile: 3
file content (124 lines) | stat: -rw-r--r-- 3,729 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
"""Support for discovering myStrom devices."""
import asyncio
import logging
from typing import Optional, List

_LOGGER = logging.getLogger(__name__)

# Human-readable model names, keyed by the device type code rendered as a
# decimal string (the type code is the seventh byte of an announce message).
DEVICE_MAPPING = {
    "101": "myStrom Switch v1",
    "102": "myStrom Bulb",
    "103": "myStrom Button+",
    "104": "myStrom Button",
    "105": "myStrom LED strip",
    "106": "myStrom Switch v2",
    "107": "myStrom Switch EU",
    "110": "myStrom Motion sensor",
    "120": "myStrom Switch Zero",
}


class DiscoveredDevice(object):
    """Representation of a device found through a discovery announcement.

    The constructor only stores ``host`` and ``mac``; the remaining
    attributes are filled in by :meth:`create_from_announce_msg`.
    """

    host: str
    mac: str
    type: int
    hardware: str
    is_child: bool
    mystrom_registered: bool
    mystrom_online: bool
    restarted: bool

    @staticmethod
    def create_from_announce_msg(raw_addr, announce_msg):
        """Create a device from a received announce message.

        The message must be exactly 8 bytes long: bytes 0-5 hold the MAC
        address, byte 6 the device type code and byte 7 a status bit field.

        Args:
            raw_addr: Sender address; its first item is used as the host.
            announce_msg: The raw announce payload (a bytes-like object).

        Returns:
            A populated ``DiscoveredDevice``.

        Raises:
            RuntimeError: If the message is not exactly 8 bytes long.
        """
        _LOGGER.debug("Received announce message '%s' from %s ", announce_msg, raw_addr)
        if len(announce_msg) != 8:
            raise RuntimeError(f"Unexpected announcement, '{announce_msg}'")

        device = DiscoveredDevice(host=raw_addr[0], mac=announce_msg[0:6].hex(":"))
        device.type = announce_msg[6]

        # Indexing bytes yields an int, while DEVICE_MAPPING is keyed by
        # strings, so the key has to be built with str(). Unknown type codes
        # fall back to "non_mystrom".
        device.hardware = DEVICE_MAPPING.get(str(device.type), "non_mystrom")

        # Parse status field (one flag per bit, lowest bit first)
        status = announce_msg[7]
        device.is_child = bool(status & 1)
        device.mystrom_registered = bool(status & 2)
        device.mystrom_online = bool(status & 4)
        device.restarted = bool(status & 8)
        return device

    def __init__(self, host, mac):
        """Initialize the discovered device with its host and MAC address."""
        self.host = host
        self.mac = mac

class DeviceRegistry:
    """In-memory collection of discovered devices, indexed by MAC address."""

    def __init__(self):
        """Start with an empty registry."""
        self.devices_by_mac = dict()

    def register(self, device):
        """Store ``device`` under its MAC, replacing any previous entry."""
        mac = device.mac
        self.devices_by_mac[mac] = device

    def devices(self):
        """Return a new list holding every registered device."""
        return [*self.devices_by_mac.values()]


class DiscoveryProtocol(asyncio.DatagramProtocol):
    """Datagram protocol that records announced devices in a registry."""

    def __init__(self, registry: DeviceRegistry):
        """Initialize the discovery protocol.

        Args:
            registry: Registry that receives every successfully parsed device.
        """
        super().__init__()
        self.registry = registry
        # Set once the endpoint is up, see connection_made().
        self.transport = None

    def connection_made(self, transport):
        """Remember the transport once the UDP listener is set up."""
        _LOGGER.debug("Starting up UDP listener")
        self.transport = transport

    def datagram_received(self, data, addr):
        """Parse a datagram and register the announced device.

        Datagrams that are not valid announcements are logged and ignored so
        that they do not raise inside the event loop callback.
        """
        try:
            device = DiscoveredDevice.create_from_announce_msg(addr, data)
        except RuntimeError as err:
            _LOGGER.debug("Ignoring datagram from %s: %s", addr, err)
            return
        self.registry.register(device)

    def connection_lost(self, exc: Optional[Exception]) -> None:
        """Log the shutdown when the connection is lost."""
        _LOGGER.debug("Shutting down UDP listener")
        super().connection_lost(exc)


async def discover_devices(timeout: int = 7) -> List[DiscoveredDevice]:
    """Discover local myStrom devices.

    Some myStrom devices report their presence every ~5 seconds in an UDP
    broadcast to port 7979.

    Args:
        timeout: Number of seconds to listen for announcements.

    Returns:
        The devices registered while listening, one entry per MAC address.
    """
    registry = DeviceRegistry()
    loop = asyncio.get_running_loop()
    transport, _protocol = await loop.create_datagram_endpoint(
        lambda: DiscoveryProtocol(registry), local_addr=("0.0.0.0", 7979)
    )
    try:
        # Server runs in the background, meanwhile wait until timeout expires
        await asyncio.sleep(timeout)
    finally:
        # Always shut the server down, even if the wait is cancelled, so the
        # socket bound to port 7979 is released.
        transport.close()

    devices = registry.devices()
    for device in devices:
        _LOGGER.debug(
            "Discovered myStrom device %s (%s) (MAC address: %s)",
            device.host,
            device.type,
            device.mac,
        )
    return devices