File: remote_value_by_length.py

package info (click to toggle)
python-xknx 3.6.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,012 kB
  • sloc: python: 39,710; javascript: 8,556; makefile: 27; sh: 12
file content (102 lines) | stat: -rw-r--r-- 3,743 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
"""Module for managing remote value with payload length based DPT detection."""

from __future__ import annotations

from collections.abc import Iterable
from typing import TYPE_CHECKING

from xknx.dpt import DPTArray, DPTBinary, DPTNumeric
from xknx.exceptions import ConversionError, CouldNotParseTelegram

from .remote_value import GroupAddressesType, RemoteValue, RVCallbackType

if TYPE_CHECKING:
    from xknx.xknx import XKNX


class RemoteValueByLength(RemoteValue[float]):
    """RemoteValue with DPT detection based on payload length of first received value.

    A set of candidate DPT classes is passed at construction time. The DPT
    class actually used stays undetermined until the first payload is decoded
    by `from_knx()`; from then on the candidate whose `payload_length` matched
    that payload is used for all further conversions.
    """

    def __init__(
        self,
        xknx: XKNX,
        dpt_classes: Iterable[type[DPTNumeric]],
        group_address: GroupAddressesType = None,
        group_address_state: GroupAddressesType = None,
        sync_state: bool | int | float | str = True,
        device_name: str | None = None,
        feature_name: str | None = None,
        after_update_cb: RVCallbackType[float] | None = None,
    ) -> None:
        """Initialize RemoteValueByLength class.

        `dpt_classes` are the candidate DPT classes. Each one must be a
        DPTNumeric subclass with payload_type DPTArray, and no two candidates
        may share the same payload_length. All other arguments are passed
        through to `RemoteValue.__init__()` unchanged.

        Raises ConversionError if a candidate is not supported or if two
        candidates have the same payload_length.
        """
        # Snapshot the candidates before validating: `dpt_classes` may be a
        # one-shot iterable (generator, map, ...). Validation below consumes
        # it, so storing the original object would leave nothing to search
        # when the first payload arrives.
        candidates = tuple(dpt_classes)

        seen_lengths = set()
        for dpt_class in candidates:
            if (
                not issubclass(dpt_class, DPTNumeric)
                or dpt_class.payload_type is not DPTArray
            ):
                raise ConversionError(
                    "Only DPTNumeric subclasses with payload_type DPTArray are supported"
                )
            # The payload length is the only discriminator between candidates,
            # so it has to be unique.
            if dpt_class.payload_length in seen_lengths:
                raise ConversionError(
                    f"Duplicate payload_length {dpt_class.payload_length} in {candidates}"
                )
            seen_lengths.add(dpt_class.payload_length)

        super().__init__(
            xknx,
            group_address,
            group_address_state,
            sync_state=sync_state,
            device_name=device_name,
            feature_name=feature_name,
            after_update_cb=after_update_cb,
        )

        self._dpt_classes: tuple[type[DPTNumeric], ...] = candidates
        # Stays None until the first payload has been matched in `from_knx()`.
        self._internal_dpt_class: type[DPTNumeric] | None = None

    def to_knx(self, value: float) -> DPTArray:
        """Convert value to payload.

        Raises ConversionError if no DPT class has been determined yet, which
        is the case until `from_knx()` has matched a payload.
        """
        if self._internal_dpt_class is None:
            raise ConversionError(
                f"RemoteValue DPT not initialized for {self.device_name}"
            )
        return self._internal_dpt_class.to_knx(value)

    def from_knx(self, payload: DPTArray | DPTBinary) -> float:
        """Convert current payload to value.

        On the first call the DPT class is selected from the candidates by the
        length of `payload` and remembered for all later conversions.

        Raises CouldNotParseTelegram if no candidate matches the payload.
        """
        if self._internal_dpt_class is None:
            self._internal_dpt_class = self._determine_dpt_class(payload)

        return self._internal_dpt_class.from_knx(payload)

    @property
    def unit_of_measurement(self) -> str | None:
        """Return the unit of measurement, or None while the DPT is undetermined."""
        if not self._internal_dpt_class:
            return None
        return self._internal_dpt_class.unit

    @property
    def ha_device_class(self) -> str | None:
        """Return a string representing the home assistant device class.

        Returns None while the DPT is undetermined or if the determined DPT
        class has no `ha_device_class` attribute.
        """
        if not self._internal_dpt_class:
            return None
        return getattr(self._internal_dpt_class, "ha_device_class", None)

    def _determine_dpt_class(self, payload: DPTArray | DPTBinary) -> type[DPTNumeric]:
        """Return the candidate DPT class whose payload length matches `payload`.

        Only DPTArray payloads can match. Raises CouldNotParseTelegram for any
        other payload type or if no candidate has a matching payload_length.
        """
        if isinstance(payload, DPTArray):
            payload_length = len(payload.value)
            for dpt_class in self._dpt_classes:
                if (
                    dpt_class.payload_type is DPTArray
                    and dpt_class.payload_length == payload_length
                ):
                    return dpt_class

        raise CouldNotParseTelegram("Payload invalid", payload=str(payload))