1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
|
"""
Module for serialization and deserialization of KNX/IP packets.
It consists of a header and a body.
Depending on the service_type_ident different types of body classes are instantiated.
"""
from __future__ import annotations
from xknx.exceptions import CouldNotParseKNXIP, IncompleteKNXIPFrame
from .body import KNXIPBody
from .connect_request import ConnectRequest
from .connect_response import ConnectResponse
from .connectionstate_request import ConnectionStateRequest
from .connectionstate_response import ConnectionStateResponse
from .description_request import DescriptionRequest
from .description_response import DescriptionResponse
from .device_configuration_ack import DeviceConfigurationAck
from .device_configuration_request import DeviceConfigurationRequest
from .disconnect_request import DisconnectRequest
from .disconnect_response import DisconnectResponse
from .header import KNXIPHeader
from .knxip_enum import KNXIPServiceType
from .routing_busy import RoutingBusy
from .routing_indication import RoutingIndication
from .routing_lost_message import RoutingLostMessage
from .search_request import SearchRequest
from .search_request_extended import SearchRequestExtended
from .search_response import SearchResponse
from .search_response_extended import SearchResponseExtended
from .secure_wrapper import SecureWrapper
from .session_authenticate import SessionAuthenticate
from .session_request import SessionRequest
from .session_response import SessionResponse
from .session_status import SessionStatus
from .timer_notify import TimerNotify
from .tunnelling_ack import TunnellingAck
from .tunnelling_feature import (
TunnellingFeatureGet,
TunnellingFeatureInfo,
TunnellingFeatureResponse,
TunnellingFeatureSet,
)
from .tunnelling_request import TunnellingRequest
class KNXIPFrame:
"""Class for KNX/IP Frames."""
def __init__(self, header: KNXIPHeader, body: KNXIPBody) -> None:
"""Initialize object."""
self.header = header
self.body = body
@staticmethod
def init_from_body(knxip_body: KNXIPBody) -> KNXIPFrame:
"""Return KNXIPFrame from KNXIPBody."""
header = KNXIPHeader()
header.service_type_ident = knxip_body.__class__.SERVICE_TYPE
header.set_length(knxip_body)
return KNXIPFrame(header=header, body=knxip_body)
@staticmethod
def from_knx(data: bytes) -> tuple[KNXIPFrame, bytes]:
"""
Parse/deserialize from KNX/IP raw data.
Returns a tuple of the KNXIPFrame and the rest of the data.
Raises IncompleteKNXIPFrame if the data is not enough to parse the KNXIPFrame
or CouldNotParseKNXIP on parsing error.
"""
header = KNXIPHeader()
pos_body = header.from_knx(data)
if len(data) < header.total_length:
raise IncompleteKNXIPFrame("Incomplete data for KNXIPFrame")
# limit data to self.header.total_length for streaming socket data
raw_body = data[pos_body : header.total_length]
body: KNXIPBody
# Core
if header.service_type_ident == KNXIPServiceType.SEARCH_REQUEST:
body = SearchRequest()
elif header.service_type_ident == KNXIPServiceType.SEARCH_REQUEST_EXTENDED:
body = SearchRequestExtended()
elif header.service_type_ident == KNXIPServiceType.SEARCH_RESPONSE:
body = SearchResponse()
elif header.service_type_ident == KNXIPServiceType.SEARCH_RESPONSE_EXTENDED:
body = SearchResponseExtended()
elif header.service_type_ident == KNXIPServiceType.DESCRIPTION_REQUEST:
body = DescriptionRequest()
elif header.service_type_ident == KNXIPServiceType.DESCRIPTION_RESPONSE:
body = DescriptionResponse()
elif header.service_type_ident == KNXIPServiceType.CONNECT_REQUEST:
body = ConnectRequest()
elif header.service_type_ident == KNXIPServiceType.CONNECT_RESPONSE:
body = ConnectResponse()
elif header.service_type_ident == KNXIPServiceType.CONNECTIONSTATE_REQUEST:
body = ConnectionStateRequest()
elif header.service_type_ident == KNXIPServiceType.CONNECTIONSTATE_RESPONSE:
body = ConnectionStateResponse()
elif header.service_type_ident == KNXIPServiceType.DISCONNECT_REQUEST:
body = DisconnectRequest()
elif header.service_type_ident == KNXIPServiceType.DISCONNECT_RESPONSE:
body = DisconnectResponse()
# Device Management
elif header.service_type_ident == KNXIPServiceType.DEVICE_CONFIGURATION_REQUEST:
body = DeviceConfigurationRequest()
elif header.service_type_ident == KNXIPServiceType.DEVICE_CONFIGURATION_ACK:
body = DeviceConfigurationAck()
# Tunnelling
elif header.service_type_ident == KNXIPServiceType.TUNNELLING_REQUEST:
body = TunnellingRequest()
elif header.service_type_ident == KNXIPServiceType.TUNNELLING_ACK:
body = TunnellingAck()
elif header.service_type_ident == KNXIPServiceType.TUNNELLING_FEATURE_GET:
body = TunnellingFeatureGet()
elif header.service_type_ident == KNXIPServiceType.TUNNELLING_FEATURE_INFO:
body = TunnellingFeatureInfo()
elif header.service_type_ident == KNXIPServiceType.TUNNELLING_FEATURE_RESPONSE:
body = TunnellingFeatureResponse()
elif header.service_type_ident == KNXIPServiceType.TUNNELLING_FEATURE_SET:
body = TunnellingFeatureSet()
# Routing
elif header.service_type_ident == KNXIPServiceType.ROUTING_INDICATION:
body = RoutingIndication()
elif header.service_type_ident == KNXIPServiceType.ROUTING_BUSY:
body = RoutingBusy()
elif header.service_type_ident == KNXIPServiceType.ROUTING_LOST_MESSAGE:
body = RoutingLostMessage()
# Secure
elif header.service_type_ident == KNXIPServiceType.SECURE_WRAPPER:
body = SecureWrapper()
elif header.service_type_ident == KNXIPServiceType.SESSION_AUTHENTICATE:
body = SessionAuthenticate()
elif header.service_type_ident == KNXIPServiceType.SESSION_REQUEST:
body = SessionRequest()
elif header.service_type_ident == KNXIPServiceType.SESSION_RESPONSE:
body = SessionResponse()
elif header.service_type_ident == KNXIPServiceType.SESSION_STATUS:
body = SessionStatus()
elif header.service_type_ident == KNXIPServiceType.TIMER_NOTIFY:
body = TimerNotify()
else:
raise CouldNotParseKNXIP(
f"KNXIPServiceType not implemented: {header.service_type_ident.name}"
)
body.from_knx(raw_body)
return KNXIPFrame(header=header, body=body), data[header.total_length :]
def to_knx(self) -> bytes:
"""Serialize to KNX/IP raw data."""
return self.header.to_knx() + self.body.to_knx()
def __repr__(self) -> str:
"""Return object as readable string."""
return f'<KNXIPFrame {self.header} body="{self.body}" />'
def __eq__(self, other: object) -> bool:
"""Equal operator."""
return self.__dict__ == other.__dict__
|