File: cemi_handler.py

package info (click to toggle)
python-xknx 3.14.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,064 kB
  • sloc: python: 40,895; javascript: 8,556; makefile: 32; sh: 12
file content (168 lines) | stat: -rw-r--r-- 6,584 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
"""
CEMI Frame handler.

This class represents a CEMI Client vaguely according to KNX specification 3/6/3 §4.1.2.
It is responsible for sending and receiving CEMI frames to/from a CEMI Server - this
can be a remote server when using IP tunnelling or a local server when using IP routing.
"""

from __future__ import annotations

import asyncio
import logging
from typing import TYPE_CHECKING

from xknx.exceptions import (
    CommunicationError,
    ConfirmationError,
    ConversionError,
    CouldNotParseCEMI,
    DataSecureError,
    UnsupportedCEMIMessage,
)
from xknx.secure.data_secure import DataSecure, is_data_secure
from xknx.secure.keyring import Keyring
from xknx.telegram import IndividualAddress, Telegram, TelegramDirection, tpci
from xknx.util import asyncio_timeout

from .cemi_frame import CEMIFrame, CEMILData
from .const import CEMIMessageCode

if TYPE_CHECKING:
    from xknx.xknx import XKNX

logger = logging.getLogger("xknx.cemi")
data_secure_logger = logging.getLogger("xknx.data_secure")

# See 3/6/3 EMI_IMI §4.1.5 Data Link Layer messages
REQUEST_TO_CONFIRMATION_TIMEOUT = 3


class CEMIHandler:
    """Class for handling CEMI frames from/to the TelegramQueue."""

    __slots__ = ("_l_data_confirmation_event", "data_secure", "xknx")

    def __init__(self, xknx: XKNX) -> None:
        """Initialize CEMIHandler class."""
        self.xknx = xknx
        self.data_secure: DataSecure | None = None
        self._l_data_confirmation_event = asyncio.Event()

    def data_secure_init(self, keyring: Keyring | None) -> None:
        """Initialize DataSecure."""
        if keyring is None:
            self.data_secure = None
        else:
            self.data_secure = DataSecure.init_from_keyring(keyring)

    async def send_telegram(self, telegram: Telegram) -> None:
        """Create a CEMIFrame from a Telegram and send it to the CEMI Server."""
        cemi_data = CEMILData.init_from_telegram(
            telegram=telegram,
            src_addr=(
                self.xknx.current_address if telegram.source_address.raw == 0 else None
            ),
        )
        cemi = CEMIFrame(
            code=CEMIMessageCode.L_DATA_REQ,
            data=cemi_data,
        )

        logger.debug("Outgoing CEMI: %s", cemi)
        if self.data_secure is not None:
            cemi.data = self.data_secure.outgoing_cemi(cemi_data=cemi_data)
        self._l_data_confirmation_event.clear()
        try:
            await self.xknx.knxip_interface.send_cemi(cemi)
        except (ConversionError, CommunicationError) as ex:
            logger.warning("Could not send CEMI frame: %s for %s", ex, cemi)
            self.xknx.connection_manager.cemi_count_outgoing_error += 1
            raise ex

        try:
            async with asyncio_timeout(REQUEST_TO_CONFIRMATION_TIMEOUT):
                await self._l_data_confirmation_event.wait()
        except asyncio.TimeoutError:
            self.xknx.connection_manager.cemi_count_outgoing_error += 1
            raise ConfirmationError(
                f"L_DATA_CON Data Link Layer confirmation timed out for {cemi}"
            ) from None
        self.xknx.connection_manager.cemi_count_outgoing += 1

    def handle_raw_cemi(self, raw_cemi: bytes) -> None:
        """Parse and handle incoming raw CEMI Frames."""
        try:
            cemi = CEMIFrame.from_knx(raw_cemi)
        except CouldNotParseCEMI as cemi_parse_err:
            logger.warning("CEMI Frame failed to parse: %s", cemi_parse_err)
            self.xknx.connection_manager.cemi_count_incoming_error += 1
            return
        except UnsupportedCEMIMessage as unsupported_cemi_err:
            logger.info("CEMI not supported: %s", unsupported_cemi_err)
            self.xknx.connection_manager.cemi_count_incoming_error += 1
            return
        self.handle_cemi_frame(cemi)

    def handle_cemi_frame(self, cemi: CEMIFrame) -> None:
        """Handle incoming CEMI Frames."""
        if not isinstance(cemi.data, CEMILData):
            logger.debug("Ignoring incoming non-link-layer CEMI: %s", cemi)
            return

        if cemi.code is CEMIMessageCode.L_DATA_CON:
            # L_DATA_CON confirmation frame signals ready to send next telegram
            self._l_data_confirmation_event.set()
            logger.debug("Incoming CEMI confirmation: %s", cemi)
            return
        if cemi.code is CEMIMessageCode.L_DATA_REQ:
            # L_DATA_REQ frames should only be outgoing.
            logger.warning("Received unexpected L_DATA_REQ frame: %s", cemi)
            self.xknx.connection_manager.cemi_count_incoming_error += 1
            return
        logger.debug("Incoming CEMI: %s", cemi)
        self.xknx.connection_manager.cemi_count_incoming += 1

        if self.data_secure is None:
            if is_data_secure(cemi.data):
                data_secure_logger.debug(
                    "Received DataSecure encrypted CEMI frame but no keys for DataSecure are initialized: %s",
                    cemi,
                )
                self.handle_data_secure_key_issue(cemi.data)
                return
        else:
            try:
                cemi.data = self.data_secure.received_cemi(cemi_data=cemi.data)
            except DataSecureError as err:
                data_secure_logger.log(
                    err.log_level,
                    "Could not decrypt CEMI frame: %s",
                    err,
                )
                self.handle_data_secure_key_issue(cemi.data)
                return

        telegram = cemi.data.telegram()
        telegram.direction = TelegramDirection.INCOMING
        self.telegram_received(telegram)

    def telegram_received(self, telegram: Telegram) -> None:
        """Forward Telegram to upper layer."""
        if isinstance(telegram.tpci, tpci.TDataGroup):
            self.xknx.telegrams.put_nowait(telegram)
            return
        if (
            isinstance(telegram.destination_address, IndividualAddress)
            and telegram.destination_address != self.xknx.current_address
        ):
            return
        self.xknx.management.process(telegram)

    def handle_data_secure_key_issue(self, cemi_data: CEMILData) -> None:
        """Handle DataSecure telegrams with missing or invalid keys."""
        self.xknx.connection_manager.undecoded_data_secure += 1
        if isinstance(cemi_data.tpci, tpci.TDataGroup):
            telegram = cemi_data.telegram()
            telegram.direction = TelegramDirection.INCOMING
            self.xknx.telegram_queue.received_data_secure_group_key_issue(telegram)