1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
|
"""
Device is the base class for all implemented devices (e.g. Lights/Switches/Sensors).
It provides basic functionality for reading the state from the KNX bus.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import Iterator
import logging
from typing import TYPE_CHECKING, Any
from xknx.remote_value import RemoteValue
from xknx.telegram import Telegram
from xknx.telegram.address import DeviceGroupAddress
from xknx.telegram.apci import GroupValueRead, GroupValueResponse, GroupValueWrite
from xknx.typing import DeviceCallbackType, Self
if TYPE_CHECKING:
from xknx.xknx import XKNX
# Module-level logger; it uses the fixed "xknx.log" channel name instead of __name__.
logger = logging.getLogger("xknx.log")
class Device(ABC):
    """Base class for devices.

    An instance stores the ``xknx`` object it was created with, its name and
    a list of "device updated" callbacks. Subclasses must implement
    ``_iter_remote_values``; the state updater registration, ``sync`` and
    ``has_group_address`` helpers are all built on top of that iterator.
    """

    def __init__(
        self,
        xknx: XKNX,
        name: str,
        device_updated_cb: DeviceCallbackType[Self] | None = None,
    ) -> None:
        """Initialize Device class.

        ``device_updated_cb`` is optional; when given it is registered as the
        first entry of ``device_updated_cbs``.
        """
        self.xknx = xknx
        self.name = name
        self.device_updated_cbs: list[DeviceCallbackType[Self]] = []
        if device_updated_cb is not None:
            self.register_device_updated_cb(device_updated_cb)

    def register_state_updater(self) -> None:
        """Register device addresses for StateUpdater."""
        for remote_value in self._iter_remote_values():
            remote_value.register_state_updater()

    def unregister_state_updater(self) -> None:
        """Unregister device addresses from StateUpdater."""
        for remote_value in self._iter_remote_values():
            remote_value.unregister_state_updater()

    def async_start_tasks(self) -> None:
        """Start async background tasks of device (no-op in the base class)."""
        return

    def async_remove_tasks(self) -> None:
        """Remove all tasks of device (no-op in the base class)."""
        return

    @abstractmethod
    def _iter_remote_values(self) -> Iterator[RemoteValue[Any]]:
        """Iterate the devices RemoteValue classes.

        Abstract: concrete devices have to override this. The base
        implementation yields nothing.
        """
        # yield self.remote_value
        # yield from (<list all used RemoteValue instances>)
        yield from ()

    def register_device_updated_cb(
        self, device_updated_cb: DeviceCallbackType[Self]
    ) -> None:
        """Register device updated callback."""
        self.device_updated_cbs.append(device_updated_cb)

    def unregister_device_updated_cb(
        self, device_updated_cb: DeviceCallbackType[Self]
    ) -> None:
        """Unregister device updated callback.

        Unknown callbacks are ignored silently.
        """
        if device_updated_cb in self.device_updated_cbs:
            self.device_updated_cbs.remove(device_updated_cb)

    def after_update(
        self: Self,
        *args: Any,  # a single argument may be passed if used as a RemoteValue callback
    ) -> None:
        """Execute callbacks after internal state has been changed.

        Every registered callback is called with this device as its only
        argument; ``args`` is accepted but not used. An exception raised by
        a callback is logged and does not prevent the remaining callbacks
        from running.
        """
        # Iterate over a snapshot: a callback may (un)register callbacks,
        # and mutating the list while looping over it would skip entries.
        for device_callback in tuple(self.device_updated_cbs):
            try:
                device_callback(self)
            except Exception:  # pylint: disable=broad-except
                logger.exception(
                    "Unexpected error while processing device_updated_cb for %s",
                    self,
                )

    async def sync(self, wait_for_result: bool = False) -> None:
        """Read states of device from KNX bus.

        Awaits ``read_state`` of every remote value one after another,
        forwarding ``wait_for_result`` unchanged.
        """
        for remote_value in self._iter_remote_values():
            await remote_value.read_state(wait_for_result=wait_for_result)

    def process(self, telegram: Telegram) -> None:
        """Process incoming telegram.

        Dispatches on the payload type to the matching ``process_group_*``
        method; telegrams with any other payload type are ignored.
        """
        if isinstance(telegram.payload, GroupValueWrite):
            self.process_group_write(telegram)
        elif isinstance(telegram.payload, GroupValueResponse):
            self.process_group_response(telegram)
        elif isinstance(telegram.payload, GroupValueRead):
            self.process_group_read(telegram)

    def process_group_read(self, telegram: Telegram) -> None:
        """Process incoming GroupValueRead telegrams."""
        # The default is, that devices don't answer to group reads
        return

    def process_group_response(self, telegram: Telegram) -> None:
        """Process incoming GroupValueResponse telegrams."""
        # Per default mapped to group write.
        self.process_group_write(telegram)

    def process_group_write(self, telegram: Telegram) -> None:
        """Process incoming GroupValueWrite telegrams."""
        # The default is, that devices don't process group writes
        return

    def get_name(self) -> str:
        """Return name of device."""
        return self.name

    def has_group_address(self, group_address: DeviceGroupAddress) -> bool:
        """Test if device has given group address.

        True as soon as any remote value reports the address; False when
        there are no remote values at all.
        """
        return any(
            remote_value.has_group_address(group_address)
            for remote_value in self._iter_remote_values()
        )

    def __eq__(self, other: object) -> bool:
        """Compare for equality.

        Two devices are equal when their instance dictionaries are equal.
        For anything that is not a ``Device``, ``NotImplemented`` is returned
        so Python can fall back to its default comparison instead of failing
        on a missing ``__dict__``.
        """
        if not isinstance(other, Device):
            return NotImplemented
        return self.__dict__ == other.__dict__
|