File: discovery.py

package info (click to toggle)
python-mystrom 2.5.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 292 kB
  • sloc: python: 901; makefile: 2
file content (115 lines) | stat: -rw-r--r-- 3,481 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
"""Support for discovering myStrom devices."""

import asyncio
import logging
from typing import List, Optional

from .device_types import DEVICE_MAPPING_NUMERIC

_LOGGER = logging.getLogger(__name__)


class DiscoveredDevice(object):
    """Representation of discovered device."""

    mac: str
    type: int
    is_child: bool
    mystrom_registered: bool
    mystrom_online: bool
    restarted: bool

    @staticmethod
    def create_from_announce_msg(raw_addr, announce_msg):
        """Create announce message."""
        _LOGGER.debug("Received announce message '%s' from %s ", announce_msg, raw_addr)
        if len(announce_msg) != 8:
            raise RuntimeError("Unexpected announcement, '%s'" % announce_msg)

        device = DiscoveredDevice(host=raw_addr[0], mac=announce_msg[0:6].hex(":"))
        device.type = announce_msg[6]

        if device.type == "102":
            device.hardware = DEVICE_MAPPING_NUMERIC[int(announce_msg[6])]
        else:
            device.hardware = "non_mystrom"
        status = announce_msg[7]

        # Parse status field
        device.is_child = status & 1 != 0
        device.mystrom_registered = status & 2 != 0
        device.mystrom_online = status & 4 != 0
        device.restarted = status & 8 != 0
        return device

    def __init__(self, host, mac):
        """Initialize the discovery."""
        self.host = host
        self.mac = mac


class DeviceRegistry(object):
    """Representation of the device registry."""

    def __init__(self):
        """Initialize the device registry."""
        self.devices_by_mac = {}

    def register(self, device):
        """Register a device."""
        self.devices_by_mac[device.mac] = device

    def devices(self):
        """Get all present devices."""
        return list(self.devices_by_mac.values())


class DiscoveryProtocol(asyncio.DatagramProtocol):
    """Representation of the discovery protocol."""

    def __init__(self, registry: DeviceRegistry):
        """ "Initialize the discovery protocol."""
        super().__init__()
        self.registry = registry

    def connection_made(self, transport):
        """Create an UDP listener."""
        _LOGGER.debug("Starting up UDP listener")
        self.transport = transport

    def datagram_received(self, data, addr):
        """Handle a datagram."""
        device = DiscoveredDevice.create_from_announce_msg(addr, data)
        self.registry.register(device)

    def connection_lost(self, exc: Optional[Exception]) -> None:
        """Stop if connection is lost."""
        _LOGGER.debug("Shutting down UDP listener")
        super().connection_lost(exc)


async def discover_devices(timeout: int = 7) -> List[DiscoveredDevice]:
    """Discover local myStrom devices.

    Some myStrom devices report their presence every ~5 seconds in an UDP
    broadcast to port 7979.
    """
    registry = DeviceRegistry()
    loop = asyncio.get_event_loop()
    (transport, protocol) = await loop.create_datagram_endpoint(
        lambda: DiscoveryProtocol(registry), local_addr=("0.0.0.0", 7979)
    )
    # Server runs in the background, meanwhile wait until timeout expires
    await asyncio.sleep(timeout)
    # Shutdown server
    transport.close()

    devices = registry.devices()
    for device in devices:
        _LOGGER.debug(
            "Discovered myStrom device %s (%s) (MAC addresse: %s)",
            device.host,
            device.type,
            device.mac,
        )
    return devices