File: data_secure.py

package info (click to toggle)
python-xknx 3.6.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,012 kB
  • sloc: python: 39,710; javascript: 8,556; makefile: 27; sh: 12
file content (259 lines) | stat: -rw-r--r-- 10,451 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
"""Module for KNX Data Secure."""

from __future__ import annotations

from collections.abc import Iterator
from contextlib import contextmanager
from copy import copy
from datetime import datetime, timezone
import logging
import time

from xknx.cemi import CEMILData
from xknx.exceptions import DataSecureError
from xknx.telegram.address import GroupAddress, IndividualAddress
from xknx.telegram.apci import APCI, SecureAPDU

from .data_secure_asdu import (
    SecureData,
    SecurityAlgorithmIdentifier,
    SecurityALService,
    SecurityControlField,
)
from .keyring import Keyring

_LOGGER = logging.getLogger("xknx.data_secure")

# Same timedelta in milliseconds as used in Falcon used for initial sequence_number_sending
# py3.10 backwards compatibility - py3.11 "2018-01-05T00:00:00Z" is supported
_SEQUENCE_NUMBER_INIT_TIMESTAMP = datetime.fromisoformat(
    "2018-01-05T00:00:00+00:00"
).timestamp()


def _initial_sequence_number() -> int:
    """Return an initial sequence number for sending Data Secure Telegrams."""
    return int((time.time() - _SEQUENCE_NUMBER_INIT_TIMESTAMP) * 1000)


class DataSecure:
    """Class for KNX Data Secure handling."""

    __slots__ = (
        "_group_key_table",
        "_individual_address_table",
        "_sequence_number_sending",
    )

    def __init__(
        self,
        *,
        group_key_table: dict[GroupAddress, bytes],
        individual_address_table: dict[IndividualAddress, int],
        last_sequence_number_sending: int | None = None,
    ) -> None:
        """Initialize DataSecure class."""
        self._group_key_table = group_key_table
        self._individual_address_table = individual_address_table
        self._sequence_number_sending = (
            last_sequence_number_sending or _initial_sequence_number()
        )
        # Holds the last valid sequence number for each individual address.
        # Use sequence_number from keyfile as initial value or 0 from senders for all IAs ?

        if not 0 < self._sequence_number_sending < 0xFFFFFFFFFFFF:
            _local_time_info = (
                f" Local time not set properly? {datetime.now(timezone.utc).isoformat()}"
                if not last_sequence_number_sending
                else ""
            )
            raise DataSecureError(
                f"Initial sequence number out of range: {self._sequence_number_sending}"
                f"{_local_time_info}"
            )
        _LOGGER.info(
            "Data Secure initialized for %s group addresses from %s individual addresses.",
            len(self._group_key_table),
            len(self._individual_address_table),
        )
        _LOGGER.debug(
            "Data Secure initial sequence number: %s, groups: %s, senders: %s",
            self._sequence_number_sending,
            [str(ga) for ga in self._group_key_table],
            [str(ia) for ia in self._individual_address_table],
        )

    @staticmethod
    def init_from_keyring(keyring: Keyring) -> DataSecure | None:
        """
        Initialize DataSecure from Keyring.

        Return None if no Data Secure information is found in the Keyring.
        """
        ga_key_table = keyring.get_data_secure_group_keys()
        ia_seq_table = keyring.get_data_secure_senders()
        # TODO: persist local individual_address_table and update from that file on start
        #       to have more fresh initial sequence numbers
        if not ga_key_table:
            return None
        return DataSecure(
            group_key_table=ga_key_table,
            individual_address_table=ia_seq_table,
        )

    def get_sequence_number(self) -> int:
        """Return current sequence number sending and increment local stored value."""
        seq_nr = self._sequence_number_sending
        self._sequence_number_sending += 1
        return seq_nr

    @contextmanager
    def check_sequence_number(
        self, source_address: IndividualAddress, received_sequence_number: int
    ) -> Iterator[None]:
        """
        Check the last valid sequence number for incoming frames from `source_address`.

        Update the Security Individual Address Table if no further exception is raised.
        Raise `DataSecureError` if sequence number is invalid or sender is not known.
        """
        try:
            last_valid_sequence_number = self._individual_address_table[source_address]
        except KeyError:
            raise DataSecureError(
                f"Source address not found in Security Individual Address Table: {source_address}",
                log_level=logging.INFO,
            ) from None
        if not received_sequence_number > last_valid_sequence_number:
            # TODO: implement and increment Security Failure Log counter (not when equal)
            raise DataSecureError(
                f"Sequence number too low for {source_address}: "
                f"{received_sequence_number} received, {last_valid_sequence_number} last valid",
                log_level=logging.WARNING,
            )

        yield
        # Don't increment sequence number if exception is raised while decrypting (yield)
        self._individual_address_table[source_address] = received_sequence_number

    def received_cemi(self, cemi_data: CEMILData) -> CEMILData:
        """Handle received CEMI frame."""
        # Data Secure frame
        if isinstance(cemi_data.payload, SecureAPDU):
            return self._received_secure_cemi(cemi_data, cemi_data.payload)
        # Plain group communication frame
        if isinstance(cemi_data.dst_addr, GroupAddress):
            if cemi_data.dst_addr in self._group_key_table:
                raise DataSecureError(
                    f"Discarding frame with plain APDU for secure group address: {cemi_data}",
                    log_level=logging.WARNING,
                )
            return cemi_data
        # Plain point-to-point frame
        #   No point to point key table is implemented at the moment as ETS can't even configure this
        #   only way to communicate point-to-point with data secure currently is with tool key
        #   - which we don't have
        return cemi_data

    def _received_secure_cemi(
        self, cemi_data: CEMILData, s_apdu: SecureAPDU
    ) -> CEMILData:
        """Handle received secured CEMI frame."""
        if s_apdu.scf.service is not SecurityALService.S_A_DATA:
            raise DataSecureError(
                f"Only SecurityALService.S_A_DATA supported {cemi_data}",
                log_level=logging.DEBUG,
            )
        if s_apdu.scf.system_broadcast or s_apdu.scf.tool_access:
            # TODO: handle incoming responses with tool key of sending device
            # when we can send with tool key
            raise DataSecureError(
                f"System broadcast and tool access not supported {cemi_data}",
                log_level=logging.DEBUG,
            )

        # Secure group communication frame
        if isinstance(cemi_data.dst_addr, GroupAddress):
            if not (key := self._group_key_table.get(cemi_data.dst_addr)):
                raise DataSecureError(
                    f"No key found for group address {cemi_data.dst_addr} from {cemi_data.src_addr}",
                    log_level=logging.INFO,
                )
        # Secure point-to-point frame
        else:
            # TODO: maybe possible to implement this over tool key
            raise DataSecureError(
                f"Secure Point-to-Point communication not supported {cemi_data}",
                log_level=logging.DEBUG,
            )

        with self.check_sequence_number(
            source_address=cemi_data.src_addr,
            received_sequence_number=int.from_bytes(
                s_apdu.secured_data.sequence_number_bytes, "big"
            ),
        ):
            _address_fields_raw = (
                cemi_data.src_addr.to_knx() + cemi_data.dst_addr.to_knx()
            )
            plain_apdu_raw = s_apdu.secured_data.get_plain_apdu(
                key=key,
                scf=s_apdu.scf,
                address_fields_raw=_address_fields_raw,
                frame_flags=cemi_data.flags,
                tpci=cemi_data.tpci,
            )
        decrypted_payload = APCI.from_knx(plain_apdu_raw)
        _LOGGER.debug("Unpacked APDU %s from %s", decrypted_payload, s_apdu)

        plain_cemi_data = copy(cemi_data)
        plain_cemi_data.payload = decrypted_payload
        return plain_cemi_data

    def outgoing_cemi(self, cemi_data: CEMILData) -> CEMILData:
        """Handle outgoing CEMI frame. Pass through as plain frame or encrypt."""
        # Outgoing  group communication frame
        if isinstance(cemi_data.dst_addr, GroupAddress):
            if key := self._group_key_table.get(cemi_data.dst_addr):
                scf = SecurityControlField(
                    algorithm=SecurityAlgorithmIdentifier.CCM_ENCRYPTION,
                    service=SecurityALService.S_A_DATA,
                    system_broadcast=False,
                    tool_access=False,
                )
                return self._secure_data_cemi(key=key, scf=scf, cemi_data=cemi_data)
            return cemi_data
        # Outgoing secure point-to-point frames are sent plain.
        # Data Secure point-to-point is not supported.
        return cemi_data

    def _secure_data_cemi(
        self,
        key: bytes,
        scf: SecurityControlField,
        cemi_data: CEMILData,
    ) -> CEMILData:
        """Wrap encrypted payload of a plain CEMILData in a SecureAPDU."""
        plain_apdu_raw: bytes | bytearray

        if cemi_data.payload is not None:
            plain_apdu_raw = cemi_data.payload.to_knx()
        else:
            # TODO: test if this is correct
            plain_apdu_raw = b""  # used in point-to-point eg. TConnect
        secure_asdu = SecureData.init_from_plain_apdu(
            key=key,
            apdu=plain_apdu_raw,
            scf=scf,
            sequence_number=self.get_sequence_number(),
            address_fields_raw=cemi_data.src_addr.to_knx()
            + cemi_data.dst_addr.to_knx(),
            frame_flags=cemi_data.flags,
            tpci=cemi_data.tpci,
        )
        secure_cemi_data = copy(cemi_data)
        secure_cemi_data.payload = SecureAPDU(scf=scf, secured_data=secure_asdu)
        _LOGGER.debug(
            "Secured APDU %s with %s", cemi_data.payload, secure_cemi_data.payload
        )
        return secure_cemi_data