1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
|
"""
Module for managing a switch via KNX.
It provides functionality for
* switching 'on' and 'off'.
* reading the current state from KNX bus.
"""
from __future__ import annotations
import asyncio
from collections.abc import Iterator
from functools import partial
import logging
from typing import TYPE_CHECKING
from xknx.core import Task
from xknx.remote_value import GroupAddressesType, RemoteValueSwitch
from .device import Device, DeviceCallbackType
if TYPE_CHECKING:
from xknx.telegram import Telegram
from xknx.xknx import XKNX
logger = logging.getLogger("xknx.log")
class Switch(Device):
"""Class for managing a switch."""
def __init__(
self,
xknx: XKNX,
name: str,
group_address: GroupAddressesType = None,
group_address_state: GroupAddressesType = None,
respond_to_read: bool = False,
sync_state: bool | int | float | str = True,
invert: bool = False,
reset_after: float | None = None,
device_updated_cb: DeviceCallbackType[Switch] | None = None,
) -> None:
"""Initialize Switch class."""
super().__init__(xknx, name, device_updated_cb)
self.reset_after = reset_after
self._reset_task: Task | None = None
self.respond_to_read = respond_to_read
self.switch = RemoteValueSwitch(
xknx,
group_address,
group_address_state,
sync_state=sync_state,
invert=invert,
device_name=self.name,
after_update_cb=self.after_update,
)
def _iter_remote_values(self) -> Iterator[RemoteValueSwitch]:
"""Iterate the devices RemoteValue classes."""
yield self.switch
def async_remove_tasks(self) -> None:
"""Remove async tasks of device."""
if self._reset_task is not None:
self._reset_task.cancel()
self._reset_task = None
@property
def state(self) -> bool | None:
"""Return the current switch state of the device."""
return self.switch.value
async def set_on(self) -> None:
"""Switch on switch."""
self.switch.on()
async def set_off(self) -> None:
"""Switch off switch."""
self.switch.off()
def process_group_write(self, telegram: Telegram) -> None:
"""Process incoming and outgoing GROUP WRITE telegram."""
if self.switch.process(telegram):
if self.reset_after is not None and self.switch.value:
self._reset_task = self.xknx.task_registry.register(
name=f"switch.reset_{id(self)}",
async_func=partial(self._reset_state, self.reset_after),
track_task=True,
).start()
def process_group_read(self, telegram: Telegram) -> None:
"""Process incoming GroupValueResponse telegrams."""
if (
self.respond_to_read
and telegram.destination_address == self.switch.group_address
):
self.switch.respond()
async def _reset_state(self, wait_seconds: float) -> None:
await asyncio.sleep(wait_seconds)
await self.set_off()
def __str__(self) -> str:
"""Return object as readable string."""
return f'<Switch name="{self.name}" switch={self.switch.group_addr_str()} />'
|