File: __init__.py

package info (click to toggle)
python-bellows 0.40.5-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 992 kB
  • sloc: python: 13,630; sh: 7; makefile: 4
file content (92 lines) | stat: -rw-r--r-- 3,283 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
"""EZSP Protocol version 5 protocol handler."""
from __future__ import annotations

import logging
from typing import AsyncGenerator

import voluptuous as vol

import bellows.config
import bellows.types as t

from . import commands, config
from ..v4 import EZSPv4

# Module-level logger, named after this module's import path.
LOGGER = logging.getLogger(__name__)


class EZSPv5(EZSPv4):
    """EZSP Version 5 Protocol version handler.

    Builds on the version 4 handler, swapping in the version 5 command table
    and configuration schemas, and providing the version 5 frame header
    (de)serialization plus helpers for transient link keys, address table
    reading and frame counter writing.
    """

    VERSION = 5
    COMMANDS = commands.COMMANDS
    SCHEMAS = {
        bellows.config.CONF_EZSP_CONFIG: vol.Schema(config.EZSP_SCHEMA),
        bellows.config.CONF_EZSP_POLICIES: vol.Schema(config.EZSP_POLICIES_SCH),
    }

    def _ezsp_frame_tx(self, name: str) -> bytes:
        """Serialize the frame header for the command called *name*.

        The header is five bytes long: the current sequence number, the three
        fixed bytes ``0x00, 0xFF, 0x00`` and the command id taken from the
        first element of the ``COMMANDS`` entry for *name*.
        """
        cmd_id = self.COMMANDS[name][0]
        # NOTE(review): the fixed bytes are presumably the frame control byte,
        # the legacy frame id marker and the extended frame control byte --
        # confirm against the EZSP specification.
        return bytes([self._seq, 0x00, 0xFF, 0x00, cmd_id])

    def _ezsp_frame_rx(self, data: bytes) -> tuple[int, int, bytes]:
        """Split a received frame into its header fields and payload.

        Mirrors the layout written by ``_ezsp_frame_tx``: byte 0 (where the
        sequence number is written), byte 4 (where the command id is written)
        and everything from offset 5 onwards are returned, in that order.

        Raises:
            IndexError: if *data* is shorter than five bytes.
        """
        return data[0], data[4], data[5:]

    async def add_transient_link_key(
        self, ieee: t.EUI64, key: t.KeyData
    ) -> t.sl_Status:
        """Add *key* as a transient link key for *ieee*.

        Returns the status reported by ``addTransientLinkKey`` converted to
        ``t.sl_Status``.
        """
        (status,) = await self.addTransientLinkKey(partner=ieee, transientKey=key)
        return t.sl_Status.from_ember_status(status)

    async def pre_permit(self, time_s: int) -> None:
        """Add pre-shared TC Link key.

        The key ``ZigBeeAlliance09`` is registered for the wildcard address
        ``FF:FF:FF:FF:FF:FF:FF:FF``. *time_s* is not used by this
        implementation and the resulting status is not inspected.
        """
        wild_card_ieee = t.EUI64.convert("FF:FF:FF:FF:FF:FF:FF:FF")
        tc_link_key = t.KeyData(b"ZigBeeAlliance09")
        await self.add_transient_link_key(wild_card_ieee, tc_link_key)

    async def read_address_table(self) -> AsyncGenerator[tuple[t.NWK, t.EUI64], None]:
        """Yield ``(nwk, eui64)`` for every populated address table entry.

        The table size is read from ``CONFIG_ADDRESS_TABLE_SIZE`` and each
        index is queried in turn. Entries whose NWK address is one of the
        ``EmberDistinguishedNodeId`` members, or whose EUI64 is all zeroes or
        all ones, are skipped.
        """
        # The returned status is not checked here; only the size is used.
        (_status, addr_table_size) = await self.getConfigurationValue(
            t.EzspConfigId.CONFIG_ADDRESS_TABLE_SIZE
        )

        # Loop invariants: build the sentinel values once, not per entry.
        invalid_nwks = t.EmberDistinguishedNodeId.__members__.values()
        blank_eui64s = (
            t.EUI64.convert("00:00:00:00:00:00:00:00"),
            t.EUI64.convert("FF:FF:FF:FF:FF:FF:FF:FF"),
        )

        for idx in range(addr_table_size):
            (nwk,) = await self.getAddressTableRemoteNodeId(addressTableIndex=idx)

            # Ignore invalid NWK entries
            if nwk in invalid_nwks:
                continue

            (eui64,) = await self.getAddressTableRemoteEui64(addressTableIndex=idx)

            # Ignore entries without a real IEEE address
            if eui64 in blank_eui64s:
                continue

            yield nwk, eui64

    async def _write_frame_counter(
        self, value_id: t.EzspValueId, frame_counter: t.uint32_t
    ) -> None:
        """Write *frame_counter* to the EZSP value identified by *value_id*.

        Raises:
            AssertionError: if the current network state is not ``NO_NETWORK``
                or if ``setValue`` does not report ``sl_Status.OK``. The
                checks are explicit ``raise`` statements rather than
                ``assert`` so they still run when Python is started with
                ``-O``.
        """
        # Frame counters can only be set *before* we have joined a network
        (state,) = await self.networkState()
        if state != t.EmberNetworkStatus.NO_NETWORK:
            raise AssertionError(
                f"Frame counter can only be written without a network: {state!r}"
            )

        (status,) = await self.setValue(
            valueId=value_id,
            value=t.uint32_t(frame_counter).serialize(),
        )
        if t.sl_Status.from_ember_status(status) != t.sl_Status.OK:
            raise AssertionError(f"Failed to write frame counter: {status!r}")

    async def write_nwk_frame_counter(self, frame_counter: t.uint32_t) -> None:
        """Set the NWK frame counter (``VALUE_NWK_FRAME_COUNTER``).

        Raises:
            AssertionError: if a network is already formed/joined or the
                write is rejected.
        """
        await self._write_frame_counter(
            t.EzspValueId.VALUE_NWK_FRAME_COUNTER, frame_counter
        )

    async def write_aps_frame_counter(self, frame_counter: t.uint32_t) -> None:
        """Set the APS frame counter (``VALUE_APS_FRAME_COUNTER``).

        Raises:
            AssertionError: if a network is already formed/joined or the
                write is rejected.
        """
        await self._write_frame_counter(
            t.EzspValueId.VALUE_APS_FRAME_COUNTER, frame_counter
        )