File: remote_value.py

package info (click to toggle)
python-xknx 3.14.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,064 kB
  • sloc: python: 40,895; javascript: 8,556; makefile: 32; sh: 12
file content (322 lines) | stat: -rw-r--r-- 12,176 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
"""
Module for managing a remote value on the KNX bus.

Remote value can be :
- a group address for writing a KNX value,
- a group address for reading a KNX value,
- or a group of both representing the same value.
"""

from __future__ import annotations

from abc import ABC
from collections.abc import Callable, Iterator
import logging
from typing import TYPE_CHECKING, Generic, TypeVar, Union

from xknx.dpt import DPTArray, DPTBase, DPTBinary
from xknx.exceptions import ConversionError, CouldNotParseTelegram
from xknx.telegram import GroupAddress, Telegram
from xknx.telegram.address import (
    DeviceGroupAddress,
    InternalGroupAddress,
    parse_device_group_address,
)
from xknx.telegram.apci import GroupValueResponse, GroupValueWrite

if TYPE_CHECKING:
    from xknx.telegram.address import DeviceAddressableType
    from xknx.xknx import XKNX

logger = logging.getLogger("xknx.log")

GroupAddressesType = Union[
    "DeviceAddressableType", list[Union["DeviceAddressableType", None]], None
]
ValueT = TypeVar("ValueT")

RVCallbackType = Callable[[ValueT], None]


class RemoteValue(ABC, Generic[ValueT]):
    """Class for managing remote knx value."""

    dpt_class: type[DPTBase] | None = None

    def __init__(
        self,
        xknx: XKNX,
        group_address: GroupAddressesType = None,
        group_address_state: GroupAddressesType = None,
        sync_state: None | bool | int | float | str = None,
        device_name: str | None = None,
        feature_name: str | None = None,
        after_update_cb: RVCallbackType[ValueT] | None = None,
    ) -> None:
        """Initialize RemoteValue class."""
        self.xknx: XKNX = xknx
        self.passive_group_addresses: list[DeviceGroupAddress] = []

        def unpack_group_addresses(
            addresses: GroupAddressesType,
        ) -> DeviceGroupAddress | None:
            """Parse group addresses and assign passive addresses when given."""
            if addresses is None:
                return None
            if not isinstance(addresses, list):
                return parse_device_group_address(addresses)
            if not addresses:  # empty list
                return None
            active = addresses[0]
            passive = [
                parse_device_group_address(addr)
                for addr in addresses[1:]
                if addr is not None
            ]
            self.passive_group_addresses.extend(passive)
            return parse_device_group_address(active) if active is not None else None

        self.group_address = unpack_group_addresses(group_address)
        self.group_address_state = unpack_group_addresses(group_address_state)

        self.device_name: str = "Unknown" if device_name is None else device_name
        self.feature_name: str = "Unknown" if feature_name is None else feature_name
        self._value: ValueT | None = None
        self._payload: DPTArray | DPTBinary | None = None
        self.telegram: Telegram | None = None
        self.after_update_cb: RVCallbackType[ValueT] | None = after_update_cb
        self._sync_state = sync_state

    def group_addresses(self) -> Iterator[DeviceGroupAddress]:
        """Return all configured group addresses of this RemoteValue."""
        if self.group_address is not None:
            yield self.group_address
        if self.group_address_state is not None:
            yield self.group_address_state
        yield from self.passive_group_addresses

    def register_state_updater(self) -> None:
        """Register RemoteValue for state updates."""
        sync_state = (
            self._sync_state
            if self._sync_state is not None
            else self.xknx.state_updater.default_use_updater
        )
        if sync_state and self.group_address_state:
            self.xknx.state_updater.register_remote_value(
                self, tracker_options=sync_state
            )

    def unregister_state_updater(self) -> None:
        """Unregister RemoteValue from state updates."""
        try:
            self.xknx.state_updater.unregister_remote_value(self)
        except KeyError:
            # KeyError if it was never added to StateUpdater
            pass

    @property
    def value(self) -> ValueT | None:
        """Get current value."""
        return self._value

    @value.setter
    def value(self, value: ValueT | None) -> None:
        """Set new value without creating a Telegram or calling after_update_cb. Raises ConversionError on invalid value."""
        if value is not None:
            # raises ConversionError on invalid value
            self._payload = self.to_knx(value)
        else:
            self._payload = None
        self._value = value

    @property
    def last_payload(self) -> DPTArray | DPTBinary | None:
        """Get last payload."""
        return self._payload

    def update_value(self, value: ValueT) -> None:
        """Set new value without creating a Telegram. Calls after_update_cb. Raises ConversionError on invalid value."""
        self.value = value
        if self.after_update_cb is not None:
            self.after_update_cb(value)

    @property
    def initialized(self) -> bool:
        """Evaluate if remote value is initialized with group address."""
        return bool(
            self.group_address_state
            or self.group_address
            or self.passive_group_addresses
        )

    @property
    def readable(self) -> bool:
        """Evaluate if remote value should be read from bus."""
        return bool(self.group_address_state)

    @property
    def writable(self) -> bool:
        """Evaluate if remote value has a group_address set."""
        return bool(self.group_address)

    def from_knx(self, payload: DPTArray | DPTBinary) -> ValueT:
        """Convert current payload to value - to be implemented in derived class when `dpt_class` can't be used."""
        if self.dpt_class is None:
            raise NotImplementedError(
                "Either `dpt_class` must be set or `from_knx` must be implemented"
            )
        return self.dpt_class.from_knx(payload)  # type: ignore[no-any-return]

    def to_knx(self, value: ValueT) -> DPTArray | DPTBinary:
        """Convert value to payload - to be implemented in derived class when `dpt_class` can't be used."""
        if self.dpt_class is None:
            raise NotImplementedError(
                "Either `dpt_class` must be set or `to_knx` must be implemented"
            )
        return self.dpt_class.to_knx(value)

    def process(self, telegram: Telegram, always_callback: bool = False) -> bool:
        """Process incoming or outgoing telegram."""
        if (
            not isinstance(
                telegram.destination_address, GroupAddress | InternalGroupAddress
            )
            or telegram.destination_address not in self.group_addresses()
        ):
            return False
        if not isinstance(telegram.payload, GroupValueWrite | GroupValueResponse):
            raise CouldNotParseTelegram(
                "payload not a GroupValueWrite or GroupValueResponse",
                payload=str(telegram.payload),
                destination_address=str(telegram.destination_address),
                source_address=str(telegram.source_address),
                device_name=self.device_name,
                feature_name=self.feature_name,
            )

        try:
            decoded_payload: ValueT
            if (
                telegram.decoded_data is not None
                and telegram.decoded_data.transcoder is self.dpt_class
            ):
                decoded_payload = telegram.decoded_data.value  # type: ignore[assignment]
            else:
                decoded_payload = self.from_knx(telegram.payload.value)
        except (ConversionError, CouldNotParseTelegram) as err:
            logger.warning(
                "Can not process %s for %s - %s: %s",
                telegram,
                self.device_name,
                self.feature_name,
                err,
            )
            return False
        self._payload = telegram.payload.value
        self.xknx.state_updater.update_received(self)
        if self._value is None or always_callback or self._value != decoded_payload:
            self._value = decoded_payload
            self.telegram = telegram
            if self.after_update_cb is not None:
                self.after_update_cb(decoded_payload)
        return True

    def send_raw(self, payload: DPTArray | DPTBinary, response: bool = False) -> None:
        """Send payload as telegram to KNX bus."""
        if self.group_address is None:
            logger.warning(
                "Attempted to set payload for non-writable device: %s - %s (payload: %s)",
                self.device_name,
                self.feature_name,
                payload,
            )
            return
        telegram = Telegram(
            destination_address=self.group_address,
            payload=(
                GroupValueResponse(payload) if response else GroupValueWrite(payload)
            ),
            source_address=self.xknx.current_address,
        )
        self.xknx.telegrams.put_nowait(telegram)

    def set(self, value: ValueT, response: bool = False) -> None:
        """Set new value."""
        if not self.writable:
            logger.warning(
                "Attempted to set value for non-writable device: %s - %s (value: %s)",
                self.device_name,
                self.feature_name,
                value,
            )
            return
        payload = self.to_knx(value)
        # self._value is set and after_update_cb() called when the outgoing telegram is processed.
        self.send_raw(payload, response)

    def respond(self) -> None:
        """Send current payload as GroupValueResponse telegram to KNX bus."""
        if self._value is None:
            return
        payload = self.to_knx(self._value)
        self.send_raw(payload, response=True)

    async def read_state(self, wait_for_result: bool = False) -> None:
        """Send GroupValueRead telegram for state address to KNX bus."""
        if self.group_address_state is not None:
            # pylint: disable=import-outside-toplevel
            # TODO: send a ReadRequest and start a timeout from here instead of ValueReader
            #       cancel timeout form process(); delete ValueReader
            from xknx.core import ValueReader

            value_reader = ValueReader(self.xknx, self.group_address_state)
            if wait_for_result:
                if await value_reader.read() is None:
                    logger.warning(
                        "Could not sync group address '%s' (%s - %s)",
                        self.group_address_state,
                        self.device_name,
                        self.feature_name,
                    )
            else:
                value_reader.send_group_read()

    @property
    def unit_of_measurement(self) -> str | None:
        """Return the unit of measurement."""
        return None

    def group_addr_str(self) -> str:
        """Return object as readable string."""
        return (
            f"<{self.group_address}, "
            f"{self.group_address_state}, "
            f"{list(map(str, self.passive_group_addresses))}, "
            f"{self.value!r} />"
        )

    def __str__(self) -> str:
        """Return object as string representation."""
        return (
            f"<{self.__class__.__name__} "
            f'device_name="{self.device_name}" '
            f'feature_name="{self.feature_name}" '
            f"{self.group_addr_str()} />"
        )

    def __eq__(self, other: object) -> bool:
        """Equal operator."""
        for key, value in self.__dict__.items():
            if key == "after_update_cb":
                continue
            if key not in other.__dict__:
                return False
            if other.__dict__[key] != value:
                return False
        for key in other.__dict__:
            if key == "after_update_cb":
                continue
            if key not in self.__dict__:
                return False
        return True