File: message.py

package info (click to toggle)
python-aiounifi 79-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 660 kB
  • sloc: python: 11,124; sh: 5; makefile: 5
file content (100 lines) | stat: -rw-r--r-- 2,858 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
"""Messages from websocket."""

from __future__ import annotations

from dataclasses import dataclass
import enum
import logging
from typing import Any, Self

LOGGER = logging.getLogger(__name__)


class MessageKey(enum.Enum):
    """Message as part of meta object of event.

    "meta": {"rc": "ok", "message": "device:sync"}.
    """

    ALERT = "alert"
    EVENT = "events"

    CLIENT = "sta:sync"
    CLIENT_REMOVED = "user:delete"
    CLIENT_UPDATED = "user:sync"
    DEVICE = "device:sync"
    DEVICE_ADD = "device:add"
    DEVICE_UPDATE = "device:update"
    DPI_APP_ADDED = "dpiapp:add"
    DPI_APP_REMOVED = "dpiapp:delete"
    DPI_APP_UPDATED = "dpiapp:sync"
    DPI_GROUP_ADDED = "dpigroup:add"
    DPI_GROUP_REMOVED = "dpigroup:delete"
    DPI_GROUP_UPDATED = "dpigroup:sync"
    FIREWALL_RULE_ADDED = "firewallrule:add"
    FIREWALL_RULE_UPDATED = "firewallrule:sync"
    NETWORK_CONF_UPDATED = "networkconf:sync"
    NOTIFICATION_TOAST = "notification-toast"
    PING_TEST_UPDATE = "ping-test:update"
    PORT_FORWARD_ADDED = "portforward:add"
    PORT_FORWARD_UPDATED = "portforward:sync"
    PORT_FORWARD_DELETED = "portforward:delete"
    SCHEDULE_TASK_ADDED = "scheduletask:add"
    SESSION_METADATA = "session-metadata:sync"
    SETTING_UPDATED = "setting:sync"
    SETUP_SYNC = "setup:sync"
    SPEED_TEST_UPDATE = "speed-test:update"
    UNIFI_DEVICE = "unifi-device:sync"
    UNIFI_DEVICE_ADD = "unifi-device:add"
    USER_GROUP_UPDATED = "usergroup:sync"
    VPN_CONNECT = "vpn:connect"
    VPN_DISCONNECT = "vpn:disconnect"
    VPN_CONNECTION_UPDATED = "vpn-connection:sync"
    WLAN_CONF_ADDED = "wlanconf:add"
    WLAN_CONF_UPDATED = "wlanconf:sync"
    WLAN_CONF_DELETED = "wlanconf:delete"

    UNKNOWN = "unknown"

    @classmethod
    def _missing_(cls, value: object) -> MessageKey:
        """Set default enum member if an unknown value is provided."""
        LOGGER.warning("Unsupported message key %s", value)
        return MessageKey.UNKNOWN


@dataclass
class Meta:
    """Meta description of UniFi websocket data."""

    rc: str
    message: MessageKey
    data: dict[str, Any]

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> Self:
        """Create meta instance from dict."""
        return cls(
            rc=data.get("rc", ""),
            message=MessageKey(data.get("message", "")),
            data=data,
        )


@dataclass
class Message:
    """Websocket package representation."""

    meta: Meta
    data: dict[str, Any]

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> Self:
        """Create data container instance from dict."""
        meta = Meta.from_dict(data["meta"])
        if meta.message is MessageKey.UNKNOWN:
            LOGGER.warning("Unsupported message %s", data)
        return cls(
            meta=meta,
            data=data["data"],
        )