File: srp.py

package info (click to toggle)
python-xknx 3.6.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,012 kB
  • sloc: python: 39,710; javascript: 8,556; makefile: 27; sh: 12
file content (163 lines) | stat: -rw-r--r-- 5,852 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
"""
Module for handling Search Request Parameters (SRP).

This can be used e.g. to restrict the set of devices that are expected to respond or to influence the type of DIBs
which the client is interested in.

If the M (Mandatory) bit is set, the server shall only respond to the search request if the complete SRP block
is satisfied.
"""

from __future__ import annotations

from xknx.exceptions import ConversionError, CouldNotParseKNXIP

from .knxip_enum import DIBServiceFamily, DIBTypeCode, SearchRequestParameterType


class SRP:
    """Search request parameter for a SearchRequestExtended."""

    SRP_HEADER_SIZE = 2
    MANDATORY_BIT_INDEX = 0x07

    # service family and service version have 1 byte each
    SERVICE_PAYLOAD_LENGTH = 2
    # mac address has 6 bytes
    MAC_ADDRESS_PAYLOAD_LENGTH = 6

    def __init__(
        self,
        srp_type: SearchRequestParameterType,
        mandatory: bool = True,
        data: bytes = b"",
    ) -> None:
        """Initialize a SRP."""
        self.type = srp_type
        self.mandatory = mandatory
        self.data = data
        self.payload_size = SRP.SRP_HEADER_SIZE

        if self.type == SearchRequestParameterType.SELECT_BY_SERVICE:
            self.validate_payload_length(SRP.SERVICE_PAYLOAD_LENGTH)
            self.payload_size = SRP.SRP_HEADER_SIZE + SRP.SERVICE_PAYLOAD_LENGTH
        elif self.type == SearchRequestParameterType.SELECT_BY_MAC_ADDRESS:
            self.validate_payload_length(SRP.MAC_ADDRESS_PAYLOAD_LENGTH)
            self.payload_size = SRP.SRP_HEADER_SIZE + SRP.MAC_ADDRESS_PAYLOAD_LENGTH
        elif self.type == SearchRequestParameterType.REQUEST_DIBS:
            if not self.data:
                raise ConversionError("Srp DIBs are not present.")
            # If the Client is interested in an odd number of DIBs
            # it shall add an additional Description Type 0 to make the structure length even
            if len(self.data) % 2:
                self.data += bytes([0])
            self.payload_size = SRP.SRP_HEADER_SIZE + len(self.data)

    def validate_payload_length(self, size: int) -> None:
        """Validate the length of the payload."""
        if not self.data or len(self.data) != size:
            raise ConversionError(
                "Srp parameter payload size does not match.",
                expected_size=size,
                srp_type=self.type,
            )

    def __len__(self) -> int:
        """Get the payload length."""
        return self.payload_size

    def __bytes__(self) -> bytes:
        """Convert this SRP to a bytes object."""
        return (
            bytes(
                [
                    self.payload_size,
                    (
                        self.mandatory << SRP.MANDATORY_BIT_INDEX
                        | self.type.value & SRP.MANDATORY_BIT_INDEX
                    ),
                ]
            )
            + self.data
        )

    @staticmethod
    def from_knx(data: bytes) -> SRP:
        """Convert the bytes to a SRP object."""
        if len(data) < SRP.SRP_HEADER_SIZE:
            raise CouldNotParseKNXIP("Data too short for SRP object.")

        size: int = data[0]
        if size > len(data):
            raise CouldNotParseKNXIP("SRP is larger than actual data size.")

        return SRP(
            srp_type=SearchRequestParameterType(data[1] & 0x7F),
            mandatory=bool(data[1] >> SRP.MANDATORY_BIT_INDEX),
            data=data[2:size],
        )

    @staticmethod
    def with_programming_mode() -> SRP:
        """Create a SRP that limits the response to only devices that are currently in programming mode."""
        return SRP(
            srp_type=SearchRequestParameterType.SELECT_BY_PROGRAMMING_MODE,
            mandatory=True,
        )

    @staticmethod
    def with_mac_address(mac_address: bytes) -> SRP:
        """
        Create a SRP that limits the response to only allow a device with the given MAC address.

        :param mac_address: The mac address to restrict this SRP to
        """
        return SRP(
            srp_type=SearchRequestParameterType.SELECT_BY_MAC_ADDRESS,
            mandatory=True,
            data=mac_address,
        )

    @staticmethod
    def with_service(family: DIBServiceFamily, family_version: int) -> SRP:
        """
        Create a SRP that limits the response to only allow devices that support the given service family.

        :param family: DIBServiceFamily that this SRP should be limited to
        :param family_version: The minimum family version so that devices will send a search response extended back
        :return: Srp with the given parameter
        """
        return SRP(
            srp_type=SearchRequestParameterType.SELECT_BY_SERVICE,
            mandatory=True,
            data=bytes([family.value, family_version]),
        )

    @staticmethod
    def request_device_description(dibs: list[DIBTypeCode]) -> SRP:
        """
        Create a SRP with a list of DIBs the server shall include in the response.

        The server may include in addition any number of other DIBs in the response.
        The server shall ignore Description types that are not recognized or not supported.

        :param dibs: the description types to include in the SRP
        :return: Srp with given parameters
        """
        return SRP(
            srp_type=SearchRequestParameterType.REQUEST_DIBS,
            mandatory=False,
            data=bytes(dib.value for dib in dibs),
        )

    def __eq__(self, other: object) -> bool:
        """Define equality for SRPs."""
        if not isinstance(other, SRP):
            return False

        return (
            self.payload_size == other.payload_size
            and self.type == other.type
            and self.mandatory == other.mandatory
            and self.data == other.data
        )