File: messages.py

package info (click to toggle)
python-aiounifi 79-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 660 kB
  • sloc: python: 11,124; sh: 5; makefile: 5
file content (89 lines) | stat: -rw-r--r-- 2,704 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
"""Manage events from UniFi Network Controller."""

from __future__ import annotations

from collections.abc import Callable
import logging
from typing import TYPE_CHECKING, Any

import orjson

from ..models.message import Message, MessageKey

if TYPE_CHECKING:
    from ..controller import Controller

# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)


# Signature of a subscriber callback: receives one Message, returns nothing.
SubscriptionCallback = Callable[[Message], None]
# A stored subscription: the callback paired with its tuple of message keys,
# or None when the subscriber registered without a filter.
SubscriptionType = tuple[SubscriptionCallback, tuple[MessageKey, ...] | None]
# Zero-argument callable handed back by subscribe() to remove a subscription.
UnsubscribeType = Callable[[], None]


class MessageHandler:
    """Dispatch decoded controller messages to subscribed callbacks.

    Subscribers register a callback together with an optional filter of
    message keys. A subscriber without a filter receives every message.
    """

    def __init__(self, controller: Controller) -> None:
        """Initialize message handler class."""
        self.controller = controller
        self._subscribers: list[SubscriptionType] = []
        # Pre-filter index derived from ``_subscribers``; lets ``handler``
        # drop messages nobody listens to without walking the subscriber list.
        self._subscribed_messages: set[MessageKey] = set()
        self._has_catch_all = False

    def _reindex(self) -> None:
        """Rebuild the pre-filter index from the current subscriber list."""
        self._has_catch_all = any(keys is None for _, keys in self._subscribers)
        self._subscribed_messages = {
            key
            for _, keys in self._subscribers
            if keys is not None
            for key in keys
        }

    def subscribe(
        self,
        callback: SubscriptionCallback,
        message_filter: tuple[MessageKey, ...] | MessageKey | None = None,
    ) -> UnsubscribeType:
        """Subscribe to messages.

        "callback" - function to call with each matching message.
        "message_filter" - a single message key, a tuple of message keys, or
        None to receive every message.
        Return a function that removes this subscription.
        """
        keys: tuple[MessageKey, ...] | None
        if message_filter is None:
            keys = None
        elif isinstance(message_filter, MessageKey):
            keys = (message_filter,)
        else:
            keys = tuple(message_filter)

        subscription = (callback, keys)
        self._subscribers.append(subscription)
        self._reindex()

        def unsubscribe() -> None:
            self._subscribers.remove(subscription)
            # Drop keys that no remaining subscriber is interested in.
            self._reindex()

        return unsubscribe

    def new_data(self, raw_bytes: bytes) -> None:
        """Decode a raw JSON payload and pass it on to ``handler``.

        Payloads that are not valid JSON are logged at debug level and dropped.
        """
        # Only the decode is guarded, so an error raised by a subscriber
        # callback is never mistaken for (and swallowed as) bad JSON.
        try:
            raw = orjson.loads(raw_bytes)
        except orjson.JSONDecodeError:
            LOGGER.debug("Bad JSON data '%s'", raw_bytes)
            return
        self.handler(raw)

    def handler(self, raw: dict[str, Any]) -> None:
        """Process data and identify where the message belongs.

        Payloads that are not a dict, or that lack the "meta" or "data" key,
        are ignored. Each entry of raw["data"] is combined with raw["meta"]
        into a Message and delivered to every subscriber whose filter matches
        (or that has no filter).
        """
        if not isinstance(raw, dict) or "meta" not in raw or "data" not in raw:
            return

        meta = raw["meta"]
        for raw_data in raw["data"]:
            message = Message.from_dict({"meta": meta, "data": raw_data})
            key = message.meta.message

            # Nobody listens to this key. Every entry is built with the same
            # "meta", so presumably the remaining ones carry the same key.
            if not self._has_catch_all and key not in self._subscribed_messages:
                break

            # Iterate over a snapshot: a callback may subscribe or unsubscribe
            # while being dispatched, which would otherwise skip subscribers.
            for callback, keys in tuple(self._subscribers):
                if keys is None or key in keys:
                    callback(message)

    def __len__(self) -> int:
        """Return the number of message subscribers."""
        return len(self._subscribers)