File: devices.py

package info (click to toggle)
python-xknx 3.6.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,012 kB
  • sloc: python: 39,710; javascript: 8,556; makefile: 27; sh: 12
file content (110 lines) | stat: -rw-r--r-- 3,944 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
"""
Module for handling a vector/array of devices.

More or less an array with devices. Adds some search functionality to find devices.
"""

from __future__ import annotations

import asyncio
from collections.abc import Iterator

from xknx.telegram import Telegram
from xknx.telegram.address import DeviceGroupAddress, GroupAddress, InternalGroupAddress
from xknx.typing import DeviceCallbackType

from .device import Device


class Devices:
    """Class for handling a vector/array of devices.

    The container can be iterated, measured with ``len()``, indexed by
    device name or list position, and tested for membership by device name.
    It registers its own `device_updated` method on every added device and
    forwards those notifications to the callbacks in `device_updated_cbs`.
    """

    def __init__(self, started: asyncio.Event) -> None:
        """Initialize Devices class.

        `started` is checked in `async_add()` to decide whether the tasks of
        a newly added device have to be started right away.
        """
        self.started = started  # xknx.started
        self.__devices: list[Device] = []
        self.device_updated_cbs: list[DeviceCallbackType[Device]] = []

    def async_start_device_tasks(self) -> None:
        """Start all devices tasks."""
        for device in self.__devices:
            device.async_start_tasks()

    def async_remove_device_tasks(self) -> None:
        """Remove all devices tasks."""
        for device in self.__devices:
            device.async_remove_tasks()

    def register_device_updated_cb(
        self, device_updated_cb: DeviceCallbackType[Device]
    ) -> None:
        """Register callback for devices being updated."""
        self.device_updated_cbs.append(device_updated_cb)

    def unregister_device_updated_cb(
        self, device_updated_cb: DeviceCallbackType[Device]
    ) -> None:
        """Unregister callback for devices being updated.

        Raises ValueError if the callback is not registered.
        """
        self.device_updated_cbs.remove(device_updated_cb)

    def __iter__(self) -> Iterator[Device]:
        """Iterate registered devices."""
        yield from self.__devices

    def devices_by_group_address(
        self, group_address: DeviceGroupAddress
    ) -> Iterator[Device]:
        """Return device(s) by group address.

        Lazily yields every device whose `has_group_address()` accepts
        `group_address`, in registration order.
        """
        for device in self.__devices:
            if device.has_group_address(group_address):
                yield device

    def __getitem__(self, key: str | int) -> Device:
        """Return device by name or by index.

        Device names are compared first; an integer key that matches no
        name is then used as a position in the internal list.

        Raises KeyError (carrying the key) if a non-integer key matches no
        device name, and IndexError if an integer key is out of range.
        """
        for device in self.__devices:
            if device.name == key:
                return device
        if isinstance(key, int):
            return self.__devices[key]
        # Include the key so the failing lookup is visible in the traceback.
        raise KeyError(key)

    def __len__(self) -> int:
        """Return number of devices within vector."""
        return len(self.__devices)

    def __contains__(self, key: str) -> bool:
        """Return if devices with name 'key' is within devices."""
        return any(device.name == key for device in self.__devices)

    def async_add(self, device: Device) -> None:
        """Add device to active XKNX devices.

        Hooks this container's `device_updated` into the device, stores the
        device and registers its state updater.
        """
        device.register_device_updated_cb(self.device_updated)
        self.__devices.append(device)
        device.register_state_updater()
        if self.started.is_set():
            # start if device was added after async_start_device_tasks() / xknx.start()
            device.async_start_tasks()

    def async_remove(self, device: Device) -> None:
        """Remove device from XKNX devices.

        Undoes `async_add()`: removes the device's tasks, unregisters its
        state updater and the update callback, then drops it from the list.
        Raises ValueError (from `list.remove`) if the device is not stored.
        """
        device.async_remove_tasks()
        device.unregister_state_updater()
        device.unregister_device_updated_cb(self.device_updated)
        self.__devices.remove(device)

    def device_updated(self, device: Device) -> None:
        """Call all registered device updated callbacks of device."""
        # Dispatch over a snapshot: a callback may register or unregister
        # callbacks (including itself) while it runs, and mutating the list
        # during iteration would silently skip the following callback.
        for device_updated_cb in tuple(self.device_updated_cbs):
            device_updated_cb(device)

    def process(self, telegram: Telegram) -> None:
        """Process telegram.

        Telegrams whose destination is a GroupAddress or InternalGroupAddress
        are handed to every device matching that address; telegrams with any
        other destination type are ignored here.
        """
        if isinstance(
            telegram.destination_address, GroupAddress | InternalGroupAddress
        ):
            for device in self.devices_by_group_address(telegram.destination_address):
                device.process(telegram)

    async def sync(self) -> None:
        """Read state of devices from KNX bus.

        Awaits the `sync()` coroutine of every device concurrently.
        """
        await asyncio.gather(*[device.sync() for device in self.__devices])