File: expose_sensor.py

package info (click to toggle)
python-xknx 3.6.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,012 kB
  • sloc: python: 39,710; javascript: 8,556; makefile: 27; sh: 12
file content (139 lines) | stat: -rw-r--r-- 4,882 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
"""
Module for exposing a (virtual) sensor to KNX bus.

It provides functionality for

* push local state changes to KNX bus
* KNX devices may read local values via GROUP READ.

(A typical example for using this class is the outside temperature
read from e.g. an internet serviceand exposed to the KNX bus.
KNX devices may show this value within their display.)
"""

from __future__ import annotations

import asyncio
from collections.abc import Iterator
from typing import TYPE_CHECKING, Any

from xknx.core import Task
from xknx.dpt import DPTBase
from xknx.remote_value import (
    GroupAddressesType,
    RemoteValue,
    RemoteValueSensor,
    RemoteValueSwitch,
)
from xknx.typing import DPTParsable

from .device import Device, DeviceCallbackType

if TYPE_CHECKING:
    from xknx.telegram import Telegram
    from xknx.xknx import XKNX


class ExposeSensor(Device):
    """Class for managing a sensor."""

    def __init__(
        self,
        xknx: XKNX,
        name: str,
        group_address: GroupAddressesType = None,
        respond_to_read: bool = True,
        value_type: DPTParsable | type[DPTBase] | None = None,
        cooldown: float = 0,
        device_updated_cb: DeviceCallbackType[ExposeSensor] | None = None,
    ) -> None:
        """Initialize Sensor class."""
        super().__init__(xknx, name, device_updated_cb)
        self.cooldown = cooldown
        self.respond_to_read = respond_to_read
        self.sensor_value: RemoteValueSensor | RemoteValueSwitch
        if value_type == "binary":
            self.sensor_value = RemoteValueSwitch(
                xknx,
                group_address=group_address,
                sync_state=False,
                device_name=self.name,
                after_update_cb=self.expose_after_update,
            )
        else:
            self.sensor_value = RemoteValueSensor(
                xknx,
                group_address=group_address,
                sync_state=False,
                device_name=self.name,
                after_update_cb=self.expose_after_update,
                value_type=value_type,
            )
        self._cooldown_latest_value: Any | None = None
        self._cooldown_task: Task | None = None
        self._cooldown_task_name = f"expose_sensor.cooldown_{id(self)}"

    def expose_after_update(self, value: int | float | str | bool) -> None:
        """Call after state was updated."""
        self._cooldown_latest_value = value
        super().after_update()

    def _iter_remote_values(self) -> Iterator[RemoteValue[Any]]:
        """Iterate the devices RemoteValue classes."""
        yield self.sensor_value

    def async_remove_tasks(self) -> None:
        """Remove async tasks of device."""
        if self._cooldown_task is not None:
            self.xknx.task_registry.unregister(self._cooldown_task.name)
            self._cooldown_task = None

    def process_group_write(self, telegram: Telegram) -> None:
        """Process incoming and outgoing GROUP WRITE telegram."""
        self.sensor_value.process(telegram)

    def process_group_read(self, telegram: Telegram) -> None:
        """Process incoming GROUP READ telegram."""
        if not self.respond_to_read:
            return
        if self._cooldown_latest_value is not None:
            self.sensor_value.set(self._cooldown_latest_value, response=True)
            return
        self.sensor_value.respond()

    async def set(self, value: Any) -> None:
        """Set new value."""
        if self.cooldown:
            self._cooldown_latest_value = value
            if self._cooldown_task is not None and not self._cooldown_task.done():
                return
            self._cooldown_task = self.xknx.task_registry.register(
                name=self._cooldown_task_name,
                async_func=self._cooldown_wait,
            ).start()
        self.sensor_value.set(value)

    async def _cooldown_wait(self) -> None:
        """Send value after cooldown if it differs from last processed value."""
        while True:
            await asyncio.sleep(self.cooldown)
            if self.sensor_value.value == self._cooldown_latest_value:
                break
            self.sensor_value.set(self._cooldown_latest_value)  # type: ignore[arg-type]

    def unit_of_measurement(self) -> str | None:
        """Return the unit of measurement."""
        return self.sensor_value.unit_of_measurement

    def resolve_state(self) -> Any:
        """Return the current state of the sensor as a human readable string."""
        return self.sensor_value.value

    def __str__(self) -> str:
        """Return object as readable string."""
        return (
            f'<ExposeSensor name="{self.name}" '
            f"sensor={self.sensor_value.group_addr_str()} "
            f"value={self.sensor_value.value!r} "
            f'unit="{self.unit_of_measurement()}"/>'
        )