1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
|
"""Module for managing remote value with payload length based DPT detection."""
from __future__ import annotations
from collections.abc import Iterable
from typing import TYPE_CHECKING
from xknx.dpt import DPTArray, DPTBinary, DPTNumeric
from xknx.exceptions import ConversionError, CouldNotParseTelegram
from .remote_value import GroupAddressesType, RemoteValue, RVCallbackType
if TYPE_CHECKING:
from xknx.xknx import XKNX
class RemoteValueByLength(RemoteValue[float]):
"""RemoteValue with DPT detection based on payload length of first received value."""
def __init__(
self,
xknx: XKNX,
dpt_classes: Iterable[type[DPTNumeric]],
group_address: GroupAddressesType = None,
group_address_state: GroupAddressesType = None,
sync_state: bool | int | float | str = True,
device_name: str | None = None,
feature_name: str | None = None,
after_update_cb: RVCallbackType[float] | None = None,
) -> None:
"""Initialize RemoteValueByLength class."""
_payload_lengths = set()
for dpt_class in dpt_classes:
if (
not issubclass(dpt_class, DPTNumeric)
or dpt_class.payload_type is not DPTArray
):
raise ConversionError(
"Only DPTNumeric subclasses with payload_type DPTArray are supported"
)
if dpt_class.payload_length in _payload_lengths:
raise ConversionError(
f"Duplicate payload_length {dpt_class.payload_length} in {dpt_classes}"
)
_payload_lengths.add(dpt_class.payload_length)
super().__init__(
xknx,
group_address,
group_address_state,
sync_state=sync_state,
device_name=device_name,
feature_name=feature_name,
after_update_cb=after_update_cb,
)
self._dpt_classes = dpt_classes
self._internal_dpt_class: type[DPTNumeric] | None = None
def to_knx(self, value: float) -> DPTArray:
"""Convert value to payload."""
if self._internal_dpt_class is None:
raise ConversionError(
f"RemoteValue DPT not initialized for {self.device_name}"
)
return self._internal_dpt_class.to_knx(value)
def from_knx(self, payload: DPTArray | DPTBinary) -> float:
"""Convert current payload to value."""
if self._internal_dpt_class is None:
self._internal_dpt_class = self._determine_dpt_class(payload)
return self._internal_dpt_class.from_knx(payload)
@property
def unit_of_measurement(self) -> str | None:
"""Return the unit of measurement."""
if not self._internal_dpt_class:
return None
return self._internal_dpt_class.unit
@property
def ha_device_class(self) -> str | None:
"""Return a string representing the home assistant device class."""
if not self._internal_dpt_class:
return None
return getattr(self._internal_dpt_class, "ha_device_class", None)
def _determine_dpt_class(self, payload: DPTArray | DPTBinary) -> type[DPTNumeric]:
"""Test if telegram payload may be parsed."""
if isinstance(payload, DPTArray):
try:
return next(
dpt_class
for dpt_class in self._dpt_classes
if dpt_class.payload_type is DPTArray
and dpt_class.payload_length == len(payload.value)
)
except StopIteration:
pass
raise CouldNotParseTelegram("Payload invalid", payload=str(payload))
|