1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
|
"""
Module for serialization and deserialization of KNX Connect Response information.
Connect requests are used to start a new tunnel connection on a KNX/IP device.
With a Connect Response the receiving party acknowledges the valid processing of the request,
assigns a communication channel and an individual address for the client.
"""
from __future__ import annotations
from xknx.exceptions import CouldNotParseKNXIP
from xknx.telegram import IndividualAddress
from .body import KNXIPBodyResponse
from .error_code import ErrorCode
from .hpai import HPAI
from .knxip_enum import ConnectRequestType, KNXIPServiceType
class ConnectResponse(KNXIPBodyResponse):
    """Representation of a KNX Connect Response.

    The body consists of one byte for the communication channel, one byte
    for the status code, followed by the data endpoint (HPAI) and the
    Connect Response Data block (CRD).
    """

    SERVICE_TYPE = KNXIPServiceType.CONNECT_RESPONSE

    def __init__(
        self,
        communication_channel: int = 0,
        status_code: ErrorCode = ErrorCode.E_NO_ERROR,
        data_endpoint: HPAI | None = None,
        crd: ConnectResponseData | None = None,
    ) -> None:
        """Initialize ConnectResponse class.

        A default `HPAI()` / `ConnectResponseData()` is created when
        `data_endpoint` / `crd` is omitted (or falsy).
        """
        self.communication_channel = communication_channel
        self.status_code = status_code
        self.data_endpoint = data_endpoint or HPAI()
        self.crd = crd or ConnectResponseData()

    def calculated_length(self) -> int:
        """Get length of KNX/IP body.

        Two bytes (channel + status) plus the HPAI and the CRD. The value
        does not depend on `status_code`; it matches what `to_knx` emits.
        """
        return 2 + HPAI.LENGTH + self.crd.calculated_length()

    def from_knx(self, raw: bytes) -> int:
        """Parse/deserialize from KNX/IP raw data.

        Returns the number of bytes consumed. HPAI and CRD are only parsed
        when the status code is `E_NO_ERROR`; otherwise the whole of `raw`
        is reported as consumed.

        Raises CouldNotParseKNXIP if `raw` is shorter than the two leading
        bytes or if the status byte is not a known `ErrorCode`.
        """
        # Guard before indexing so truncated input surfaces as a parse
        # error instead of an IndexError.
        if len(raw) < 2:
            raise CouldNotParseKNXIP("ConnectResponse has wrong length")
        self.communication_channel = raw[0]
        try:
            self.status_code = ErrorCode(raw[1])
        except ValueError as err:
            raise CouldNotParseKNXIP(
                "ConnectResponse has unknown status code"
            ) from err

        if self.status_code != ErrorCode.E_NO_ERROR:
            # do not parse HPAI and CRD in case of errors - just check length
            return len(raw)

        pos = 2
        pos += self.data_endpoint.from_knx(raw[pos:])
        pos += self.crd.from_knx(raw[pos:])
        return pos

    def to_knx(self) -> bytes:
        """Serialize to KNX/IP raw data.

        Emits channel byte, status byte, data endpoint and CRD, in that order.
        """
        return (
            bytes((self.communication_channel, self.status_code.value))
            + self.data_endpoint.to_knx()
            + self.crd.to_knx()
        )

    def __repr__(self) -> str:
        """Return object as readable string."""
        return (
            "<ConnectResponse "
            f'communication_channel="{self.communication_channel}" '
            f'status_code="{self.status_code}" '
            f'data_endpoint="{self.data_endpoint}" '
            f'crd="{self.crd}" />'
        )
class ConnectResponseData:
    """Representation of a KNX Connect Response Data block (CRD).

    Wire layout: one length byte, one connection-type byte and, for tunnel
    connections only, the two bytes of the assigned individual address.
    """

    # Length of a CRD without an individual address (length + type byte).
    CRD_LENGTH = 2
    # Length of a tunnelling CRD (length + type + 2 address bytes).
    CRD_TUNNEL_LENGTH = 4

    def __init__(
        self,
        request_type: ConnectRequestType = ConnectRequestType.TUNNEL_CONNECTION,
        individual_address: IndividualAddress | None = None,
    ) -> None:
        """Initialize ConnectResponseData object."""
        self.request_type = request_type
        self.individual_address = individual_address

    def _is_tunnel_crd(self) -> bool:
        """Return True if this CRD describes a tunnel connection."""
        return self.request_type == ConnectRequestType.TUNNEL_CONNECTION

    def calculated_length(self) -> int:
        """Get length of the CRD: 4 for tunnel connections, otherwise 2."""
        return (
            ConnectResponseData.CRD_TUNNEL_LENGTH
            if self._is_tunnel_crd()
            else ConnectResponseData.CRD_LENGTH
        )

    def from_knx(self, raw: bytes) -> int:
        """Parse/deserialize from KNX/IP raw data.

        Returns the number of bytes consumed (the CRD length byte).

        Raises CouldNotParseKNXIP if `raw` is empty or shorter than the
        announced length, if the announced length does not fit the
        connection type, or if the connection type byte is unknown.
        """
        # Check for empty input before reading the length byte so that it is
        # reported as a parse error rather than an IndexError.
        if not raw:
            raise CouldNotParseKNXIP("CRD has wrong length")
        crd_length = raw[0]
        if len(raw) < crd_length:
            raise CouldNotParseKNXIP("CRD has wrong length")
        if crd_length < ConnectResponseData.CRD_LENGTH:
            raise CouldNotParseKNXIP("CRD length too small")
        # The two checks above guarantee at least two bytes are available.
        try:
            self.request_type = ConnectRequestType(raw[1])
        except ValueError as err:
            raise CouldNotParseKNXIP("CRD has unsupported connection type") from err
        if self._is_tunnel_crd():
            if crd_length != ConnectResponseData.CRD_TUNNEL_LENGTH:
                raise CouldNotParseKNXIP("CRD has wrong length")
            self.individual_address = IndividualAddress.from_knx(raw[2:4])
        elif crd_length != ConnectResponseData.CRD_LENGTH:
            raise CouldNotParseKNXIP("CRD has wrong length")
        return crd_length

    def to_knx(self) -> bytes:
        """Serialize CRD (Connect Response Data Block).

        A tunnel CRD requires `individual_address` to be set.
        """
        _crd = bytes(
            (
                self.calculated_length(),
                self.request_type.value,
            )
        )
        if self._is_tunnel_crd():
            # NOTE(review): assert is stripped under `python -O`; kept
            # because no serialization exception type is in scope here.
            assert self.individual_address is not None
            return _crd + self.individual_address.to_knx()
        return _crd

    def __eq__(self, other: object) -> bool:
        """Equal operator.

        Compares attribute dictionaries; objects that are not a
        ConnectResponseData yield NotImplemented so Python can fall back to
        the reflected comparison instead of raising AttributeError.
        """
        if not isinstance(other, ConnectResponseData):
            return NotImplemented
        return self.__dict__ == other.__dict__

    def __repr__(self) -> str:
        """Return object as readable string."""
        _address = (
            f'individual_address="{self.individual_address}" '
            if self._is_tunnel_crd() and self.individual_address
            else ""
        )
        return f'<ConnectResponseData request_type="{self.request_type}" {_address}/>'
|