File: request_response.py

package info (click to toggle)
python-xknx 3.6.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,012 kB
  • sloc: python: 39,710; javascript: 8,556; makefile: 27; sh: 12
file content (92 lines) | stat: -rw-r--r-- 3,657 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
"""
Base class for sending a specific type of KNX/IP Packet to a KNX/IP device and wait for the corresponding answer.

Will report if the corresponding answer was not received.
"""

from __future__ import annotations

import asyncio
import logging

from xknx.exceptions import CommunicationError
from xknx.io.transport import KNXIPTransport
from xknx.knxip import HPAI, ErrorCode, KNXIPBody, KNXIPBodyResponse, KNXIPFrame
from xknx.util import asyncio_timeout

logger = logging.getLogger("xknx.log")


class RequestResponse:
    """Class for sending a specific type of KNX/IP Packet to a KNX/IP device and wait for the corresponding answer."""

    def __init__(
        self,
        transport: KNXIPTransport,
        awaited_response_class: type[KNXIPBody],
        timeout_in_seconds: float = 1.0,
    ) -> None:
        """Initialize RequstResponse class."""
        self.transport = transport
        self.awaited_response_class: type[KNXIPBody] = awaited_response_class
        self.response_received_event = asyncio.Event()
        self.success = False
        self.timeout_in_seconds = timeout_in_seconds

        self.response_status_code: ErrorCode | None = None

    def create_knxipframe(self) -> KNXIPFrame:
        """Create KNX/IP Frame object to be sent to device."""
        raise NotImplementedError("create_knxipframe has to be implemented")

    async def start(self) -> None:
        """Start. Send request and wait for an answer."""
        callb = self.transport.register_callback(
            self.response_rec_callback, [self.awaited_response_class.SERVICE_TYPE]
        )
        try:
            await self.send_request()
            async with asyncio_timeout(self.timeout_in_seconds):
                await self.response_received_event.wait()
        except asyncio.TimeoutError:
            logger.debug(
                "Error: KNX bus did not respond in time (%s secs) to request of type '%s'",
                self.timeout_in_seconds,
                self.__class__.__name__,
            )
        except CommunicationError as err:
            logger.warning(
                "Sending request of type '%s' failed: %s", self.__class__.__name__, err
            )
        finally:
            # cleanup to not leave callbacks (for asyncio.CancelledError)
            self.transport.unregister_callback(callb)

    async def send_request(self) -> None:
        """Build knxipframe (within derived class) and send via transport."""
        self.transport.send(self.create_knxipframe())

    def response_rec_callback(
        self, knxipframe: KNXIPFrame, source: HPAI, _: KNXIPTransport
    ) -> None:
        """Verify and handle knxipframe. Callback from internal transport."""
        if not isinstance(knxipframe.body, self.awaited_response_class):
            logger.warning("Could not understand knxipframe")
            return
        self.response_received_event.set()

        if isinstance(knxipframe.body, KNXIPBodyResponse):
            self.response_status_code = knxipframe.body.status_code
            if knxipframe.body.status_code != ErrorCode.E_NO_ERROR:
                logger.debug(
                    "Error: KNX bus responded to request of type '%s' with error in '%s': %s",
                    self.__class__.__name__,
                    self.awaited_response_class.__name__,
                    knxipframe.body.status_code,
                )
                return
        self.success = True
        self.on_success_hook(knxipframe)

    def on_success_hook(self, knxipframe: KNXIPFrame) -> None:
        """Do something after having received a valid answer. May be overwritten in derived class."""