File: binary_sensor.py

package info (click to toggle)
python-xknx 3.6.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,012 kB
  • sloc: python: 39,710; javascript: 8,556; makefile: 27; sh: 12
file content (184 lines) | stat: -rw-r--r-- 6,674 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
"""
Module for managing a binary sensor.

A binary sensor can be:
* A switch in the wall (as in the thing you press to switch on the light)
* A motion detector
* A reed sensor for detecting if a window/door is opened or closed.
"""

from __future__ import annotations

import asyncio
from collections.abc import Iterator
from functools import partial
import time
from typing import TYPE_CHECKING, cast

from xknx.core import Task
from xknx.remote_value import GroupAddressesType, RemoteValueSwitch

from .device import Device, DeviceCallbackType

if TYPE_CHECKING:
    from xknx.telegram import Telegram
    from xknx.xknx import XKNX


class BinarySensor(Device):
    """Binary sensor device whose boolean state is fed by a RemoteValueSwitch."""

    def __init__(
        self,
        xknx: XKNX,
        name: str,
        group_address_state: GroupAddressesType = None,
        invert: bool = False,
        sync_state: bool | int | float | str = True,
        ignore_internal_state: bool = False,
        reset_after: float | None = None,
        context_timeout: float | None = None,
        device_updated_cb: DeviceCallbackType[BinarySensor] | None = None,
    ) -> None:
        """Set up state tracking, counters and the underlying RemoteValueSwitch.

        `group_address_state`, `invert` and `sync_state` are handed to the
        RemoteValueSwitch unchanged. `reset_after` and `context_timeout` are
        delays in seconds (both end up in `asyncio.sleep`).
        """
        super().__init__(xknx, name, device_updated_cb)

        self.state: bool | None = None
        self.reset_after = reset_after
        # A truthy context_timeout always switches ignore_internal_state on.
        self.ignore_internal_state = ignore_internal_state or bool(context_timeout)

        self._context_timeout = context_timeout
        self._last_set: float | None = None
        self._count_set_on = 0
        self._count_set_off = 0
        self._context_task: Task | None = None
        self._reset_task: Task | None = None

        self.remote_value = RemoteValueSwitch(
            xknx,
            group_address_state=group_address_state,
            invert=invert,
            sync_state=sync_state,
            device_name=self.name,
            # The callback below decides itself whether after_update is called.
            after_update_cb=self._state_from_remote_value,
        )

    def _iter_remote_values(self) -> Iterator[RemoteValueSwitch]:
        """Yield the single RemoteValue owned by this device."""
        yield self.remote_value

    def async_remove_tasks(self) -> None:
        """Unregister the context and reset tasks (if any) and forget them."""
        if context_task := self._context_task:
            self.xknx.task_registry.unregister(context_task.name)
            self._context_task = None
        if reset_task := self._reset_task:
            self.xknx.task_registry.unregister(reset_task.name)
            self._reset_task = None

    @property
    def last_telegram(self) -> Telegram | None:
        """Return the `telegram` attribute of the RemoteValue."""
        return self.remote_value.telegram

    def _state_from_remote_value(self, state: bool) -> None:
        """Callback for the RemoteValue: forward the new state to the device."""
        self._set_internal_state(state)

    def _set_internal_state(self, state: bool) -> None:
        """Store `state` and notify via `after_update`.

        Nothing happens when the state is unchanged, unless
        `ignore_internal_state` is set. With a context timeout configured the
        counter is bumped and the notification is deferred to `_counter_task`;
        otherwise `after_update` is called immediately.
        """
        if state == self.state and not self.ignore_internal_state:
            return

        self.state = state

        if not (self.ignore_internal_state and self._context_timeout):
            self.after_update()
            return

        self.bump_and_get_counter(state)
        self._context_task = self.xknx.task_registry.register(
            name=f"binary_sensor.context_{id(self)}",
            async_func=partial(self._counter_task, self._context_timeout),
        ).start()

    async def _counter_task(self, wait_seconds: float) -> None:
        """Sleep `wait_seconds`, notify, then clear both counters and notify again."""
        await asyncio.sleep(wait_seconds)
        # First notification still carries the accumulated counter values.
        self.after_update()

        self._count_set_on = 0
        self._count_set_off = 0
        # Second notification reports the cleared counters.
        self.after_update()

    @property
    def counter(self) -> int | None:
        """Return the counter matching the current state, or None without context timeout."""
        if not self._context_timeout:
            return None
        if self.state:
            return self._count_set_on
        return self._count_set_off

    def bump_and_get_counter(self, state: bool) -> int:
        """Count `state` and return how often it was set within the context timeout.

        A call belongs to the running context when less than the context
        timeout elapsed since the previous call (e.g. 'Button was pressed
        twice'). Otherwise a new context is started with a count of 1 for
        `state` and 0 for the opposite state.
        """
        now = time.time()
        previous, self._last_set = self._last_set, now
        same_context = previous is not None and (
            now - previous < cast(float, self._context_timeout)
        )

        if not same_context:
            self._count_set_on, self._count_set_off = (1, 0) if state else (0, 1)
            return 1

        if state:
            self._count_set_on += 1
            return self._count_set_on
        self._count_set_off += 1
        return self._count_set_off

    def process_group_write(self, telegram: Telegram) -> None:
        """Handle a GROUP WRITE telegram; the RemoteValue callback always fires."""
        processed = self.remote_value.process(telegram, always_callback=True)
        if processed:
            self._process_reset_after()

    def process_group_response(self, telegram: Telegram) -> None:
        """Handle a GroupValueResponse telegram (processed with always_callback=False)."""
        processed = self.remote_value.process(telegram, always_callback=False)
        if processed:
            self._process_reset_after()

    def _process_reset_after(self) -> None:
        """Register the reset task when 'reset_after' is configured and the state is on."""
        if self.reset_after is None or not self.state:
            return
        self._reset_task = self.xknx.task_registry.register(
            name=f"binary_sensor.reset_{id(self)}",
            async_func=partial(self._reset_state, self.reset_after),
            track_task=True,
        ).start()

    async def _reset_state(self, wait_seconds: float) -> None:
        """Sleep `wait_seconds`, then set the internal state to False."""
        await asyncio.sleep(wait_seconds)
        self._set_internal_state(False)

    def is_on(self) -> bool:
        """Return True when the state is truthy."""
        return bool(self.state)

    def is_off(self) -> bool:
        """Return True when the state is falsy (False or not yet known)."""
        return not self.state

    def __str__(self) -> str:
        """Return a readable one-line description of the sensor."""
        group_addresses = self.remote_value.group_addr_str()
        return (
            f'<BinarySensor name="{self.name}" '
            f"remote_value={group_addresses} "
            f"state={self.state!r} />"
        )