1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
|
"""
Base class for sending a specific type of KNX/IP Packet to a KNX/IP device and wait for the corresponding answer.
Will report if the corresponding answer was not received.
"""
from __future__ import annotations
import asyncio
import logging
from xknx.exceptions import CommunicationError
from xknx.io.transport import KNXIPTransport
from xknx.knxip import HPAI, ErrorCode, KNXIPBody, KNXIPBodyResponse, KNXIPFrame
from xknx.util import asyncio_timeout
logger = logging.getLogger("xknx.log")
class RequestResponse:
"""Class for sending a specific type of KNX/IP Packet to a KNX/IP device and wait for the corresponding answer."""
def __init__(
self,
transport: KNXIPTransport,
awaited_response_class: type[KNXIPBody],
timeout_in_seconds: float = 1.0,
) -> None:
"""Initialize RequstResponse class."""
self.transport = transport
self.awaited_response_class: type[KNXIPBody] = awaited_response_class
self.response_received_event = asyncio.Event()
self.success = False
self.timeout_in_seconds = timeout_in_seconds
self.response_status_code: ErrorCode | None = None
def create_knxipframe(self) -> KNXIPFrame:
"""Create KNX/IP Frame object to be sent to device."""
raise NotImplementedError("create_knxipframe has to be implemented")
async def start(self) -> None:
"""Start. Send request and wait for an answer."""
callb = self.transport.register_callback(
self.response_rec_callback, [self.awaited_response_class.SERVICE_TYPE]
)
try:
await self.send_request()
async with asyncio_timeout(self.timeout_in_seconds):
await self.response_received_event.wait()
except asyncio.TimeoutError:
logger.debug(
"Error: KNX bus did not respond in time (%s secs) to request of type '%s'",
self.timeout_in_seconds,
self.__class__.__name__,
)
except CommunicationError as err:
logger.warning(
"Sending request of type '%s' failed: %s", self.__class__.__name__, err
)
finally:
# cleanup to not leave callbacks (for asyncio.CancelledError)
self.transport.unregister_callback(callb)
async def send_request(self) -> None:
"""Build knxipframe (within derived class) and send via transport."""
self.transport.send(self.create_knxipframe())
def response_rec_callback(
self, knxipframe: KNXIPFrame, source: HPAI, _: KNXIPTransport
) -> None:
"""Verify and handle knxipframe. Callback from internal transport."""
if not isinstance(knxipframe.body, self.awaited_response_class):
logger.warning("Could not understand knxipframe")
return
self.response_received_event.set()
if isinstance(knxipframe.body, KNXIPBodyResponse):
self.response_status_code = knxipframe.body.status_code
if knxipframe.body.status_code != ErrorCode.E_NO_ERROR:
logger.debug(
"Error: KNX bus responded to request of type '%s' with error in '%s': %s",
self.__class__.__name__,
self.awaited_response_class.__name__,
knxipframe.body.status_code,
)
return
self.success = True
self.on_success_hook(knxipframe)
def on_success_hook(self, knxipframe: KNXIPFrame) -> None:
"""Do something after having received a valid answer. May be overwritten in derived class."""
|