1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
|
"""
Module for exposing a (virtual) sensor to KNX bus.
It provides functionality for
* push local state changes to KNX bus
* KNX devices may read local values via GROUP READ.
(A typical example for using this class is the outside temperature
read from e.g. an internet service and exposed to the KNX bus.
KNX devices may show this value within their display.)
"""
from __future__ import annotations
import asyncio
from collections.abc import Iterator
from typing import TYPE_CHECKING, Any
from xknx.core import Task
from xknx.dpt import DPTArray, DPTBase, DPTBinary
from xknx.remote_value import (
GroupAddressesType,
RemoteValue,
RemoteValueSensor,
RemoteValueSwitch,
)
from xknx.telegram import TelegramDirection
from xknx.typing import DPTParsable
from .device import Device, DeviceCallbackType
if TYPE_CHECKING:
from xknx.telegram import Telegram
from xknx.xknx import XKNX
class ExposeSensor(Device):
    """Device that publishes a locally held value on a KNX group address."""

    def __init__(
        self,
        xknx: XKNX,
        name: str,
        group_address: GroupAddressesType = None,
        respond_to_read: bool = True,
        value_type: DPTParsable | type[DPTBase] | None = None,
        cooldown: float = 0,
        periodic_send: float = 0,
        device_updated_cb: DeviceCallbackType[ExposeSensor] | None = None,
    ) -> None:
        """
        Set up the exposed sensor.

        Args:
            xknx: XKNX instance used for communication.
            name: Name of the device.
            group_address: KNX group address the value is sent to.
            respond_to_read: When True, GroupValueRead telegrams are answered
                with the current value.
            value_type: DPT type or identifier used to encode the value. The
                string ``"binary"`` selects a switch-type remote value; anything
                else is handed to a sensor-type remote value.
            cooldown: Minimum time in seconds between values sent to the KNX
                bus. Updates arriving during that period are collapsed and only
                the latest one is sent once the cooldown ends. ``0`` (default)
                disables the cooldown.
            periodic_send: Interval in seconds for re-sending the current
                value. ``0`` (default) disables periodic sending.
            device_updated_cb: Callback invoked when the device has been
                updated.
        """
        super().__init__(xknx, name, device_updated_cb)
        self.respond_to_read = respond_to_read

        self.sensor_value: RemoteValueSensor | RemoteValueSwitch
        if value_type == "binary":
            # binary values are encoded by the switch remote value
            self.sensor_value = RemoteValueSwitch(
                xknx,
                group_address=group_address,
                sync_state=False,
                device_name=self.name,
                after_update_cb=self.after_update,
            )
        else:
            self.sensor_value = RemoteValueSensor(
                xknx,
                group_address=group_address,
                sync_state=False,
                device_name=self.name,
                after_update_cb=self.after_update,
                value_type=value_type,
            )

        # --- cooldown handling ---
        self.cooldown = cooldown
        # Holds either the payload waiting to be sent once the cooldown is
        # over, or the payload that was sent most recently.
        self._payload_after_cooldown: DPTArray | DPTBinary | None = None
        self._cooldown_task: Task | None = None
        self._cooldown_task_name = f"expose_sensor.cooldown_{id(self)}"

        # --- periodic sending ---
        self._periodic_send_time = periodic_send
        self._periodic_send_task: Task | None = None

    def _iter_remote_values(self) -> Iterator[RemoteValue[Any]]:
        """Yield every RemoteValue owned by this device (only ``sensor_value``)."""
        yield from (self.sensor_value,)

    def async_start_tasks(self) -> None:
        """Start the periodic-send background task if an interval is configured."""
        if not self._periodic_send_time > 0:
            return
        periodic_task = self.xknx.task_registry.register(
            name=f"expose_sensor.periodic_send_{id(self)}",
            async_func=self._periodic_send_loop,
            restart_after_reconnect=True,
        )
        self._periodic_send_task = periodic_task.start()

    def async_remove_tasks(self) -> None:
        """Unregister the cooldown and periodic-send tasks and forget them."""
        cooldown_task = self._cooldown_task
        if cooldown_task is not None:
            self.xknx.task_registry.unregister(cooldown_task.name)
            self._cooldown_task = None

        periodic_task = self._periodic_send_task
        if periodic_task is not None:
            self.xknx.task_registry.unregister(periodic_task.name)
            self._periodic_send_task = None

    def process_group_write(self, telegram: Telegram) -> None:
        """
        Handle an incoming or outgoing GROUP WRITE telegram.

        The telegram is always passed to ``sensor_value``. An outgoing telegram
        additionally restarts the periodic-send task, resetting its timer.
        """
        self.sensor_value.process(telegram)

        if telegram.direction is not TelegramDirection.OUTGOING:
            return
        periodic_task = self._periodic_send_task
        if periodic_task is None:
            return
        # a value just went out - begin a fresh periodic interval
        periodic_task.cancel()
        periodic_task.start()

    def process_group_read(self, telegram: Telegram) -> None:
        """
        Handle an incoming GROUP READ telegram.

        Nothing happens when ``respond_to_read`` is disabled. Otherwise the
        stored payload is sent as a response and the cooldown is restarted;
        if no payload has been stored yet, ``sensor_value.respond()`` is used.
        """
        if not self.respond_to_read:
            return

        stored_payload = self._payload_after_cooldown
        if stored_payload is None:
            self.sensor_value.respond()
            return

        # Answering a read is not held back by the cooldown, but it does
        # restart the cooldown timer.
        self.sensor_value.send_raw(stored_payload, response=True)
        self._restart_cooldown()

    async def set(self, value: Any, skip_unchanged: bool = False) -> None:
        """
        Store a new value and send it, honouring the cooldown.

        Args:
            value: Value to encode via ``sensor_value.to_knx``.
            skip_unchanged: When True, do nothing if the encoded payload equals
                the stored one.
        """
        payload = self.sensor_value.to_knx(value)
        if skip_unchanged and self._payload_after_cooldown == payload:
            return
        self._payload_after_cooldown = payload

        if self.cooldown:
            running_task = self._cooldown_task
            if running_task is not None and not running_task.done():
                # Still cooling down: the stored payload is picked up by
                # the running cooldown task.
                return
            self._start_cooldown_task()

        self.sensor_value.send_raw(payload)

    def _start_cooldown_task(self) -> None:
        """Register and start a new cooldown task, storing it on the device."""
        self._cooldown_task = self.xknx.task_registry.register(
            name=self._cooldown_task_name,
            async_func=self._cooldown_wait,
        ).start()

    async def _cooldown_wait(self) -> None:
        """
        Wait out the cooldown and send the stored payload if it changed.

        Repeats until, after a full cooldown period, the stored payload equals
        ``sensor_value.last_payload``.
        """
        while True:
            await asyncio.sleep(self.cooldown)
            pending_payload = self._payload_after_cooldown
            if self.sensor_value.last_payload == pending_payload:
                return
            self.sensor_value.send_raw(pending_payload)  # type: ignore[arg-type]

    def _restart_cooldown(self) -> None:
        """Restart the cooldown timer; does nothing when cooldown is disabled."""
        if not self.cooldown:
            return

        current_task = self._cooldown_task
        if current_task is None or current_task.done():
            # no cooldown in progress - begin a new one
            self._start_cooldown_task()
            return

        current_task.cancel()
        current_task.start()

    async def _periodic_send_loop(self) -> None:
        """Re-send the stored payload every ``periodic_send`` seconds, forever."""
        while True:
            await asyncio.sleep(self._periodic_send_time)
            stored_payload = self._payload_after_cooldown
            if stored_payload is None:
                # nothing has been set yet
                continue
            self.sensor_value.send_raw(stored_payload)
            self._restart_cooldown()

    def unit_of_measurement(self) -> str | None:
        """Return the unit of measurement reported by ``sensor_value``."""
        return self.sensor_value.unit_of_measurement

    def resolve_state(self) -> Any:
        """Return the current value held by ``sensor_value``."""
        return self.sensor_value.value

    def __str__(self) -> str:
        """Return a readable one-line description of the device."""
        return (
            f'<ExposeSensor name="{self.name}" '
            f"sensor={self.sensor_value.group_addr_str()} "
            f"value={self.sensor_value.value!r} "
            f'unit="{self.unit_of_measurement()}"/>'
        )
|