File: device.py

package info (click to toggle)
devolo-plc-api 1.5.1%2Bgit20260208.5d3989e-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 504 kB
  • sloc: python: 1,616; makefile: 6
file content (286 lines) | stat: -rw-r--r-- 11,681 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
"""Representation of your devolo device."""

from __future__ import annotations

import asyncio
import logging
from contextlib import suppress
from datetime import date
from ipaddress import ip_address, ip_network
from struct import unpack_from
from typing import TYPE_CHECKING, cast

from httpx import AsyncClient
from ifaddr import get_adapters
from zeroconf import DNSQuestionType, ServiceInfo, ServiceStateChange, Zeroconf
from zeroconf.asyncio import AsyncServiceBrowser, AsyncServiceInfo, AsyncZeroconf

from .device_api import SERVICE_TYPE as DEVICEAPI, DeviceApi
from .exceptions import DeviceNotFound
from .plcnet_api import DEVICES_WITHOUT_PLCNET, SERVICE_TYPE as PLCNETAPI, PlcNetApi
from .zeroconf import ZeroconfServiceInfo

if TYPE_CHECKING:
    from types import TracebackType

    from typing_extensions import Self


class Device:
    """
    Representing object for your devolo PLC device. It stores all properties and functionalities discovered during setup.

    :param ip: IP address of the device to communicate with.
    :param plcnetapi: Reuse externally gathered data for the plcnet API
    :param deviceapi: Reuse externally gathered data for the device API
    :param zeroconf_instance: Zeroconf instance to be potentially reused.
    """

    MDNS_TIMEOUT = 300

    def __init__(
        self,
        ip: str,
        zeroconf_instance: AsyncZeroconf | Zeroconf | None = None,
    ) -> None:
        """Initialize the device."""
        self.ip = ip
        self.mac = ""
        self.mt_number = "0"
        self.product = ""
        self.technology = ""
        self.serial_number = "0"

        self.device: DeviceApi | None = None
        self.plcnet: PlcNetApi | None = None

        self._background_tasks: set[asyncio.Task] = set()
        self._browser: AsyncServiceBrowser | None = None
        self._connected = False
        self._info: dict[str, ZeroconfServiceInfo] = {PLCNETAPI: ZeroconfServiceInfo(), DEVICEAPI: ZeroconfServiceInfo()}
        self._logger = logging.getLogger(f"{self.__class__.__module__}.{self.__class__.__name__}")
        self._multicast = False
        self._password = ""
        self._session_instance: AsyncClient | None = None
        self._zeroconf_instance = zeroconf_instance
        logging.captureWarnings(capture=True)

        self._session: AsyncClient
        self._zeroconf: AsyncZeroconf

    def __del__(self) -> None:
        """Warn user, if the connection was not properly closed."""
        if self._connected and self._session_instance is None:
            self._logger.warning("Please disconnect properly from the device.")

    async def __aenter__(self) -> Self:
        """Connect to a device asynchronously when entering a context manager."""
        await self.async_connect()
        return self

    async def __aexit__(
        self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
    ) -> None:
        """Disconnect to a device asynchronously when entering a context manager."""
        await self.async_disconnect()

    def __enter__(self) -> Self:
        """Connect to a device synchronously when leaving a context manager."""
        self.connect()
        return self

    def __exit__(
        self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
    ) -> None:
        """Disconnect to a device synchronously when leaving a context manager."""
        self.disconnect()

    @property
    def firmware_date(self) -> date:
        """Date the firmware was built."""
        return date.fromisoformat(self._info[DEVICEAPI].properties.get("FirmwareDate", "1970-01-01")[:10])

    @property
    def firmware_version(self) -> str:
        """Firmware version currently installed."""
        return self._info[DEVICEAPI].properties.get("FirmwareVersion", "")

    @property
    def hostname(self) -> str:
        """mDNS hostname of the device."""  # noqa: D403
        return self._info[DEVICEAPI].hostname

    @property
    def password(self) -> str:
        """The currently set device password."""
        return self._password

    @password.setter
    def password(self, password: str) -> None:
        """Change the currently set device password."""
        self._password = password
        if self.device:
            self.device.password = password
        if self.plcnet:
            self.plcnet.password = password

    async def async_connect(self, session_instance: AsyncClient | None = None) -> None:
        """
        Connect to a device asynchronous.

        :param: session_instance: Session client instance to be potentially reused.
        """
        self._session_instance = session_instance
        self._session = self._session_instance or AsyncClient()
        if not self._zeroconf_instance:
            self._zeroconf = AsyncZeroconf(interfaces=await self._get_relevant_interfaces())
        elif isinstance(self._zeroconf_instance, Zeroconf):
            self._zeroconf = AsyncZeroconf(zc=self._zeroconf_instance)
        else:
            self._zeroconf = self._zeroconf_instance
        await self._get_zeroconf_info()
        if not self._info[DEVICEAPI].properties and not self._info[PLCNETAPI].properties:
            await self._retry_zeroconf_info()
        if not self.device and not self.plcnet:
            raise DeviceNotFound(self.ip)
        self._connected = True

    def connect(self) -> None:
        """Connect to a device synchronous."""
        asyncio.run(self.async_connect())

    async def async_disconnect(self) -> None:
        """Disconnect from a device asynchronous."""
        if self._connected:
            if self._browser:
                await self._browser.async_cancel()
            if not self._zeroconf_instance:
                await self._zeroconf.async_close()
            if not self._session_instance:
                await self._session.aclose()
            self._connected = False

    def disconnect(self) -> None:
        """Disconnect from a device synchronous."""
        asyncio.run(self.async_disconnect())

    async def _get_relevant_interfaces(self) -> list[str]:
        """Get the IP address of the relevant interface to reduce traffic."""
        interface: list[str] = []
        for adapter in get_adapters():
            interface.extend(
                cast("str", ip.ip)
                for ip in adapter.ips
                if ip.is_IPv4 and ip_address(self.ip) in ip_network(f"{ip.ip}/{ip.network_prefix}", strict=False)
            )
            interface.extend(
                cast("str", ip.ip[0])
                for ip in adapter.ips
                if ip.is_IPv6 and ip_address(self.ip) in ip_network(f"{ip.ip[0]}/{ip.network_prefix}", strict=False)
            )
        return interface

    async def _get_device_info(self) -> None:
        """Get information from the devolo Device API."""
        service_type = DEVICEAPI
        if self._info[service_type].properties:
            self.mt_number = self._info[service_type].properties.get("MT", "0")
            self.product = self._info[service_type].properties.get("Product", "")
            self.serial_number = self._info[service_type].properties["SN"]
            self.device = DeviceApi(
                ip=str(ip_address(self._info[service_type].address)),
                session=self._session,
                info=self._info[service_type],
            )
            self.device.password = self.password

    async def _get_plcnet_info(self) -> None:
        """Get information from the devolo PlcNet API."""
        service_type = PLCNETAPI
        if self._info[service_type].properties:
            self.mac = self._info[service_type].properties["PlcMacAddress"]
            self.technology = self._info[service_type].properties.get("PlcTechnology", "")
            self.plcnet = PlcNetApi(
                ip=str(ip_address(self._info[service_type].address)),
                session=self._session,
                info=self._info[service_type],
            )
            self.plcnet.password = self.password

    async def _get_zeroconf_info(self) -> None:
        """Browse for the desired mDNS service types and query them."""
        service_types = [DEVICEAPI, PLCNETAPI]
        counter = 0
        self._logger.debug("Browsing for %s", service_types)
        addr = None if self._multicast else self.ip
        question_type = DNSQuestionType.QM if self._multicast else DNSQuestionType.QU
        if self._browser:
            await self._browser.async_cancel()
            self._browser = None
        self._browser = AsyncServiceBrowser(
            zeroconf=self._zeroconf.zeroconf,
            type_=service_types,
            handlers=[self._state_change],
            addr=addr,
            question_type=question_type,
        )
        while (
            not self._info[DEVICEAPI].properties
            or (not self._info[PLCNETAPI].properties and self.mt_number not in DEVICES_WITHOUT_PLCNET)
        ) and counter < self.MDNS_TIMEOUT:
            counter += 1
            await asyncio.sleep(0.01)

    async def _retry_zeroconf_info(self) -> None:
        """Retry getting the zeroconf info using multicast."""
        self._logger.debug("Having trouble getting results via unicast messages. Switching to multicast for this device.")
        self._multicast = True
        await self._get_zeroconf_info()

    def _state_change(self, zeroconf: Zeroconf, service_type: str, name: str, state_change: ServiceStateChange) -> None:
        """Evaluate the query result."""
        if state_change == ServiceStateChange.Removed:
            return
        task = asyncio.create_task(self._get_service_info(zeroconf, service_type, name))
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.remove)

    async def _get_service_info(self, zeroconf: Zeroconf, service_type: str, name: str) -> None:
        """Get service information, if IP matches."""
        service_info = AsyncServiceInfo(service_type, name)
        update = {
            DEVICEAPI: self._get_device_info,
            PLCNETAPI: self._get_plcnet_info,
        }
        with suppress(RuntimeError):
            if not self._multicast:
                await service_info.async_request(zeroconf, timeout=1000, question_type=DNSQuestionType.QU, addr=self.ip)
            else:
                await service_info.async_request(zeroconf, timeout=1000, question_type=DNSQuestionType.QM)

        if not service_info.addresses or self.ip not in service_info.parsed_addresses():
            return  # No need to continue, if there are no relevant service information

        self._logger.debug("Updating service info of %s for %s", service_type, service_info.server_key)
        if info := self.info_from_service(service_info):
            self._info[service_type] = info
            await update[service_type]()

    @staticmethod
    def info_from_service(service_info: ServiceInfo) -> ZeroconfServiceInfo | None:
        """Return prepared info from mDNS entries."""
        properties = {}
        total_length = len(service_info.text)
        offset = 0
        while offset < total_length:
            (parsed_length,) = unpack_from("!B", service_info.text, offset)
            key_value = service_info.text[offset + 1 : offset + 1 + parsed_length].decode("UTF-8").split("=")
            properties[key_value[0]] = key_value[1]
            offset += parsed_length + 1

        return ZeroconfServiceInfo(
            address=service_info.addresses[0],
            hostname=service_info.server or "",
            port=service_info.port,
            properties=properties,
        )