File: knxip.py

package info (click to toggle)
python-xknx 3.6.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,012 kB
  • sloc: python: 39,710; javascript: 8,556; makefile: 27; sh: 12
file content (162 lines) | stat: -rw-r--r-- 7,272 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
"""
Module for serialization and deserialization of KNX/IP packets.

A KNX/IP packet consists of a header and a body.
Depending on the header's service_type_ident, a different body class is instantiated.
"""

from __future__ import annotations

from xknx.exceptions import CouldNotParseKNXIP, IncompleteKNXIPFrame

from .body import KNXIPBody
from .connect_request import ConnectRequest
from .connect_response import ConnectResponse
from .connectionstate_request import ConnectionStateRequest
from .connectionstate_response import ConnectionStateResponse
from .description_request import DescriptionRequest
from .description_response import DescriptionResponse
from .device_configuration_ack import DeviceConfigurationAck
from .device_configuration_request import DeviceConfigurationRequest
from .disconnect_request import DisconnectRequest
from .disconnect_response import DisconnectResponse
from .header import KNXIPHeader
from .knxip_enum import KNXIPServiceType
from .routing_busy import RoutingBusy
from .routing_indication import RoutingIndication
from .routing_lost_message import RoutingLostMessage
from .search_request import SearchRequest
from .search_request_extended import SearchRequestExtended
from .search_response import SearchResponse
from .search_response_extended import SearchResponseExtended
from .secure_wrapper import SecureWrapper
from .session_authenticate import SessionAuthenticate
from .session_request import SessionRequest
from .session_response import SessionResponse
from .session_status import SessionStatus
from .timer_notify import TimerNotify
from .tunnelling_ack import TunnellingAck
from .tunnelling_feature import (
    TunnellingFeatureGet,
    TunnellingFeatureInfo,
    TunnellingFeatureResponse,
    TunnellingFeatureSet,
)
from .tunnelling_request import TunnellingRequest


class KNXIPFrame:
    """
    Class for KNX/IP Frames.

    A frame pairs a KNXIPHeader with the KNXIPBody selected by the header's
    service_type_ident.
    """

    # Lookup from service type to body class. Populated on first use by
    # _get_body_classes() so that parsing does a single dict lookup.
    _body_classes: dict[KNXIPServiceType, type[KNXIPBody]] | None = None

    def __init__(self, header: KNXIPHeader, body: KNXIPBody) -> None:
        """Initialize object with the given header and body."""
        self.header = header
        self.body = body

    @staticmethod
    def init_from_body(knxip_body: KNXIPBody) -> KNXIPFrame:
        """
        Return KNXIPFrame from KNXIPBody.

        A fresh header is created; its service type is taken from the body
        class' SERVICE_TYPE and its length is set from the body.
        """
        header = KNXIPHeader()
        header.service_type_ident = knxip_body.__class__.SERVICE_TYPE
        header.set_length(knxip_body)
        return KNXIPFrame(header=header, body=knxip_body)

    @staticmethod
    def _get_body_classes() -> dict[KNXIPServiceType, type[KNXIPBody]]:
        """Return the service type -> body class lookup, building it once."""
        body_classes = KNXIPFrame._body_classes
        if body_classes is None:
            body_classes = KNXIPFrame._body_classes = {
                # Core
                KNXIPServiceType.SEARCH_REQUEST: SearchRequest,
                KNXIPServiceType.SEARCH_REQUEST_EXTENDED: SearchRequestExtended,
                KNXIPServiceType.SEARCH_RESPONSE: SearchResponse,
                KNXIPServiceType.SEARCH_RESPONSE_EXTENDED: SearchResponseExtended,
                KNXIPServiceType.DESCRIPTION_REQUEST: DescriptionRequest,
                KNXIPServiceType.DESCRIPTION_RESPONSE: DescriptionResponse,
                KNXIPServiceType.CONNECT_REQUEST: ConnectRequest,
                KNXIPServiceType.CONNECT_RESPONSE: ConnectResponse,
                KNXIPServiceType.CONNECTIONSTATE_REQUEST: ConnectionStateRequest,
                KNXIPServiceType.CONNECTIONSTATE_RESPONSE: ConnectionStateResponse,
                KNXIPServiceType.DISCONNECT_REQUEST: DisconnectRequest,
                KNXIPServiceType.DISCONNECT_RESPONSE: DisconnectResponse,
                # Device Management
                KNXIPServiceType.DEVICE_CONFIGURATION_REQUEST: DeviceConfigurationRequest,
                KNXIPServiceType.DEVICE_CONFIGURATION_ACK: DeviceConfigurationAck,
                # Tunnelling
                KNXIPServiceType.TUNNELLING_REQUEST: TunnellingRequest,
                KNXIPServiceType.TUNNELLING_ACK: TunnellingAck,
                KNXIPServiceType.TUNNELLING_FEATURE_GET: TunnellingFeatureGet,
                KNXIPServiceType.TUNNELLING_FEATURE_INFO: TunnellingFeatureInfo,
                KNXIPServiceType.TUNNELLING_FEATURE_RESPONSE: TunnellingFeatureResponse,
                KNXIPServiceType.TUNNELLING_FEATURE_SET: TunnellingFeatureSet,
                # Routing
                KNXIPServiceType.ROUTING_INDICATION: RoutingIndication,
                KNXIPServiceType.ROUTING_BUSY: RoutingBusy,
                KNXIPServiceType.ROUTING_LOST_MESSAGE: RoutingLostMessage,
                # Secure
                KNXIPServiceType.SECURE_WRAPPER: SecureWrapper,
                KNXIPServiceType.SESSION_AUTHENTICATE: SessionAuthenticate,
                KNXIPServiceType.SESSION_REQUEST: SessionRequest,
                KNXIPServiceType.SESSION_RESPONSE: SessionResponse,
                KNXIPServiceType.SESSION_STATUS: SessionStatus,
                KNXIPServiceType.TIMER_NOTIFY: TimerNotify,
            }
        return body_classes

    @staticmethod
    def from_knx(data: bytes) -> tuple[KNXIPFrame, bytes]:
        """
        Parse/deserialize from KNX/IP raw data.

        Returns a tuple of the KNXIPFrame and the rest of the data.

        Raises IncompleteKNXIPFrame if the data is not enough to parse the KNXIPFrame
        or CouldNotParseKNXIP on parsing error.
        """
        header = KNXIPHeader()
        # header.from_knx() returns the offset at which the body starts
        pos_body = header.from_knx(data)
        if len(data) < header.total_length:
            raise IncompleteKNXIPFrame("Incomplete data for KNXIPFrame")
        # limit data to header.total_length for streaming socket data
        raw_body = data[pos_body : header.total_length]

        body_class = KNXIPFrame._get_body_classes().get(header.service_type_ident)
        if body_class is None:
            raise CouldNotParseKNXIP(
                f"KNXIPServiceType not implemented: {header.service_type_ident.name}"
            )
        body: KNXIPBody = body_class()
        body.from_knx(raw_body)
        # bytes after this frame are handed back for the caller to process
        return KNXIPFrame(header=header, body=body), data[header.total_length :]

    def to_knx(self) -> bytes:
        """Serialize to KNX/IP raw data (header bytes followed by body bytes)."""
        return self.header.to_knx() + self.body.to_knx()

    def __repr__(self) -> str:
        """Return object as readable string."""
        return f'<KNXIPFrame {self.header} body="{self.body}" />'

    def __eq__(self, other: object) -> bool:
        """
        Equal operator.

        Two frames are equal when their instance attributes (header and body)
        are equal. Comparing with anything that is not a KNXIPFrame yields
        NotImplemented, so `frame == 5` evaluates to False instead of raising.
        """
        if not isinstance(other, KNXIPFrame):
            return NotImplemented
        return self.__dict__ == other.__dict__