File: device.py

package info (click to toggle)
python-xknx 3.6.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,012 kB
  • sloc: python: 39,710; javascript: 8,556; makefile: 27; sh: 12
file content (136 lines) | stat: -rw-r--r-- 4,884 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
"""
Device is the base class for all implemented devices (e.g. Lights/Switches/Sensors).

It provides basic functionality for reading the state from the KNX bus.
"""

from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import Iterator
import logging
from typing import TYPE_CHECKING, Any

from xknx.remote_value import RemoteValue
from xknx.telegram import Telegram
from xknx.telegram.address import DeviceGroupAddress
from xknx.telegram.apci import GroupValueRead, GroupValueResponse, GroupValueWrite
from xknx.typing import DeviceCallbackType, Self

if TYPE_CHECKING:
    from xknx.xknx import XKNX

# Logger used by Device.after_update to report exceptions raised by callbacks.
logger = logging.getLogger("xknx.log")


class Device(ABC):
    """Base class for devices.

    Subclasses implement `_iter_remote_values` to expose their RemoteValue
    instances. State updater registration, `sync` and `has_group_address`
    are all driven by that iterator. Incoming telegrams are dispatched by
    `process` to the `process_group_*` hooks, which subclasses may override.
    """

    def __init__(
        self,
        xknx: XKNX,
        name: str,
        device_updated_cb: DeviceCallbackType[Self] | None = None,
    ) -> None:
        """Initialize Device class.

        If `device_updated_cb` is given it is registered as the first
        device updated callback.
        """
        self.xknx = xknx
        self.name = name
        self.device_updated_cbs: list[DeviceCallbackType[Self]] = []
        if device_updated_cb is not None:
            self.register_device_updated_cb(device_updated_cb)

    def register_state_updater(self) -> None:
        """Register device addresses for StateUpdater."""
        for remote_value in self._iter_remote_values():
            remote_value.register_state_updater()

    def unregister_state_updater(self) -> None:
        """Unregister device addresses from StateUpdater."""
        for remote_value in self._iter_remote_values():
            remote_value.unregister_state_updater()

    def async_start_tasks(self) -> None:
        """Start async background tasks of device.

        The base implementation does nothing.
        """
        return

    def async_remove_tasks(self) -> None:
        """Remove all tasks of device.

        The base implementation does nothing.
        """
        return

    @abstractmethod
    def _iter_remote_values(self) -> Iterator[RemoteValue[Any]]:
        """Iterate the devices RemoteValue classes.

        Must be implemented by subclasses; the base implementation yields
        nothing.
        """
        # yield self.remote_value
        # yield from (<list all used RemoteValue instances>)
        yield from ()

    def register_device_updated_cb(
        self, device_updated_cb: DeviceCallbackType[Self]
    ) -> None:
        """Register device updated callback."""
        self.device_updated_cbs.append(device_updated_cb)

    def unregister_device_updated_cb(
        self, device_updated_cb: DeviceCallbackType[Self]
    ) -> None:
        """Unregister device updated callback.

        Unknown callbacks are ignored.
        """
        if device_updated_cb in self.device_updated_cbs:
            self.device_updated_cbs.remove(device_updated_cb)

    def after_update(
        self: Self,
        *args: Any,  # a single argument may be passed if used as a RemoteValue callback
    ) -> None:
        """Execute callbacks after internal state has been changed.

        Positional arguments are accepted and ignored. An exception raised
        by a callback is logged and does not prevent the remaining callbacks
        from being called.
        """
        # Iterate over a snapshot: a callback may register or unregister
        # callbacks while it runs, and mutating the list during iteration
        # would silently skip the following callback.
        for device_callback in tuple(self.device_updated_cbs):
            try:
                device_callback(self)
            except Exception:  # pylint: disable=broad-except
                logger.exception(
                    "Unexpected error while processing device_updated_cb for %s",
                    self,
                )

    async def sync(self, wait_for_result: bool = False) -> None:
        """Read states of device from KNX bus.

        Calls `read_state` on every RemoteValue of the device, one after
        another, passing `wait_for_result` through.
        """
        for remote_value in self._iter_remote_values():
            await remote_value.read_state(wait_for_result=wait_for_result)

    def process(self, telegram: Telegram) -> None:
        """Process incoming telegram.

        Dispatches on the payload type to the matching `process_group_*`
        method; telegrams with any other payload are ignored.
        """
        if isinstance(telegram.payload, GroupValueWrite):
            self.process_group_write(telegram)
        elif isinstance(telegram.payload, GroupValueResponse):
            self.process_group_response(telegram)
        elif isinstance(telegram.payload, GroupValueRead):
            self.process_group_read(telegram)

    def process_group_read(self, telegram: Telegram) -> None:
        """Process incoming GroupValueRead telegrams."""
        # The default is, that devices don't answer to group reads
        return

    def process_group_response(self, telegram: Telegram) -> None:
        """Process incoming GroupValueResponse telegrams."""
        # Per default mapped to group write.
        self.process_group_write(telegram)

    def process_group_write(self, telegram: Telegram) -> None:
        """Process incoming GroupValueWrite telegrams."""
        # The default is, that devices don't process group writes
        return

    def get_name(self) -> str:
        """Return name of device."""
        return self.name

    def has_group_address(self, group_address: DeviceGroupAddress) -> bool:
        """Test if device has given group address.

        Returns True if any RemoteValue of the device reports having the
        address.
        """
        return any(
            remote_value.has_group_address(group_address)
            for remote_value in self._iter_remote_values()
        )

    def __eq__(self, other: object) -> bool:
        """Compare for equality.

        Two devices are equal if their instance attributes are equal.
        Objects that are not devices never compare equal to a device.
        """
        if not isinstance(other, Device):
            # Let Python try the reflected comparison and fall back to
            # identity instead of failing on objects without a `__dict__`.
            return NotImplemented
        return self.__dict__ == other.__dict__