1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
|
"""
Module for reading the value of a specific KNX group address from KNX bus.
The module will
* ... send a group_read to the selected group address.
* ... register a callback for receiving telegrams within telegram queue.
* ... check if received telegrams have the correct group address.
* ... store the received telegram for further processing.
"""
from __future__ import annotations
import asyncio
import logging
from typing import TYPE_CHECKING
from xknx.telegram import Telegram
from xknx.telegram.address import GroupAddress, InternalGroupAddress
from xknx.telegram.apci import GroupValueRead, GroupValueResponse, GroupValueWrite
from xknx.util import asyncio_timeout
if TYPE_CHECKING:
from xknx.xknx import XKNX
# Module-level logger (named "xknx.log"); ValueReader.read() uses it to warn
# when no response arrives before the timeout.
logger = logging.getLogger("xknx.log")
class ValueReader:
    """Class for reading the value of a specific KNX group address from KNX bus.

    A reader sends one GroupValueRead telegram to ``group_address`` and waits
    until a GroupValueResponse or GroupValueWrite for that address is passed
    to ``telegram_received``.

    Note: the internal event is never cleared. Once a matching telegram has
    been stored, later ``read()`` calls still send a GroupValueRead but return
    the stored telegram without waiting for a new one.
    """

    __slots__ = (
        "group_address",
        "received_telegram",
        "response_received_event",
        "timeout_in_seconds",
        "xknx",
    )

    def __init__(
        self,
        xknx: XKNX,
        group_address: GroupAddress | InternalGroupAddress,
        timeout_in_seconds: float = 2.0,
    ) -> None:
        """Initialize ValueReader class.

        Args:
            xknx: Object providing ``telegram_queue``, ``telegrams`` and
                ``current_address`` used for sending and receiving.
            group_address: Address whose value is requested.
            timeout_in_seconds: How long ``read()`` waits for a response.
        """
        self.xknx = xknx
        self.group_address: GroupAddress | InternalGroupAddress = group_address
        # Set by telegram_received() as soon as a matching telegram arrives.
        self.response_received_event = asyncio.Event()
        self.timeout_in_seconds: float = timeout_in_seconds
        # Last matching telegram; stays None until one is received.
        self.received_telegram: Telegram | None = None

    async def read(self) -> Telegram | None:
        """Send group read and wait for response.

        Returns:
            The received telegram, or ``None`` if no matching telegram arrived
            within ``timeout_in_seconds`` (a warning is logged in that case).

        Any exception raised while sending the request propagates to the
        caller; the receive callback is unregistered in every case.
        """
        telegram_queue = self.xknx.telegram_queue
        cb_obj = telegram_queue.register_telegram_received_cb(
            self.telegram_received,
            group_addresses=[self.group_address],
            match_for_outgoing=True,
        )
        try:
            # Send inside the try block so that the callback registered above
            # is removed again even when queueing the request fails.
            self.send_group_read()
            try:
                async with asyncio_timeout(self.timeout_in_seconds):
                    await self.response_received_event.wait()
            except asyncio.TimeoutError:
                logger.warning(
                    "Error: KNX bus did not respond in time (%s secs) to GroupValueRead request for: %s",
                    self.timeout_in_seconds,
                    self.group_address,
                )
                return None
            return self.received_telegram
        finally:
            # cleanup to not leave callbacks (for asyncio.CancelledError)
            telegram_queue.unregister_telegram_received_cb(cb_obj)

    def send_group_read(self) -> None:
        """Send group read.

        Builds a GroupValueRead telegram from ``xknx.current_address`` to
        ``group_address`` and puts it on ``xknx.telegrams`` without waiting.
        """
        telegram = Telegram(
            destination_address=self.group_address,
            payload=GroupValueRead(),
            source_address=self.xknx.current_address,
        )
        self.xknx.telegrams.put_nowait(telegram)

    def telegram_received(self, telegram: Telegram) -> None:
        """Test if telegram has correct group address and trigger event.

        Only GroupValueResponse and GroupValueWrite payloads addressed to
        ``group_address`` are accepted; anything else is ignored.
        """
        if telegram.destination_address == self.group_address and isinstance(
            telegram.payload, GroupValueResponse | GroupValueWrite
        ):
            self.received_telegram = telegram
            self.response_received_event.set()
|