1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
|
"""
Module for handling a collection of devices.

Essentially a list of devices with added lookup helpers to find devices
by name, by index or by group address.
"""
from __future__ import annotations
import asyncio
from collections.abc import Iterator
from xknx.telegram import Telegram
from xknx.telegram.address import DeviceGroupAddress, GroupAddress, InternalGroupAddress
from xknx.typing import DeviceCallbackType
from .device import Device
class Devices:
    """Class for handling a vector/array of devices.

    Devices are kept in the order they were added. The container supports
    iteration, ``len()``, ``in`` (by device name) and subscription by device
    name or list index, plus lookup by group address. Update notifications
    coming from the contained devices are forwarded to the callbacks
    registered on this container.
    """

    def __init__(self, started: asyncio.Event) -> None:
        """Initialize Devices class.

        `started` is consulted in `async_add()`: when it is already set, the
        tasks of a newly added device are started right away.
        """
        self.started = started  # xknx.started
        self.__devices: list[Device] = []
        self.device_updated_cbs: list[DeviceCallbackType[Device]] = []

    def async_start_device_tasks(self) -> None:
        """Start the tasks of all contained devices."""
        for device in self.__devices:
            device.async_start_tasks()

    def async_remove_device_tasks(self) -> None:
        """Remove the tasks of all contained devices."""
        for device in self.__devices:
            device.async_remove_tasks()

    def register_device_updated_cb(
        self, device_updated_cb: DeviceCallbackType[Device]
    ) -> None:
        """Register callback for devices being updated."""
        self.device_updated_cbs.append(device_updated_cb)

    def unregister_device_updated_cb(
        self, device_updated_cb: DeviceCallbackType[Device]
    ) -> None:
        """Unregister callback for devices being updated.

        Raises ValueError (from `list.remove`) if the callback is not registered.
        """
        self.device_updated_cbs.remove(device_updated_cb)

    def __iter__(self) -> Iterator[Device]:
        """Iterate registered devices in insertion order."""
        yield from self.__devices

    def devices_by_group_address(
        self, group_address: DeviceGroupAddress
    ) -> Iterator[Device]:
        """Yield every device that reports having the given group address."""
        for device in self.__devices:
            if device.has_group_address(group_address):
                yield device

    def __getitem__(self, key: str | int) -> Device:
        """Return device by name or by index.

        A device whose name equals `key` takes precedence. Only if no name
        matches and `key` is an integer is it used as a list index.

        Raises:
            KeyError: `key` is not an integer and matches no device name.
            IndexError: `key` is an integer outside the valid index range.

        """
        for device in self.__devices:
            if device.name == key:
                return device
        if isinstance(key, int):
            return self.__devices[key]
        # Carry the missing key in the exception so failed lookups are diagnosable.
        raise KeyError(key)

    def __len__(self) -> int:
        """Return number of devices within vector."""
        return len(self.__devices)

    def __contains__(self, key: str) -> bool:
        """Return whether a device named `key` is contained."""
        return any(device.name == key for device in self.__devices)

    def async_add(self, device: Device) -> None:
        """Add device to active XKNX devices.

        Hooks this container's `device_updated` into the device, stores the
        device and registers its state updater.
        """
        device.register_device_updated_cb(self.device_updated)
        self.__devices.append(device)
        device.register_state_updater()
        if self.started.is_set():
            # start if device was added after async_start_device_tasks() / xknx.start()
            device.async_start_tasks()

    def async_remove(self, device: Device) -> None:
        """Remove device from XKNX devices.

        Removes the device's tasks, state updater and update callback before
        dropping it from the container. Raises ValueError (from `list.remove`)
        if the device is not contained.
        """
        device.async_remove_tasks()
        device.unregister_state_updater()
        device.unregister_device_updated_cb(self.device_updated)
        self.__devices.remove(device)

    def device_updated(self, device: Device) -> None:
        """Call all registered device updated callbacks with `device`."""
        for device_updated_cb in self.device_updated_cbs:
            device_updated_cb(device)

    def process(self, telegram: Telegram) -> None:
        """Process telegram.

        Telegrams whose destination is a GroupAddress or InternalGroupAddress
        are handed to every device having that address; telegrams with any
        other destination type are ignored here.
        """
        if isinstance(
            telegram.destination_address, GroupAddress | InternalGroupAddress
        ):
            for device in self.devices_by_group_address(telegram.destination_address):
                device.process(telegram)

    async def sync(self) -> None:
        """Read state of devices from KNX bus.

        Awaits the `sync()` coroutines of all devices together.
        """
        await asyncio.gather(*[device.sync() for device in self.__devices])
|