File: types.py

package info (click to toggle)
zigpy-deconz 0.25.5-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 360 kB
  • sloc: python: 3,911; makefile: 5
file content (238 lines) | stat: -rw-r--r-- 5,864 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
"""Data types module."""


import zigpy.types as zigpy_t
from zigpy.types import (  # noqa: F401
    EUI64,
    NWK,
    ExtendedPanId,
    IntStruct,
    LongOctetString,
    LVBytes,
    LVList,
    PanId,
    Struct,
    bitmap3,
    bitmap5,
    bitmap6,
    bitmap8,
    bitmap16,
    enum2,
    enum3,
    enum8,
    int8s,
    uint8_t,
    uint16_t,
    uint32_t,
    uint64_t,
)


def serialize_dict(data, schema):
    chunks = []

    for key in schema:
        value = data[key]
        if value is None:
            break

        if not isinstance(value, schema[key]):
            value = schema[key](value)

        chunks.append(value.serialize())

    return b"".join(chunks)


def deserialize_dict(data, schema):
    result = {}
    for name, type_ in schema.items():
        try:
            result[name], data = type_.deserialize(data)
        except ValueError:
            if data:
                raise

            result[name] = None
    return result, data


def list_replace(lst: list, old: object, new: object) -> list:
    """Replace all occurrences of `old` with `new` in `lst`."""
    return [new if x == old else x for x in lst]


class Bytes(bytes):
    def serialize(self):
        return self

    @classmethod
    def deserialize(cls, data):
        return cls(data), b""


class AddressMode(enum8):
    # Address modes used in deconz protocol

    GROUP = 0x01
    NWK = 0x02
    IEEE = 0x03
    NWK_AND_IEEE = 0x04


class DeconzSendDataFlags(bitmap8):
    NONE = 0x00
    NODE_ID = 0x01
    RELAYS = 0x02


class DeconzTransmitOptions(bitmap8):
    NONE = 0x00
    SECURITY_ENABLED = 0x01
    USE_NWK_KEY_SECURITY = 0x02
    USE_APS_ACKS = 0x04
    ALLOW_FRAGMENTATION = 0x08


class NWKList(LVList):
    _length_type = uint8_t
    _item_type = NWK


ZIGPY_ADDR_MODE_MAPPING = {
    zigpy_t.AddrMode.NWK: AddressMode.NWK,
    zigpy_t.AddrMode.IEEE: AddressMode.IEEE,
    zigpy_t.AddrMode.Group: AddressMode.GROUP,
    zigpy_t.AddrMode.Broadcast: AddressMode.NWK,
}


ZIGPY_ADDR_TYPE_MAPPING = {
    zigpy_t.AddrMode.NWK: NWK,
    zigpy_t.AddrMode.IEEE: EUI64,
    zigpy_t.AddrMode.Group: NWK,
    zigpy_t.AddrMode.Broadcast: NWK,
}


ZIGPY_ADDR_MODE_REVERSE_MAPPING = {
    AddressMode.NWK: zigpy_t.AddrMode.NWK,
    AddressMode.IEEE: zigpy_t.AddrMode.IEEE,
    AddressMode.GROUP: zigpy_t.AddrMode.Group,
    AddressMode.NWK_AND_IEEE: zigpy_t.AddrMode.IEEE,
}


ZIGPY_ADDR_TYPE_REVERSE_MAPPING = {
    AddressMode.NWK: zigpy_t.NWK,
    AddressMode.IEEE: zigpy_t.EUI64,
    AddressMode.GROUP: zigpy_t.Group,
    AddressMode.NWK_AND_IEEE: zigpy_t.NWK,
}


class DeconzAddress(Struct):
    address_mode: AddressMode
    address: EUI64
    ieee: EUI64

    @classmethod
    def deserialize(cls, data):
        r = cls()
        mode, data = AddressMode.deserialize(data)
        r.address_mode = mode
        if mode in [AddressMode.GROUP, AddressMode.NWK, AddressMode.NWK_AND_IEEE]:
            r.address, data = NWK.deserialize(data)
        elif mode == AddressMode.IEEE:
            r.address, data = EUI64.deserialize(data)
        if mode == AddressMode.NWK_AND_IEEE:
            r.ieee, data = EUI64.deserialize(data)
        return r, data

    def serialize(self):
        r = self.address_mode.serialize() + self.address.serialize()
        if self.address_mode == AddressMode.NWK_AND_IEEE:
            r += self.ieee.serialize()
        return r

    def as_zigpy_type(self):
        addr_mode = ZIGPY_ADDR_MODE_REVERSE_MAPPING[self.address_mode]
        address = ZIGPY_ADDR_TYPE_REVERSE_MAPPING[self.address_mode](self.address)

        if self.address_mode == AddressMode.NWK and self.address > 0xFFF7:
            addr_mode = zigpy_t.AddrMode.Broadcast
            address = zigpy_t.BroadcastAddress(self.address)
        elif self.address_mode == AddressMode.NWK_AND_IEEE:
            address = zigpy_t.EUI64(self.ieee)

        return zigpy_t.AddrModeAddress(
            addr_mode=addr_mode,
            address=address,
        )

    @classmethod
    def from_zigpy_type(cls, addr):
        instance = cls()
        instance.address_mode = ZIGPY_ADDR_MODE_MAPPING[addr.addr_mode]
        instance.address = ZIGPY_ADDR_TYPE_MAPPING[addr.addr_mode](addr.address)

        return instance


class DeconzAddressEndpoint(Struct):
    address_mode: AddressMode
    address: EUI64
    ieee: EUI64
    endpoint: uint8_t

    @classmethod
    def deserialize(cls, data):
        r, data = DeconzAddress.deserialize.__func__(cls, data)

        if r.address_mode in (
            AddressMode.NWK,
            AddressMode.IEEE,
            AddressMode.NWK_AND_IEEE,
        ):
            r.endpoint, data = uint8_t.deserialize(data)
        else:
            r.endpoint = None

        return r, data

    def serialize(self):
        r = uint8_t(self.address_mode).serialize()

        if self.address_mode in (AddressMode.NWK, AddressMode.NWK_AND_IEEE):
            r += NWK(self.address).serialize()
        elif self.address_mode == AddressMode.GROUP:
            r += NWK(self.address).serialize()

        if self.address_mode in (AddressMode.IEEE, AddressMode.NWK_AND_IEEE):
            r += EUI64(self.address).serialize()

        if self.address_mode in (
            AddressMode.NWK,
            AddressMode.IEEE,
            AddressMode.NWK_AND_IEEE,
        ):
            r += uint8_t(self.endpoint).serialize()

        return r

    @classmethod
    def from_zigpy_type(cls, addr, endpoint):
        temp_addr = DeconzAddress.from_zigpy_type(addr)

        instance = cls()
        instance.address_mode = temp_addr.address_mode
        instance.address = temp_addr.address
        instance.endpoint = endpoint

        return instance


class DataIndicationFlags(bitmap8):
    Always_Use_NWK_Source_Addr = 0b00000001
    Last_Hop_In_Reserved_Bytes = 0b00000010
    Include_Both_NWK_And_IEEE = 0b00000100