1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
|
"""
Module for Serialization and Deserialization of a KNX Connect Request information.
Connect requests are used to start a new tunnel connection on a KNX/IP device.
"""
from __future__ import annotations
from xknx.exceptions import CouldNotParseKNXIP
from xknx.telegram import IndividualAddress
from .body import KNXIPBody
from .hpai import HPAI
from .knxip_enum import ConnectRequestType, KNXIPServiceType, TunnellingLayer
class ConnectRequest(KNXIPBody):
"""Representation of a KNX Connect Request."""
SERVICE_TYPE = KNXIPServiceType.CONNECT_REQUEST
def __init__(
self,
control_endpoint: HPAI | None = None,
data_endpoint: HPAI | None = None,
cri: ConnectRequestInformation | None = None,
) -> None:
"""Initialize ConnectRequest object."""
self.control_endpoint = control_endpoint or HPAI()
self.data_endpoint = data_endpoint or HPAI()
self.cri = cri or ConnectRequestInformation()
def calculated_length(self) -> int:
"""Get length of KNX/IP body."""
return HPAI.LENGTH + HPAI.LENGTH + self.cri.calculated_length()
def from_knx(self, raw: bytes) -> int:
"""Parse/deserialize from KNX/IP raw data."""
pos = self.control_endpoint.from_knx(raw)
pos += self.data_endpoint.from_knx(raw[pos:])
pos += self.cri.from_knx(raw[pos:])
return pos
def to_knx(self) -> bytes:
"""Serialize to KNX/IP raw data."""
return (
self.control_endpoint.to_knx()
+ self.data_endpoint.to_knx()
+ self.cri.to_knx()
)
def __repr__(self) -> str:
"""Return object as readable string."""
return (
"<ConnectRequest "
f'control_endpoint="{self.control_endpoint}" '
f'data_endpoint="{self.data_endpoint}" '
f'cri="{self.cri}" />'
)
class ConnectRequestInformation:
"""
Representation of a KNX Connect Request Information (CRI).
A Basic CRI requests a tunnel without requesting any specific IA.
Using `individual_address` yields an Extended CRI which is only
supported by Tunnelling v2 devices.
"""
CRI_LENGTH = 2
CRI_TUNNEL_LENGTH = 4
CRI_TUNNEL_EXT_LENGTH = 6
def __init__(
self,
connection_type: ConnectRequestType = ConnectRequestType.TUNNEL_CONNECTION,
knx_layer: TunnellingLayer = TunnellingLayer.DATA_LINK_LAYER,
individual_address: IndividualAddress | None = None,
) -> None:
"""Initialize ConnectRequest object."""
self.connection_type = connection_type
self.knx_layer = knx_layer
self.individual_address = individual_address
def _is_tunnel_cri(self) -> bool:
return self.connection_type == ConnectRequestType.TUNNEL_CONNECTION
def calculated_length(self) -> int:
"""Get length of KNX/IP body."""
if self._is_tunnel_cri():
return (
ConnectRequestInformation.CRI_TUNNEL_EXT_LENGTH
if self.individual_address
else ConnectRequestInformation.CRI_TUNNEL_LENGTH
)
return ConnectRequestInformation.CRI_LENGTH
def from_knx(self, raw: bytes) -> int:
"""Parse/deserialize from KNX/IP raw data."""
cri_length = raw[0]
if len(raw) < cri_length:
raise CouldNotParseKNXIP("CRI data has wrong length")
if cri_length < ConnectRequestInformation.CRI_LENGTH:
raise CouldNotParseKNXIP("CRI length too small")
self.connection_type = ConnectRequestType(raw[1])
if self._is_tunnel_cri():
if cri_length == ConnectRequestInformation.CRI_TUNNEL_LENGTH:
extended = False
elif cri_length == ConnectRequestInformation.CRI_TUNNEL_EXT_LENGTH:
extended = True
else:
raise CouldNotParseKNXIP("CRI has wrong length")
self.knx_layer = TunnellingLayer(raw[2])
self.individual_address = (
IndividualAddress.from_knx(raw[4:6]) if extended else None
)
elif cri_length != ConnectRequestInformation.CRI_LENGTH:
raise CouldNotParseKNXIP("CRI has wrong length")
return cri_length
def to_knx(self) -> bytes:
"""Serialize to KNX/IP raw data."""
_cri = bytes(
(
self.calculated_length(),
self.connection_type.value,
)
)
if self._is_tunnel_cri():
_cri = _cri + bytes(
(
self.knx_layer.value,
0x00, # Reserved
)
)
if self.individual_address:
_cri = _cri + self.individual_address.to_knx()
return _cri
def __eq__(self, other: object) -> bool:
"""Equal operator."""
return self.__dict__ == other.__dict__
def __repr__(self) -> str:
"""Return object as readable string."""
_tunnel_layer = (
f'knx_layer="{self.knx_layer.name}" ' if self._is_tunnel_cri() else ""
)
_extended = (
f'individual_address="{self.individual_address}" '
if self._is_tunnel_cri() and self.individual_address
else ""
)
return (
"<ConnectRequestInformation "
f'connection_type="{self.connection_type.name}" '
f"{_tunnel_layer}"
f"{_extended}/>"
)
|