File: application.py

package info (click to toggle)
zigpy-zigate 0.13.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 288 kB
  • sloc: python: 2,167; makefile: 3
file content (346 lines) | stat: -rw-r--r-- 12,376 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
from __future__ import annotations

import asyncio
import importlib.metadata
import logging
from typing import Any

import zigpy.application
import zigpy.config
import zigpy.device
import zigpy.exceptions
import zigpy.types
import zigpy.util
import zigpy.zdo

from zigpy_zigate import common as c, types as t
from zigpy_zigate.api import (
    PDM_EVENT,
    CommandNotSupportedError,
    NoResponseError,
    ResponseId,
    ZiGate,
)

# Installed version of the "zigpy-zigate" distribution, read from package metadata
# at import time; embedded in the ``source`` field of the loaded network info.
LIB_VERSION = importlib.metadata.version("zigpy-zigate")
# Module-level logger shared by everything in this file.
LOGGER = logging.getLogger(__name__)


class ControllerApplication(zigpy.application.ControllerApplication):
    def __init__(self, config: dict[str, Any]):
        """Create the controller application with no ZiGate attached yet.

        ``_api`` and ``version`` start out empty and are filled in by
        ``connect``.
        """
        super().__init__(config)

        # Firmware version string, "<major>.<minor>" in hex once connected.
        self.version: str = ""
        self._api: ZiGate | None = None

        # Futures for sent packets, keyed by the tag returned for the request.
        self._pending = {}
        # Only referenced by the disabled two-stage pairing code in this file.
        self._pending_join = []

    async def _watchdog_feed(self):
        """Watchdog hook: issue a ``set_time`` command to the ZiGate."""
        api = self._api
        await api.set_time()

    async def connect(self):
        """Open the ZiGate connection and read its firmware version.

        Puts the ZiGate into raw mode, sets its time and queries the firmware
        version. On success the API handle is stored in ``self._api`` and the
        version in ``self.version`` as ``"<major>.<minor>"`` (unpadded hex).

        Raises:
            Exception: Whatever the setup commands raised; the freshly opened
                API is disconnected before the exception propagates.
        """
        api = await ZiGate.new(self._config[zigpy.config.CONF_DEVICE], self)

        try:
            await api.set_raw_mode()
            await api.set_time()

            (_, version), _ = await api.version()
        except Exception:
            # Do not leak the open connection when setup fails half-way.
            await api.disconnect()
            raise

        major, minor = version.to_bytes(2, "big")
        self.version = f"{major:x}.{minor:x}"

        self._api = api

        # Compare the numeric components rather than the formatted string: the
        # string is not zero-padded, so e.g. firmware 0x030F becomes "3.f", which
        # sorts *after* "3.21" lexicographically and would never be flagged.
        if (major, minor) < (0x03, 0x21):
            LOGGER.error(
                "Old ZiGate firmware detected, you should upgrade to 3.21 or newer"
            )

    async def disconnect(self):
        """Reset the ZiGate (best effort) and close the connection.

        Does nothing when no API is attached. A failing reset is only logged;
        the connection is closed and ``self._api`` cleared regardless.
        """
        # TODO: how do you stop the network? Is it possible?
        if self._api is None:
            return

        try:
            await self._api.reset(wait=False)
        except Exception as exc:
            LOGGER.warning("Failed to reset before disconnect: %s", exc)
        finally:
            await self._api.disconnect()
            self._api = None

    async def start_network(self):
        """Register this node as a zigpy device and schedule its initialization."""
        # TODO: how do you start the network? Is it always automatically started?
        own_node = self.state.node_info
        own_device = self.add_device(ieee=own_node.ieee, nwk=own_node.nwk)
        await own_device.schedule_initialize()

    async def load_network_info(self, *, load_devices: bool = False):
        """Fill ``self.state`` from the network state reported by the ZiGate.

        Args:
            load_devices: When true, also fetch the ZiGate's device list and
                record every entry whose ``power_source`` is 0 as a child,
                together with its NWK address.

        Raises:
            zigpy.exceptions.NetworkNotFormed: If the reported state is empty,
                its extended PAN ID field is 0, or its NWK field is 0xFFFF.
        """
        network_state, _ = await self._api.get_network_state()

        # Fields used below, by index:
        #   0 = NWK address, 1 = IEEE address, 2 = PAN ID,
        #   3 = extended PAN ID, 4 = channel
        if not network_state or network_state[3] == 0 or network_state[0] == 0xFFFF:
            raise zigpy.exceptions.NetworkNotFormed()

        device_path = self._config[zigpy.config.CONF_DEVICE][
            zigpy.config.CONF_DEVICE_PATH
        ]

        # Probe order matters: the first matching check decides the model name.
        if c.is_zigate_wifi(device_path):
            model = "ZiGate WiFi"
        elif await c.async_is_pizigate(device_path):
            model = "PiZiGate"
        elif await c.async_is_zigate_din(device_path):
            model = "ZiGate USB-DIN"
        else:
            model = "ZiGate USB-TTL"

        self.state.node_info = zigpy.state.NodeInfo(
            nwk=zigpy.types.NWK(network_state[0]),
            ieee=zigpy.types.EUI64(network_state[1]),
            logical_type=zigpy.zdo.types.LogicalType.Coordinator,
            model=model,
            manufacturer="ZiGate",
            version=self.version,
        )

        # Round-trip the integer through its 64-bit serialization to obtain an
        # ExtendedPanId instance.
        raw_epid = zigpy.types.uint64_t(network_state[3]).serialize()
        extended_pan_id, _ = zigpy.types.ExtendedPanId.deserialize(raw_epid)

        try:
            key_bytes = await self._api.get_network_key()
            network_key = zigpy.state.Key(key=key_bytes)
        except CommandNotSupportedError:
            # Fall back to a default-constructed key when the command is
            # not supported.
            network_key = zigpy.state.Key()

        channel = network_state[4]
        loaded_info = zigpy.state.NetworkInfo(
            source=f"zigpy-zigate@{LIB_VERSION}",
            extended_pan_id=extended_pan_id,
            pan_id=zigpy.types.PanId(network_state[2]),
            nwk_update_id=0,
            nwk_manager_id=zigpy.types.NWK(0x0000),
            channel=channel,
            channel_mask=zigpy.types.Channels.from_channel_list([channel]),
            security_level=5,
            network_key=network_key,
            # tc_link_key=zigpy.state.Key(),
            children=[],
            key_table=[],
            nwk_addresses={},
            stack_specific={},
            metadata={
                "zigate": {
                    "version": self.version,
                }
            },
        )
        self.state.network_info = loaded_info

        self.state.network_info.tc_link_key.partner_ieee = self.state.node_info.ieee

        if load_devices:
            for entry in await self._api.get_devices_list():
                if entry.power_source != 0:  # only battery-powered devices
                    continue

                child_ieee = zigpy.types.EUI64(entry.ieee_addr)
                self.state.network_info.children.append(child_ieee)
                self.state.network_info.nwk_addresses[child_ieee] = zigpy.types.NWK(
                    entry.short_addr
                )

    async def reset_network_info(self):
        """Erase the ZiGate's persistent data."""
        api = self._api
        await api.erase_persistent_data()

    async def write_network_info(self, *, network_info, node_info):
        """Erase the ZiGate's settings and form a network from ``network_info``.

        Only the channel and the extended PAN ID are written; a warning is
        logged that the PAN ID cannot be set. ``node_info`` is not used.

        After starting the network, the resulting state is polled up to three
        times, one second apart, until ``load_network_info`` succeeds.

        Raises:
            zigpy.exceptions.FormationFailure: If starting the network returns
                an unexpected status, or the network still is not formed after
                the last polling attempt.
        """
        LOGGER.warning("Setting the pan_id is not supported by ZiGate")

        await self.reset_network_info()
        await self._api.set_channel(network_info.channel)

        # Convert the ExtendedPanId into a plain 64-bit integer via its
        # serialized form.
        epid, _ = zigpy.types.uint64_t.deserialize(
            network_info.extended_pan_id.serialize()
        )
        await self._api.set_extended_panid(epid)

        network_formed, _ = await self._api.start_network()

        if network_formed[0] not in (
            t.Status.Success,
            t.Status.IncorrectParams,
            t.Status.Busy,
        ):
            raise zigpy.exceptions.FormationFailure(
                f"Unexpected error starting network: {network_formed!r}"
            )

        LOGGER.warning("Starting network got status %s, wait...", network_formed[0])

        max_attempts = 3
        for attempt in range(max_attempts):
            await asyncio.sleep(1)

            try:
                await self.load_network_info()
            except zigpy.exceptions.NetworkNotFormed as e:
                if attempt == max_attempts - 1:
                    raise zigpy.exceptions.FormationFailure() from e
            else:
                # The network is up: stop polling instead of sleeping and
                # reloading for the remaining attempts.
                break

    async def permit_with_link_key(self, node, link_key, time_s=60):
        """Log that link-key joins are unsupported; no request is sent."""
        LOGGER.warning(
            "ZiGate does not support joins with link keys",
        )

    async def _move_network_to_channel(
        self, new_channel: int, *, new_nwk_update_id: int
    ) -> None:
        """Switch the ZiGate to ``new_channel``.

        ``new_nwk_update_id`` is accepted but not used here.
        """
        api = self._api
        await api.set_channel(new_channel)

    async def energy_scan(
        self, channels: zigpy.types.Channels, duration_exp: int, count: int
    ) -> dict[int, float]:
        """Return a placeholder scan result of 0 for every requested channel.

        No scan is performed; a warning is logged instead. ``duration_exp``
        and ``count`` are ignored.
        """
        LOGGER.warning("Coordinator does not support energy scanning")

        return dict.fromkeys(channels, 0)

    async def force_remove(self, dev):
        """Ask the ZiGate to remove ``dev``, passing this node's IEEE first."""
        own_ieee = self.state.node_info.ieee
        await self._api.remove_device(own_ieee, dev.ieee)

    async def add_endpoint(self, descriptor):
        """Do nothing: ZiGate does not support adding new endpoints."""
        return None

    def zigate_callback_handler(self, msg, response, lqi):
        """React to a message from the ZiGate according to its response id.

        Leave indications and device announces are forwarded to
        ``handle_leave`` / ``handle_join``, data indications are turned into a
        ``ZigbeePacket`` and passed to ``packet_received``, failed APS data
        confirms go to ``_handle_frame_failure``; the remaining handled ids are
        only logged. Any other id is ignored after the initial debug log.
        """
        LOGGER.debug("zigate_callback_handler %s %s", msg, response)

        if msg == ResponseId.LEAVE_INDICATION:
            # The NWK address is always passed as 0 here.
            nwk = 0
            ieee = zigpy.types.EUI64(response[0])
            self.handle_leave(nwk, ieee)
            return

        if msg == ResponseId.DEVICE_ANNOUNCE:
            nwk = response[0]
            ieee = zigpy.types.EUI64(response[1])
            parent_nwk = 0
            self.handle_join(nwk, ieee, parent_nwk)
            # Temporary disable two stages pairing due to firmware bug
            # rejoin = response[3]
            # if nwk in self._pending_join or rejoin:
            #     LOGGER.debug('Finish pairing {} (2nd device announce)'.format(nwk))
            #     if nwk in self._pending_join:
            #         self._pending_join.remove(nwk)
            #     self.handle_join(nwk, ieee, parent_nwk)
            # else:
            #     LOGGER.debug('Start pairing {} (1st device announce)'.format(nwk))
            #     self._pending_join.append(nwk)
            return

        if msg == ResponseId.DATA_INDICATION:
            # The leading status field is unpacked but not used.
            (
                _status,
                profile_id,
                cluster_id,
                src_ep,
                dst_ep,
                src,
                dst,
                payload,
            ) = response

            self.packet_received(
                zigpy.types.ZigbeePacket(
                    src=src.to_zigpy_type()[0],
                    src_ep=src_ep,
                    dst=dst.to_zigpy_type()[0],
                    dst_ep=dst_ep,
                    profile_id=profile_id,
                    cluster_id=cluster_id,
                    data=zigpy.types.SerializableBytes(payload),
                    lqi=lqi,
                    rssi=None,
                )
            )
            return

        if msg == ResponseId.ACK_DATA:
            LOGGER.debug("ACK Data received %s %s", response[4], response[0])
            # disabled because of https://github.com/fairecasoimeme/ZiGate/issues/324
            # self._handle_frame_failure(response[4], response[0])
            return

        if msg == ResponseId.APS_DATA_CONFIRM:
            LOGGER.debug(
                "ZPS Event APS data confirm, message routed to %s %s",
                response[3],
                response[0],
            )
            return

        if msg == ResponseId.PDM_EVENT:
            try:
                event_name = PDM_EVENT(response[0]).name
            except ValueError:
                # Value not present in the PDM_EVENT enumeration.
                event_name = "Unknown event"
            LOGGER.debug(
                "PDM Event %s %s, record %s", response[0], event_name, response[1]
            )
            return

        if msg == ResponseId.APS_DATA_CONFIRM_FAILED:
            LOGGER.debug("APS Data confirm Fail %s %s", response[4], response[0])
            self._handle_frame_failure(response[4], response[0])
            return

        if msg == ResponseId.EXTENDED_ERROR:
            LOGGER.warning("Extended error code %s", response[0])

    def _handle_frame_failure(self, message_tag, status):
        """Resolve the pending future for ``message_tag`` with ``status``.

        The future is removed from ``self._pending``. An unknown tag is logged
        as a warning; a future that cannot accept a result any more is logged
        at debug level.
        """
        try:
            pending_future = self._pending.pop(message_tag)
        except KeyError:
            LOGGER.warning("Unexpected message send failure")
            return

        try:
            pending_future.set_result(status)
        except asyncio.futures.InvalidStateError as exc:
            LOGGER.debug(
                "Invalid state on future - probably duplicate response: %s", exc
            )

    async def send_packet(self, packet):
        """Send a zigpy packet through the ZiGate as a raw APS data request.

        ACKs are requested when the packet asks for them, and always on
        firmware 3.1d and below. On success a future is stored in
        ``self._pending`` under the tag returned by the ZiGate.

        Raises:
            zigpy.exceptions.DeliveryError: If the ZiGate does not respond, or
                answers with a non-success status (``InvalidParameter`` is
                tolerated on firmware 3.1d and below).
        """
        LOGGER.debug("Sending packet %r", packet)

        # ``self.version`` is "<major>.<minor>" in unpadded hex, so comparing it as
        # a string misorders firmwares whose minor is below 0x10 (0x030F is stored
        # as "3.f", which sorts after "3.1d"). Compare the numeric parts instead.
        try:
            major, minor = (int(part, 16) for part in self.version.split("."))
        except ValueError:
            # Unknown or unparsable version (e.g. still ""): stay conservative and
            # treat it as old firmware, as the string comparison did for "".
            old_firmware = True
        else:
            old_firmware = (major, minor) <= (0x03, 0x1D)

        # Firmwares 3.1d and below allow a couple of _NO_ACK packets to send but all
        # subsequent ones will fail. ACKs must be enabled.
        ack = zigpy.types.TransmitOptions.ACK in packet.tx_options or old_firmware

        try:
            (status, tsn, _packet_type, _), _ = await self._api.raw_aps_data_request(
                addr=packet.dst.address,
                src_ep=(
                    1 if packet.dst_ep is None or packet.dst_ep > 0 else 0
                ),  # ZiGate only support endpoint 1
                dst_ep=packet.dst_ep or 0,
                profile=packet.profile_id,
                cluster=packet.cluster_id,
                payload=packet.data.serialize(),
                addr_mode=t.ZIGPY_TO_ZIGATE_ADDR_MODE[packet.dst.addr_mode, ack],
                radius=packet.radius,
            )
        except NoResponseError as exc:
            raise zigpy.exceptions.DeliveryError(
                "ZiGate did not respond to command"
            ) from exc

        # Register the future first so that a failure below also clears any
        # earlier entry stored under the same tag.
        self._pending[tsn] = asyncio.get_running_loop().create_future()

        if status != t.Status.Success:
            self._pending.pop(tsn)

            # Firmwares 3.1d and below fail to send packets on every request
            if status == t.Status.InvalidParameter and old_firmware:
                pass
            else:
                raise zigpy.exceptions.DeliveryError(
                    f"Failed to send packet: {status!r}", status=status
                )

        # disabled because of https://github.com/fairecasoimeme/ZiGate/issues/324
        # try:
        #     v = await asyncio.wait_for(send_fut, 120)
        # except asyncio.TimeoutError:
        #     return 1, "timeout waiting for message %s send ACK" % (sequence, )
        # finally:
        #     self._pending.pop(tsn)
        # return v, "Message sent"

    async def permit_ncp(self, time_s=60):
        """Send a permit-join request to the ZiGate with ``time_s``.

        ``time_s`` must lie in the range 0..254. If the reply's first field is
        not a success status, the ZiGate is reset.
        """
        assert 0 <= time_s <= 254

        reply, _ = await self._api.permit_join(time_s)
        result = reply[0]
        if result != t.Status.Success:
            await self._api.reset()