1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229
|
"""
Module for queueing telegrams addressed to group addresses.
When a device wants to send a telegram to the KNX bus, it has to queue it to the
TelegramQueue within XKNX. The telegram will be forwarded to the local CEMIHandler and
processed in xknx-Devices.
You may register callbacks to be notified if a telegram was pushed to the queue.
Telegrams addressed to IndividualAddresses are not processed by this queue.
"""
from __future__ import annotations
import asyncio
from collections.abc import Awaitable
import logging
from typing import TYPE_CHECKING
from xknx.exceptions import CommunicationError, XKNXException
from xknx.telegram import AddressFilter, Telegram, TelegramDirection
from xknx.telegram.address import GroupAddress, InternalGroupAddress
from xknx.typing import TelegramCallbackType
if TYPE_CHECKING:
from xknx.xknx import XKNX
logger = logging.getLogger("xknx.log")
telegram_logger = logging.getLogger("xknx.telegram")
class TelegramQueue:
"""Class for telegram queue."""
class Callback:
"""Callback class for handling telegram received callbacks."""
def __init__(
self,
callback: TelegramCallbackType,
address_filters: list[AddressFilter] | None = None,
group_addresses: list[GroupAddress | InternalGroupAddress] | None = None,
match_for_outgoing_telegrams: bool = False,
) -> None:
"""Initialize Callback class."""
self.callback = callback
self._match_all = address_filters is None and group_addresses is None
self._match_outgoing = match_for_outgoing_telegrams
self.address_filters = [] if address_filters is None else address_filters
self.group_addresses = [] if group_addresses is None else group_addresses
def is_within_filter(self, telegram: Telegram) -> bool:
"""Test if callback is filtering for group address."""
if (
not self._match_outgoing
and telegram.direction == TelegramDirection.OUTGOING
):
return False
if self._match_all:
return True
if isinstance(
telegram.destination_address, GroupAddress | InternalGroupAddress
):
for address_filter in self.address_filters:
if address_filter.match(telegram.destination_address):
return True
for group_address in self.group_addresses:
if telegram.destination_address == group_address:
return True
return False
def __init__(self, xknx: XKNX) -> None:
"""Initialize TelegramQueue class."""
self.xknx = xknx
self.telegram_received_cbs: list[TelegramQueue.Callback] = []
self.outgoing_queue: asyncio.Queue[Telegram | None] = asyncio.Queue()
self._consumer_task: Awaitable[tuple[None, None]] | None = None
self._rate_limiter: asyncio.Task[None] | None = None
def register_telegram_received_cb(
self,
telegram_received_cb: TelegramCallbackType,
address_filters: list[AddressFilter] | None = None,
group_addresses: list[GroupAddress | InternalGroupAddress] | None = None,
match_for_outgoing: bool = False,
) -> TelegramQueue.Callback:
"""Register callback for a telegram being received from KNX bus."""
callback = TelegramQueue.Callback(
telegram_received_cb,
address_filters=address_filters,
group_addresses=group_addresses,
match_for_outgoing_telegrams=match_for_outgoing,
)
self.telegram_received_cbs.append(callback)
return callback
def unregister_telegram_received_cb(
self, telegram_received_cb: TelegramQueue.Callback
) -> None:
"""Unregister callback for a telegram being received from KNX bus."""
self.telegram_received_cbs.remove(telegram_received_cb)
async def start(self) -> None:
"""Start telegram queue."""
self._consumer_task = asyncio.gather(
self._telegram_consumer(), self._outgoing_rate_limiter()
)
async def stop(self) -> None:
"""Stop telegram queue."""
logger.debug("Stopping TelegramQueue")
# If a None object is pushed to the queue, the queue stops
self.xknx.telegrams.put_nowait(None)
if self._consumer_task is not None:
await self._consumer_task
async def _telegram_consumer(self) -> None:
"""Endless loop for processing telegrams."""
while True:
telegram = await self.xknx.telegrams.get()
# Breaking up queue if None is pushed to the queue
if telegram is None:
self.outgoing_queue.put_nowait(None)
await self.outgoing_queue.join()
self.xknx.telegrams.task_done()
break
self.xknx.group_address_dpt.set_decoded_data(telegram)
if telegram.direction == TelegramDirection.INCOMING:
try:
await self.process_telegram_incoming(telegram)
except XKNXException:
logger.exception(
"Unexpected xknx error while processing incoming telegram %s",
telegram,
)
except Exception: # pylint: disable=broad-except
# prevent the parser Task from stalling when unexpected errors occur
logger.exception(
"Unexpected error while processing incoming telegram %s",
telegram,
)
finally:
self.xknx.telegrams.task_done()
elif telegram.direction == TelegramDirection.OUTGOING:
self.outgoing_queue.put_nowait(telegram)
# self.xknx.telegrams.task_done() for outgoing is called in _outgoing_rate_limiter.
async def _outgoing_rate_limiter(self) -> None:
"""Endless loop for processing outgoing telegrams."""
while True:
telegram = await self.outgoing_queue.get()
# Breaking up queue if None is pushed to the queue
if telegram is None:
self.outgoing_queue.task_done()
if self._rate_limiter:
self._rate_limiter.cancel()
break
# limit rate to knx bus - defaults to 20 per second
if self.xknx.rate_limit and not isinstance(
telegram.destination_address, InternalGroupAddress
):
if self._rate_limiter is not None:
await self._rate_limiter
self._rate_limiter = asyncio.create_task(
asyncio.sleep(1 / self.xknx.rate_limit)
)
try:
await self.process_telegram_outgoing(telegram)
except CommunicationError as ex:
if ex.should_log:
logger.warning(ex)
except XKNXException as ex:
logger.error("Error while processing outgoing telegram %s", ex)
except Exception: # pylint: disable=broad-except
# prevent the sender Task from stalling when unexpected errors occur (eg. ValueError from creating KNXIPFrames)
logger.exception(
"Unexpected error while processing outgoing telegram %s", telegram
)
finally:
self.outgoing_queue.task_done()
self.xknx.telegrams.task_done()
async def _process_all_telegrams(self) -> None:
"""Process all telegrams being queued. Used in unit tests."""
while not self.xknx.telegrams.empty():
try:
telegram = self.xknx.telegrams.get_nowait()
if telegram is None:
return
if telegram.direction == TelegramDirection.INCOMING:
await self.process_telegram_incoming(telegram)
elif telegram.direction == TelegramDirection.OUTGOING:
await self.process_telegram_outgoing(telegram)
except XKNXException as ex:
logger.error("Error while processing telegram %s", ex)
finally:
self.xknx.telegrams.task_done()
async def process_telegram_outgoing(self, telegram: Telegram) -> None:
"""Process outgoing telegram."""
telegram_logger.debug(telegram)
if not isinstance(telegram.destination_address, InternalGroupAddress):
# raises CommunicationError when interface is not connected
await self.xknx.cemi_handler.send_telegram(telegram)
self.xknx.devices.process(telegram)
self._run_telegram_received_cbs(telegram)
async def process_telegram_incoming(self, telegram: Telegram) -> None:
"""Process incoming telegram."""
telegram_logger.debug(telegram)
self._run_telegram_received_cbs(telegram)
self.xknx.devices.process(telegram)
def _run_telegram_received_cbs(self, telegram: Telegram) -> None:
"""Run registered callbacks. Don't propagate exceptions."""
for callback in self.telegram_received_cbs:
if not callback.is_within_filter(telegram):
continue
try:
callback.callback(telegram)
except Exception: # pylint: disable=broad-except
logger.exception(
"Unexpected error while processing telegram_received_cb for %s",
telegram,
)
|