File: telegram_queue.py

package info (click to toggle)
python-xknx 3.6.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,012 kB
  • sloc: python: 39,710; javascript: 8,556; makefile: 27; sh: 12
file content (229 lines) | stat: -rw-r--r-- 9,741 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
"""
Module for queueing telegrams addressed to group addresses.

When a device wants to send a telegram to the KNX bus, it has to queue it to the
TelegramQueue within XKNX. The telegram will be forwarded to the local CEMIHandler and
processed in xknx-Devices.
You may register callbacks to be notified if a telegram was pushed to the queue.

Telegrams addressed to IndividualAddresses are not processed by this queue.
"""

from __future__ import annotations

import asyncio
from collections.abc import Awaitable
import logging
from typing import TYPE_CHECKING

from xknx.exceptions import CommunicationError, XKNXException
from xknx.telegram import AddressFilter, Telegram, TelegramDirection
from xknx.telegram.address import GroupAddress, InternalGroupAddress
from xknx.typing import TelegramCallbackType

if TYPE_CHECKING:
    from xknx.xknx import XKNX

logger = logging.getLogger("xknx.log")
telegram_logger = logging.getLogger("xknx.telegram")


class TelegramQueue:
    """Class for telegram queue."""

    class Callback:
        """Callback class for handling telegram received callbacks."""

        def __init__(
            self,
            callback: TelegramCallbackType,
            address_filters: list[AddressFilter] | None = None,
            group_addresses: list[GroupAddress | InternalGroupAddress] | None = None,
            match_for_outgoing_telegrams: bool = False,
        ) -> None:
            """Initialize Callback class."""
            self.callback = callback
            self._match_all = address_filters is None and group_addresses is None
            self._match_outgoing = match_for_outgoing_telegrams
            self.address_filters = [] if address_filters is None else address_filters
            self.group_addresses = [] if group_addresses is None else group_addresses

        def is_within_filter(self, telegram: Telegram) -> bool:
            """Test if callback is filtering for group address."""
            if (
                not self._match_outgoing
                and telegram.direction == TelegramDirection.OUTGOING
            ):
                return False
            if self._match_all:
                return True
            if isinstance(
                telegram.destination_address, GroupAddress | InternalGroupAddress
            ):
                for address_filter in self.address_filters:
                    if address_filter.match(telegram.destination_address):
                        return True
                for group_address in self.group_addresses:
                    if telegram.destination_address == group_address:
                        return True
            return False

    def __init__(self, xknx: XKNX) -> None:
        """Initialize TelegramQueue class."""
        self.xknx = xknx
        self.telegram_received_cbs: list[TelegramQueue.Callback] = []
        self.outgoing_queue: asyncio.Queue[Telegram | None] = asyncio.Queue()
        self._consumer_task: Awaitable[tuple[None, None]] | None = None
        self._rate_limiter: asyncio.Task[None] | None = None

    def register_telegram_received_cb(
        self,
        telegram_received_cb: TelegramCallbackType,
        address_filters: list[AddressFilter] | None = None,
        group_addresses: list[GroupAddress | InternalGroupAddress] | None = None,
        match_for_outgoing: bool = False,
    ) -> TelegramQueue.Callback:
        """Register callback for a telegram being received from KNX bus."""
        callback = TelegramQueue.Callback(
            telegram_received_cb,
            address_filters=address_filters,
            group_addresses=group_addresses,
            match_for_outgoing_telegrams=match_for_outgoing,
        )
        self.telegram_received_cbs.append(callback)
        return callback

    def unregister_telegram_received_cb(
        self, telegram_received_cb: TelegramQueue.Callback
    ) -> None:
        """Unregister callback for a telegram being received from KNX bus."""
        self.telegram_received_cbs.remove(telegram_received_cb)

    async def start(self) -> None:
        """Start telegram queue."""
        self._consumer_task = asyncio.gather(
            self._telegram_consumer(), self._outgoing_rate_limiter()
        )

    async def stop(self) -> None:
        """Stop telegram queue."""
        logger.debug("Stopping TelegramQueue")
        # If a None object is pushed to the queue, the queue stops
        self.xknx.telegrams.put_nowait(None)
        if self._consumer_task is not None:
            await self._consumer_task

    async def _telegram_consumer(self) -> None:
        """Endless loop for processing telegrams."""
        while True:
            telegram = await self.xknx.telegrams.get()
            # Breaking up queue if None is pushed to the queue
            if telegram is None:
                self.outgoing_queue.put_nowait(None)
                await self.outgoing_queue.join()
                self.xknx.telegrams.task_done()
                break

            self.xknx.group_address_dpt.set_decoded_data(telegram)

            if telegram.direction == TelegramDirection.INCOMING:
                try:
                    await self.process_telegram_incoming(telegram)
                except XKNXException:
                    logger.exception(
                        "Unexpected xknx error while processing incoming telegram %s",
                        telegram,
                    )
                except Exception:  # pylint: disable=broad-except
                    # prevent the parser Task from stalling when unexpected errors occur
                    logger.exception(
                        "Unexpected error while processing incoming telegram %s",
                        telegram,
                    )
                finally:
                    self.xknx.telegrams.task_done()
            elif telegram.direction == TelegramDirection.OUTGOING:
                self.outgoing_queue.put_nowait(telegram)
                # self.xknx.telegrams.task_done() for outgoing is called in _outgoing_rate_limiter.

    async def _outgoing_rate_limiter(self) -> None:
        """Endless loop for processing outgoing telegrams."""
        while True:
            telegram = await self.outgoing_queue.get()
            # Breaking up queue if None is pushed to the queue
            if telegram is None:
                self.outgoing_queue.task_done()
                if self._rate_limiter:
                    self._rate_limiter.cancel()
                break

            # limit rate to knx bus - defaults to 20 per second
            if self.xknx.rate_limit and not isinstance(
                telegram.destination_address, InternalGroupAddress
            ):
                if self._rate_limiter is not None:
                    await self._rate_limiter
                self._rate_limiter = asyncio.create_task(
                    asyncio.sleep(1 / self.xknx.rate_limit)
                )

            try:
                await self.process_telegram_outgoing(telegram)
            except CommunicationError as ex:
                if ex.should_log:
                    logger.warning(ex)
            except XKNXException as ex:
                logger.error("Error while processing outgoing telegram %s", ex)
            except Exception:  # pylint: disable=broad-except
                # prevent the sender Task from stalling when unexpected errors occur (eg. ValueError from creating KNXIPFrames)
                logger.exception(
                    "Unexpected error while processing outgoing telegram %s", telegram
                )
            finally:
                self.outgoing_queue.task_done()
                self.xknx.telegrams.task_done()

    async def _process_all_telegrams(self) -> None:
        """Process all telegrams being queued. Used in unit tests."""
        while not self.xknx.telegrams.empty():
            try:
                telegram = self.xknx.telegrams.get_nowait()
                if telegram is None:
                    return
                if telegram.direction == TelegramDirection.INCOMING:
                    await self.process_telegram_incoming(telegram)
                elif telegram.direction == TelegramDirection.OUTGOING:
                    await self.process_telegram_outgoing(telegram)
            except XKNXException as ex:
                logger.error("Error while processing telegram %s", ex)
            finally:
                self.xknx.telegrams.task_done()

    async def process_telegram_outgoing(self, telegram: Telegram) -> None:
        """Process outgoing telegram."""
        telegram_logger.debug(telegram)
        if not isinstance(telegram.destination_address, InternalGroupAddress):
            # raises CommunicationError when interface is not connected
            await self.xknx.cemi_handler.send_telegram(telegram)

        self.xknx.devices.process(telegram)
        self._run_telegram_received_cbs(telegram)

    async def process_telegram_incoming(self, telegram: Telegram) -> None:
        """Process incoming telegram."""
        telegram_logger.debug(telegram)
        self._run_telegram_received_cbs(telegram)
        self.xknx.devices.process(telegram)

    def _run_telegram_received_cbs(self, telegram: Telegram) -> None:
        """Run registered callbacks. Don't propagate exceptions."""
        for callback in self.telegram_received_cbs:
            if not callback.is_within_filter(telegram):
                continue
            try:
                callback.callback(telegram)
            except Exception:  # pylint: disable=broad-except
                logger.exception(
                    "Unexpected error while processing telegram_received_cb for %s",
                    telegram,
                )