File: event.py

package info (click to toggle)
zwave-js-server-python 0.67.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,820 kB
  • sloc: python: 15,886; sh: 21; javascript: 16; makefile: 2
file content (82 lines) | stat: -rw-r--r-- 2,368 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
"""Provide Event base classes for Z-Wave JS."""

from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass, field
import logging
from typing import Literal

from pydantic import BaseModel

# Shared logger for this module, named after the containing package
# (``__package__``) rather than after this individual module.
LOGGER = logging.getLogger(__package__)


class BaseEventModel(BaseModel):
    """Pydantic model carrying the fields shared by every event.

    ``source`` is limited to one of ``"controller"``, ``"driver"`` or
    ``"node"``; ``event`` is the event name as a string.
    """

    source: Literal["controller", "driver", "node"]
    event: str

    @classmethod
    def from_dict(cls, data: dict) -> BaseEventModel:
        """Build a model instance from a raw event dict.

        Only the ``"source"`` and ``"event"`` keys of ``data`` are read; any
        other keys are ignored. A missing key raises ``KeyError``.
        """
        # Look the keys up in the same order as the constructor arguments so
        # a missing "source" is reported before a missing "event".
        source = data["source"]
        event = data["event"]
        return cls(source=source, event=event)


@dataclass
class Event:
    """Container for a single event: its type plus an optional payload.

    Attributes:
        type: Name of the event.
        data: Payload dict for the event. When omitted, every instance gets
            its own fresh empty dict.
    """

    type: str
    # default_factory (not a literal ``{}``) so instances never share a dict.
    data: dict = field(default_factory=dict)


class EventBase:
    """Base class giving Z-Wave JS models a small publish/subscribe API.

    Callbacks are stored per event name and invoked by ``emit``. Subclasses
    can also define ``handle_<event type>`` methods, which
    ``_handle_event_protocol`` looks up and calls.
    """

    def __init__(self) -> None:
        """Start with no registered listeners."""
        # Maps an event name to its callbacks, in registration order.
        self._listeners: dict[str, list[Callable]] = {}

    def on(self, event_name: str, callback: Callable) -> Callable:
        """Subscribe ``callback`` to ``event_name``.

        Returns a zero-argument function that removes the callback again.
        Calling it when the callback is no longer registered does nothing.
        """
        registered: list = self._listeners.setdefault(event_name, [])
        registered.append(callback)

        def unsubscribe() -> None:
            """Remove the callback if it is still registered."""
            if callback not in registered:
                return
            registered.remove(callback)

        return unsubscribe

    def once(self, event_name: str, callback: Callable) -> Callable:
        """Subscribe ``callback`` so that it fires for one emit only.

        Returns the unsubscribe function, which can cancel the subscription
        before the event has fired.
        """

        def fire_once(data: dict) -> None:
            # Unsubscribe before calling so the wrapper is removed even if
            # the callback raises.
            cancel()
            callback(data)

        cancel = self.on(event_name, fire_once)

        return cancel

    def emit(self, event_name: str, data: dict) -> None:
        """Call every listener registered for ``event_name`` with ``data``.

        An ``Exception`` raised by a listener is logged and does not stop
        the remaining listeners from running.
        """
        # Iterate over a snapshot so listeners may subscribe or unsubscribe
        # while the event is being dispatched.
        snapshot = list(self._listeners.get(event_name, ()))
        for callback in snapshot:
            try:
                callback(data)
            except Exception:  # pylint: disable=broad-exception-caught
                LOGGER.exception("Error handling event: %s", event_name)

    def _handle_event_protocol(self, event: Event) -> None:
        """Dispatch ``event`` to the matching ``handle_*`` method.

        The method name is ``handle_`` followed by the event type with
        spaces replaced by underscores. If no such attribute exists, the
        event is logged at debug level and otherwise ignored.
        """
        method_name = "handle_" + event.type.replace(" ", "_")
        handler = getattr(self, method_name, None)
        if handler is not None:
            handler(event)  # pylint: disable=not-callable
            return
        LOGGER.debug("Received unknown event: %s", event)