File: telegram_queue.py

package info (click to toggle)
python-xknx 3.14.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,064 kB
  • sloc: python: 40,895; javascript: 8,556; makefile: 32; sh: 12
file content (285 lines) | stat: -rw-r--r-- 12,013 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
"""
Module for queueing telegrams addressed to group addresses.

When a device wants to send a telegram to the KNX bus, it has to queue it to the
TelegramQueue within XKNX. The telegram will be forwarded to the local CEMIHandler and
processed in xknx-Devices.
You may register callbacks to be notified if a telegram was pushed to the queue.

Telegrams addressed to IndividualAddresses are not processed by this queue.
"""

from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable
from functools import partial
import logging
from typing import TYPE_CHECKING

from xknx.exceptions import CommunicationError, XKNXException
from xknx.telegram import AddressFilter, Telegram, TelegramDirection
from xknx.telegram.address import GroupAddress, InternalGroupAddress
from xknx.typing import TelegramCallbackType

if TYPE_CHECKING:
    from xknx.xknx import XKNX

logger = logging.getLogger("xknx.log")
telegram_logger = logging.getLogger("xknx.telegram")


class TelegramQueue:
    """Class for telegram queue."""

    class Callback:
        """Callback class for handling telegram received callbacks."""

        __slots__ = (
            "_match_all",
            "_match_outgoing",
            "address_filters",
            "callback",
            "group_addresses",
        )

        def __init__(
            self,
            callback: TelegramCallbackType,
            address_filters: list[AddressFilter] | None = None,
            group_addresses: list[GroupAddress | InternalGroupAddress] | None = None,
            match_for_outgoing_telegrams: bool = False,
        ) -> None:
            """Initialize Callback class."""
            self.callback = callback
            self._match_all = address_filters is None and group_addresses is None
            self._match_outgoing = match_for_outgoing_telegrams
            self.address_filters = [] if address_filters is None else address_filters
            self.group_addresses = [] if group_addresses is None else group_addresses

        def is_within_filter(self, telegram: Telegram) -> bool:
            """Test if callback is filtering for group address."""
            if (
                not self._match_outgoing
                and telegram.direction == TelegramDirection.OUTGOING
            ):
                return False
            if self._match_all:
                return True
            if isinstance(
                telegram.destination_address, GroupAddress | InternalGroupAddress
            ):
                for address_filter in self.address_filters:
                    if address_filter.match(telegram.destination_address):
                        return True
                for group_address in self.group_addresses:
                    if telegram.destination_address == group_address:
                        return True
            return False

    __slots__ = (
        "_consumer_task",
        "_data_secure_group_key_issue_cbs",
        "_rate_limiter",
        "outgoing_queue",
        "telegram_received_cbs",
        "xknx",
    )

    def __init__(self, xknx: XKNX) -> None:
        """Initialize TelegramQueue class."""
        self.xknx = xknx
        self.telegram_received_cbs: list[TelegramQueue.Callback] = []
        self._data_secure_group_key_issue_cbs: list[TelegramCallbackType] = []

        self.outgoing_queue: asyncio.Queue[Telegram | None] = asyncio.Queue()
        self._consumer_task: Awaitable[tuple[None, None]] | None = None
        self._rate_limiter: asyncio.Task[None] | None = None

    def register_telegram_received_cb(
        self,
        telegram_received_cb: TelegramCallbackType,
        address_filters: list[AddressFilter] | None = None,
        group_addresses: list[GroupAddress | InternalGroupAddress] | None = None,
        match_for_outgoing: bool = False,
    ) -> TelegramQueue.Callback:
        """Register callback for a telegram being received from KNX bus."""
        callback = TelegramQueue.Callback(
            telegram_received_cb,
            address_filters=address_filters,
            group_addresses=group_addresses,
            match_for_outgoing_telegrams=match_for_outgoing,
        )
        self.telegram_received_cbs.append(callback)
        return callback

    def unregister_telegram_received_cb(
        self, telegram_received_cb: TelegramQueue.Callback
    ) -> None:
        """Unregister callback for a telegram being received from KNX bus."""
        self.telegram_received_cbs.remove(telegram_received_cb)

    async def start(self) -> None:
        """Start telegram queue."""
        self._consumer_task = asyncio.gather(
            self._telegram_consumer(), self._outgoing_rate_limiter()
        )

    async def stop(self) -> None:
        """Stop telegram queue."""
        logger.debug("Stopping TelegramQueue")
        # If a None object is pushed to the queue, the queue stops
        self.xknx.telegrams.put_nowait(None)
        if self._consumer_task is not None:
            await self._consumer_task

    async def _telegram_consumer(self) -> None:
        """Endless loop for processing telegrams."""
        while True:
            telegram = await self.xknx.telegrams.get()
            # Breaking up queue if None is pushed to the queue
            if telegram is None:
                self.outgoing_queue.put_nowait(None)
                await self.outgoing_queue.join()
                self.xknx.telegrams.task_done()
                break

            self.xknx.group_address_dpt.set_decoded_data(telegram)

            if telegram.direction == TelegramDirection.INCOMING:
                try:
                    await self.process_telegram_incoming(telegram)
                except XKNXException:
                    logger.exception(
                        "Unexpected xknx error while processing incoming telegram %s",
                        telegram,
                    )
                except Exception:  # pylint: disable=broad-except
                    # prevent the parser Task from stalling when unexpected errors occur
                    logger.exception(
                        "Unexpected error while processing incoming telegram %s",
                        telegram,
                    )
                finally:
                    self.xknx.telegrams.task_done()
            elif telegram.direction == TelegramDirection.OUTGOING:
                self.outgoing_queue.put_nowait(telegram)
                # self.xknx.telegrams.task_done() for outgoing is called in _outgoing_rate_limiter.

    async def _outgoing_rate_limiter(self) -> None:
        """Endless loop for processing outgoing telegrams."""
        while True:
            telegram = await self.outgoing_queue.get()
            # Breaking up queue if None is pushed to the queue
            if telegram is None:
                self.outgoing_queue.task_done()
                if self._rate_limiter:
                    self._rate_limiter.cancel()
                break

            # limit rate to knx bus - defaults to 20 per second
            if self.xknx.rate_limit and not isinstance(
                telegram.destination_address, InternalGroupAddress
            ):
                if self._rate_limiter is not None:
                    await self._rate_limiter
                self._rate_limiter = asyncio.create_task(
                    asyncio.sleep(1 / self.xknx.rate_limit)
                )

            try:
                await self.process_telegram_outgoing(telegram)
            except CommunicationError as ex:
                if ex.should_log:
                    logger.warning(ex)
            except XKNXException as ex:
                logger.error("Error while processing outgoing telegram %s", ex)
            except Exception:  # pylint: disable=broad-except
                # prevent the sender Task from stalling when unexpected errors occur (eg. ValueError from creating KNXIPFrames)
                logger.exception(
                    "Unexpected error while processing outgoing telegram %s", telegram
                )
            finally:
                self.outgoing_queue.task_done()
                self.xknx.telegrams.task_done()

    async def _process_all_telegrams(self) -> None:
        """Process all telegrams being queued. Used in unit tests."""
        while not self.xknx.telegrams.empty():
            try:
                telegram = self.xknx.telegrams.get_nowait()
                if telegram is None:
                    return
                if telegram.direction == TelegramDirection.INCOMING:
                    await self.process_telegram_incoming(telegram)
                elif telegram.direction == TelegramDirection.OUTGOING:
                    await self.process_telegram_outgoing(telegram)
            except XKNXException as ex:
                logger.error("Error while processing telegram %s", ex)
            finally:
                self.xknx.telegrams.task_done()

    async def process_telegram_outgoing(self, telegram: Telegram) -> None:
        """Process outgoing telegram."""
        telegram_logger.debug(telegram)
        if not isinstance(telegram.destination_address, InternalGroupAddress):
            # raises CommunicationError when interface is not connected
            await self.xknx.cemi_handler.send_telegram(telegram)

        self.xknx.devices.process(telegram)
        self._run_telegram_received_cbs(telegram)

    async def process_telegram_incoming(self, telegram: Telegram) -> None:
        """Process incoming telegram."""
        telegram_logger.debug(telegram)
        self._run_telegram_received_cbs(telegram)
        self.xknx.devices.process(telegram)

    def _run_telegram_received_cbs(self, telegram: Telegram) -> None:
        """Run registered callbacks. Don't propagate exceptions."""
        for callback in self.telegram_received_cbs:
            if not callback.is_within_filter(telegram):
                continue
            try:
                callback.callback(telegram)
            except Exception:  # pylint: disable=broad-except
                logger.exception(
                    "Unexpected error while processing telegram_received_cb for %s",
                    telegram,
                )

    # DataSecure Telegrams that could not be decrypted due to missing or invalid keys
    # are handled separately so they don't interfere with normal telegram processing.
    # This is not going through xknx.telegrams, but here for convenient callback management.
    def register_data_secure_group_key_issue_cb(
        self, data_secure_group_key_issue_cb: TelegramCallbackType
    ) -> Callable[[], None]:
        """
        Register callback for data secure group key issues. Returns unregister function.

        Only Group telegrams with DataSecure issues are forwarded to the callbacks.
        """
        self._data_secure_group_key_issue_cbs.append(data_secure_group_key_issue_cb)
        return partial(
            self.unregister_data_secure_group_key_issue_cb,
            data_secure_group_key_issue_cb,
        )

    def unregister_data_secure_group_key_issue_cb(
        self, data_secure_group_key_issue_cb: TelegramCallbackType
    ) -> None:
        """Unregister callback for data secure group key issues."""
        if data_secure_group_key_issue_cb in self._data_secure_group_key_issue_cbs:
            self._data_secure_group_key_issue_cbs.remove(data_secure_group_key_issue_cb)

    def received_data_secure_group_key_issue(self, telegram: Telegram) -> None:
        """
        Run registered callbacks for data secure group key issues.

        Only TDataGroup telegrams with undecodable DataSecure payloads are forwarded.
        """
        for data_secure_group_key_issue_cb in self._data_secure_group_key_issue_cbs:
            try:
                data_secure_group_key_issue_cb(telegram)
            except Exception as e:  # pylint: disable=broad-except
                logger.exception("Error in data secure group key issue callback: %s", e)