File: connect_response.py

package info (click to toggle)
python-xknx 3.6.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,012 kB
  • sloc: python: 39,710; javascript: 8,556; makefile: 27; sh: 12
file content (144 lines) | stat: -rw-r--r-- 4,985 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
"""
Module for serialization and deserialization of KNX Connect Response information.

Connect requests are used to start a new tunnel connection on a KNX/IP device.
With a Connect Response the receiving party acknowledges the valid processing of the request,
assigns a communication channel and an individual address for the client.
"""

from __future__ import annotations

from xknx.exceptions import CouldNotParseKNXIP
from xknx.telegram import IndividualAddress

from .body import KNXIPBodyResponse
from .error_code import ErrorCode
from .hpai import HPAI
from .knxip_enum import ConnectRequestType, KNXIPServiceType


class ConnectResponse(KNXIPBodyResponse):
    """
    Representation of a KNX Connect Response.

    The serialized body is: communication channel (1 byte), status code
    (1 byte), the data endpoint HPAI and the connect response data block (CRD).
    """

    SERVICE_TYPE = KNXIPServiceType.CONNECT_RESPONSE

    def __init__(
        self,
        communication_channel: int = 0,
        status_code: ErrorCode = ErrorCode.E_NO_ERROR,
        data_endpoint: HPAI | None = None,
        crd: ConnectResponseData | None = None,
    ) -> None:
        """
        Initialize ConnectResponse class.

        `data_endpoint` and `crd` fall back to default-constructed instances
        when not given.
        """
        self.communication_channel = communication_channel
        self.status_code = status_code
        self.data_endpoint = data_endpoint or HPAI()
        self.crd = crd or ConnectResponseData()

    def calculated_length(self) -> int:
        """Get length of KNX/IP body."""
        # 2 bytes for communication channel and status code
        return 2 + HPAI.LENGTH + self.crd.calculated_length()

    def from_knx(self, raw: bytes) -> int:
        """
        Parse/deserialize from KNX/IP raw data.

        Returns the number of bytes consumed. HPAI and CRD are only parsed
        when the status code is E_NO_ERROR; for any other status the whole
        of `raw` is reported as consumed.

        Raises CouldNotParseKNXIP if `raw` is too short to contain the
        communication channel and status code, or if the status code is not
        a known ErrorCode. Nothing is assigned to this object in that case.
        """
        if len(raw) < 2:
            raise CouldNotParseKNXIP("ConnectResponse has wrong length")
        try:
            status_code = ErrorCode(raw[1])
        except ValueError as err:
            # report malformed data the same way the CRD parser does
            raise CouldNotParseKNXIP(
                f"ConnectResponse has unknown status code: {raw[1]}"
            ) from err

        self.communication_channel = raw[0]
        self.status_code = status_code

        if status_code != ErrorCode.E_NO_ERROR:
            # do not parse HPAI and CRD in case of errors - the remaining
            # bytes are skipped and counted as consumed
            return len(raw)

        pos = 2
        pos += self.data_endpoint.from_knx(raw[pos:])
        pos += self.crd.from_knx(raw[pos:])
        return pos

    def to_knx(self) -> bytes:
        """
        Serialize to KNX/IP raw data.

        HPAI and CRD are always appended, regardless of the status code.
        """
        header = bytes((self.communication_channel, self.status_code.value))
        return header + self.data_endpoint.to_knx() + self.crd.to_knx()

    def __repr__(self) -> str:
        """Return object as readable string."""
        return (
            "<ConnectResponse "
            f'communication_channel="{self.communication_channel}" '
            f'status_code="{self.status_code}" '
            f'data_endpoint="{self.data_endpoint}" '
            f'crd="{self.crd}" />'
        )


class ConnectResponseData:
    """
    Representation of a KNX Connect Response Data block (CRD).

    A CRD is a length byte followed by the connection type byte. For tunnel
    connections two more bytes carry the individual address.

    Instances compare equal when all their attributes are equal. As `__eq__`
    is overridden without `__hash__`, instances are not hashable.
    """

    # length byte + connection type byte
    CRD_LENGTH = 2
    # CRD_LENGTH + 2 bytes individual address
    CRD_TUNNEL_LENGTH = 4

    def __init__(
        self,
        request_type: ConnectRequestType = ConnectRequestType.TUNNEL_CONNECTION,
        individual_address: IndividualAddress | None = None,
    ) -> None:
        """Initialize ConnectResponseData object."""
        self.request_type = request_type
        self.individual_address = individual_address

    def _is_tunnel_crd(self) -> bool:
        """Return True if this CRD describes a tunnel connection."""
        return self.request_type == ConnectRequestType.TUNNEL_CONNECTION

    def calculated_length(self) -> int:
        """Get length of the serialized CRD in bytes."""
        if self._is_tunnel_crd():
            return ConnectResponseData.CRD_TUNNEL_LENGTH
        return ConnectResponseData.CRD_LENGTH

    def from_knx(self, raw: bytes) -> int:
        """
        Parse/deserialize from KNX/IP raw data.

        Returns the number of bytes consumed (the CRD length byte).

        Raises CouldNotParseKNXIP if `raw` is empty or shorter than the
        announced length, if the announced length does not match the
        connection type, or if the connection type is unknown. This object
        is only modified after all checks have passed.
        """
        if not raw:
            # nothing to read the length byte from
            raise CouldNotParseKNXIP("CRD has wrong length")
        crd_length = raw[0]
        if len(raw) < crd_length:
            raise CouldNotParseKNXIP("CRD has wrong length")
        if crd_length < ConnectResponseData.CRD_LENGTH:
            raise CouldNotParseKNXIP("CRD length too small")

        try:
            request_type = ConnectRequestType(raw[1])
        except ValueError as err:
            # keep parse failures in the module's own exception type
            raise CouldNotParseKNXIP(
                f"CRD has unknown connection type: {raw[1]}"
            ) from err

        is_tunnel = request_type == ConnectRequestType.TUNNEL_CONNECTION
        expected_length = (
            ConnectResponseData.CRD_TUNNEL_LENGTH
            if is_tunnel
            else ConnectResponseData.CRD_LENGTH
        )
        if crd_length != expected_length:
            raise CouldNotParseKNXIP("CRD has wrong length")

        self.request_type = request_type
        if is_tunnel:
            # individual_address is left untouched for non-tunnel CRDs
            self.individual_address = IndividualAddress.from_knx(raw[2:4])
        return crd_length

    def to_knx(self) -> bytes:
        """
        Serialize CRD (Connect Response Data Block).

        For tunnel CRDs `individual_address` must be set.
        """
        _crd = bytes(
            (
                self.calculated_length(),
                self.request_type.value,
            )
        )
        if self._is_tunnel_crd():
            # a tunnel CRD cannot be serialized without an address
            assert self.individual_address is not None
            return _crd + self.individual_address.to_knx()

        return _crd

    def __eq__(self, other: object) -> bool:
        """
        Equal operator.

        Returns NotImplemented for objects that are not ConnectResponseData
        so Python can fall back to the other operand's comparison.
        """
        if not isinstance(other, ConnectResponseData):
            return NotImplemented
        return self.__dict__ == other.__dict__

    def __repr__(self) -> str:
        """Return object as readable string."""
        _address = (
            f'individual_address="{self.individual_address}" '
            if self._is_tunnel_crd() and self.individual_address
            else ""
        )
        return f'<ConnectResponseData request_type="{self.request_type}" {_address}/>'