File: self_description.py

package info (click to toggle)
python-xknx 3.10.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,044 kB
  • sloc: python: 40,087; javascript: 8,556; makefile: 32; sh: 12
file content (202 lines) | stat: -rw-r--r-- 7,043 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
"""Abstraction to send DescriptionRequest and wait for DescriptionResponse."""

from __future__ import annotations

from abc import ABC, abstractmethod
import asyncio
import logging
from typing import TYPE_CHECKING, Final

from xknx.exceptions import CommunicationError, XKNXException
from xknx.io import util
from xknx.io.gateway_scanner import GatewayDescriptor
from xknx.knxip import (
    HPAI,
    SRP,
    DescriptionRequest,
    DescriptionResponse,
    DIBTypeCode,
    KNXIPFrame,
    SearchRequestExtended,
    SearchResponseExtended,
)
from xknx.util import asyncio_timeout

from .const import DEFAULT_MCAST_PORT
from .transport import UDPTransport

if TYPE_CHECKING:
    from xknx.io.transport import KNXIPTransport

logger = logging.getLogger("xknx.log")

DESCRIPTION_TIMEOUT: Final = 2


async def request_description(
    gateway_ip: str,
    gateway_port: int = DEFAULT_MCAST_PORT,
    local_ip: str | None = None,
    local_port: int = 0,
    route_back: bool = False,
) -> GatewayDescriptor:
    """Set up a UDP transport to request a description from a KNXnet/IP device."""
    local_ip = local_ip or util.find_local_ip(gateway_ip)
    if local_ip is None:
        # Fall back to default interface and use route back
        local_ip = await util.get_default_local_ip(gateway_ip)
        if local_ip is None:
            raise CommunicationError(
                f"No network interface found to request gateway info from {gateway_ip}:{gateway_port}"
            )
        route_back = True
    try:
        local_ip = await util.validate_ip(local_ip, address_name="Local IP")
        gateway_ip = await util.validate_ip(gateway_ip, address_name="Gateway IP")
    except XKNXException as err:
        raise CommunicationError("Invalid address") from err

    transport = UDPTransport(
        local_addr=(local_ip, local_port),
        remote_addr=(gateway_ip, gateway_port),
        multicast=False,
    )
    try:
        await transport.connect()
    except OSError as err:
        raise CommunicationError(
            "Could not setup socket to request gateway info"
        ) from err
    else:
        local_hpai: HPAI
        if route_back:
            local_hpai = HPAI()
        else:
            local_addr = transport.getsockname()
            local_hpai = HPAI(*local_addr)

        description_query = DescriptionQuery(
            transport=transport,
            local_hpai=local_hpai,
        )
        await description_query.start()
        gateway = description_query.gateway_descriptor
        if gateway is None:
            raise CommunicationError(
                f"Could not fetch gateway info from {gateway_ip}:{gateway_port}"
            )
        if gateway.core_version >= 2:
            search_extended_query = SearchExtendedQuery(
                transport=transport,
                local_hpai=local_hpai,
            )
            await search_extended_query.start()
            gateway = search_extended_query.gateway_descriptor
            if gateway is None:
                raise CommunicationError(
                    f"Could not fetch extended gateway info from {gateway_ip}:{gateway_port}"
                )
        return gateway
    finally:
        transport.stop()


class _SelfDescriptionQuery(ABC):
    """Base class for handling descriptions request-response cycles."""

    expected_response_class: type[DescriptionResponse] | type[SearchResponseExtended]

    def __init__(
        self,
        transport: KNXIPTransport,
        local_hpai: HPAI,
    ) -> None:
        """Initialize Description class."""
        self.transport = transport
        self.local_hpai = local_hpai

        self.gateway_descriptor: GatewayDescriptor | None = None
        self.response_received_event = asyncio.Event()

    @abstractmethod
    def create_knxipframe(self) -> KNXIPFrame:
        """Create KNX/IP Frame object to be sent to device."""

    async def start(self) -> None:
        """Start. Send request and wait for an answer."""
        callb = self.transport.register_callback(
            self.response_rec_callback, [self.expected_response_class.SERVICE_TYPE]
        )
        frame = self.create_knxipframe()
        try:
            self.transport.send(frame)
            async with asyncio_timeout(DESCRIPTION_TIMEOUT):
                await self.response_received_event.wait()
        except asyncio.TimeoutError:
            logger.debug(
                "Error: KNX bus did not respond in time (%s secs) to request of type '%s'",
                DESCRIPTION_TIMEOUT,
                self.__class__.__name__,
            )
        except CommunicationError as err:
            logger.warning("Sending %s failed: %s", frame.body.__class__.__name__, err)
        finally:
            # cleanup to not leave callbacks (for asyncio.CancelledError)
            self.transport.unregister_callback(callb)

    def response_rec_callback(
        self, knxipframe: KNXIPFrame, source: HPAI, _: KNXIPTransport
    ) -> None:
        """Verify and handle knxipframe. Callback from internal transport."""
        if not isinstance(knxipframe.body, self.expected_response_class):
            logger.warning(
                "Wrong knxipframe for %s: %s", self.__class__.__name__, knxipframe
            )
            return
        self.response_received_event.set()
        # Set gateway descriptor attribute
        gateway = GatewayDescriptor(
            ip_addr=self.transport.remote_addr[0],
            port=self.transport.remote_addr[1],
            local_ip=self.transport.getsockname()[0],
        )
        gateway.parse_dibs(knxipframe.body.dibs)  # type: ignore[attr-defined]
        self.gateway_descriptor = gateway


class DescriptionQuery(_SelfDescriptionQuery):
    """Class to send a DescriptionRequest and wait for DescriptionResponse."""

    expected_response_class = DescriptionResponse

    def create_knxipframe(self) -> KNXIPFrame:
        """Create KNX/IP Frame object to be sent to device."""
        description_request = DescriptionRequest(control_endpoint=self.local_hpai)
        return KNXIPFrame.init_from_body(description_request)


class SearchExtendedQuery(_SelfDescriptionQuery):
    """
    Class to send a SearchRequestExtended and wait for SearchResponseExtended to a single device.

    Does only work with UDP transports.
    """

    expected_response_class = SearchResponseExtended

    def create_knxipframe(self) -> KNXIPFrame:
        """Create KNX/IP Frame object to be sent to device."""
        search_extended_request = SearchRequestExtended(
            discovery_endpoint=self.local_hpai,
            srps=[
                SRP.request_device_description(
                    [
                        DIBTypeCode.DEVICE_INFO,
                        DIBTypeCode.SUPP_SVC_FAMILIES,
                        DIBTypeCode.SECURED_SERVICE_FAMILIES,
                        DIBTypeCode.TUNNELING_INFO,
                    ]
                )
            ],
        )
        return KNXIPFrame.init_from_body(search_extended_request)