1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
|
"""
CEMI Frame handler.
This class represents a CEMI Client vaguely according to KNX specification 3/6/3 §4.1.2.
It is responsible for sending and receiving CEMI frames to/from a CEMI Server - this
can be a remote server when using IP tunnelling or a local server when using IP routing.
"""
from __future__ import annotations
import asyncio
import logging
from typing import TYPE_CHECKING
from xknx.exceptions import (
CommunicationError,
ConfirmationError,
ConversionError,
CouldNotParseCEMI,
DataSecureError,
UnsupportedCEMIMessage,
)
from xknx.secure.data_secure import DataSecure, is_data_secure
from xknx.secure.keyring import Keyring
from xknx.telegram import IndividualAddress, Telegram, TelegramDirection, tpci
from xknx.util import asyncio_timeout
from .cemi_frame import CEMIFrame, CEMILData
from .const import CEMIMessageCode
if TYPE_CHECKING:
from xknx.xknx import XKNX
logger = logging.getLogger("xknx.cemi")
data_secure_logger = logging.getLogger("xknx.data_secure")
# See 3/6/3 EMI_IMI §4.1.5 Data Link Layer messages
REQUEST_TO_CONFIRMATION_TIMEOUT = 3
class CEMIHandler:
"""Class for handling CEMI frames from/to the TelegramQueue."""
__slots__ = ("_l_data_confirmation_event", "data_secure", "xknx")
def __init__(self, xknx: XKNX) -> None:
"""Initialize CEMIHandler class."""
self.xknx = xknx
self.data_secure: DataSecure | None = None
self._l_data_confirmation_event = asyncio.Event()
def data_secure_init(self, keyring: Keyring | None) -> None:
"""Initialize DataSecure."""
if keyring is None:
self.data_secure = None
else:
self.data_secure = DataSecure.init_from_keyring(keyring)
async def send_telegram(self, telegram: Telegram) -> None:
"""Create a CEMIFrame from a Telegram and send it to the CEMI Server."""
cemi_data = CEMILData.init_from_telegram(
telegram=telegram,
src_addr=(
self.xknx.current_address if telegram.source_address.raw == 0 else None
),
)
cemi = CEMIFrame(
code=CEMIMessageCode.L_DATA_REQ,
data=cemi_data,
)
logger.debug("Outgoing CEMI: %s", cemi)
if self.data_secure is not None:
cemi.data = self.data_secure.outgoing_cemi(cemi_data=cemi_data)
self._l_data_confirmation_event.clear()
try:
await self.xknx.knxip_interface.send_cemi(cemi)
except (ConversionError, CommunicationError) as ex:
logger.warning("Could not send CEMI frame: %s for %s", ex, cemi)
self.xknx.connection_manager.cemi_count_outgoing_error += 1
raise ex
try:
async with asyncio_timeout(REQUEST_TO_CONFIRMATION_TIMEOUT):
await self._l_data_confirmation_event.wait()
except asyncio.TimeoutError:
self.xknx.connection_manager.cemi_count_outgoing_error += 1
raise ConfirmationError(
f"L_DATA_CON Data Link Layer confirmation timed out for {cemi}"
) from None
self.xknx.connection_manager.cemi_count_outgoing += 1
def handle_raw_cemi(self, raw_cemi: bytes) -> None:
"""Parse and handle incoming raw CEMI Frames."""
try:
cemi = CEMIFrame.from_knx(raw_cemi)
except CouldNotParseCEMI as cemi_parse_err:
logger.warning("CEMI Frame failed to parse: %s", cemi_parse_err)
self.xknx.connection_manager.cemi_count_incoming_error += 1
return
except UnsupportedCEMIMessage as unsupported_cemi_err:
logger.info("CEMI not supported: %s", unsupported_cemi_err)
self.xknx.connection_manager.cemi_count_incoming_error += 1
return
self.handle_cemi_frame(cemi)
def handle_cemi_frame(self, cemi: CEMIFrame) -> None:
"""Handle incoming CEMI Frames."""
if not isinstance(cemi.data, CEMILData):
logger.debug("Ignoring incoming non-link-layer CEMI: %s", cemi)
return
if cemi.code is CEMIMessageCode.L_DATA_CON:
# L_DATA_CON confirmation frame signals ready to send next telegram
self._l_data_confirmation_event.set()
logger.debug("Incoming CEMI confirmation: %s", cemi)
return
if cemi.code is CEMIMessageCode.L_DATA_REQ:
# L_DATA_REQ frames should only be outgoing.
logger.warning("Received unexpected L_DATA_REQ frame: %s", cemi)
self.xknx.connection_manager.cemi_count_incoming_error += 1
return
logger.debug("Incoming CEMI: %s", cemi)
self.xknx.connection_manager.cemi_count_incoming += 1
if self.data_secure is None:
if is_data_secure(cemi.data):
data_secure_logger.debug(
"Received DataSecure encrypted CEMI frame but no keys for DataSecure are initialized: %s",
cemi,
)
self.handle_data_secure_key_issue(cemi.data)
return
else:
try:
cemi.data = self.data_secure.received_cemi(cemi_data=cemi.data)
except DataSecureError as err:
data_secure_logger.log(
err.log_level,
"Could not decrypt CEMI frame: %s",
err,
)
self.handle_data_secure_key_issue(cemi.data)
return
telegram = cemi.data.telegram()
telegram.direction = TelegramDirection.INCOMING
self.telegram_received(telegram)
def telegram_received(self, telegram: Telegram) -> None:
"""Forward Telegram to upper layer."""
if isinstance(telegram.tpci, tpci.TDataGroup):
self.xknx.telegrams.put_nowait(telegram)
return
if (
isinstance(telegram.destination_address, IndividualAddress)
and telegram.destination_address != self.xknx.current_address
):
return
self.xknx.management.process(telegram)
def handle_data_secure_key_issue(self, cemi_data: CEMILData) -> None:
"""Handle DataSecure telegrams with missing or invalid keys."""
self.xknx.connection_manager.undecoded_data_secure += 1
if isinstance(cemi_data.tpci, tpci.TDataGroup):
telegram = cemi_data.telegram()
telegram.direction = TelegramDirection.INCOMING
self.xknx.telegram_queue.received_data_secure_group_key_issue(telegram)
|