1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
|
"""
Module for managing a remote value typically used within a sensor.
The module maps a given value_type to a DPT class and uses this class
for serialization and deserialization of the KNX value.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from xknx.dpt.dpt import DPTArray, DPTBinary
from xknx.exceptions import ConversionError, CouldNotParseTelegram
from .remote_value import GroupAddressesType, RemoteValue, RVCallbackType
if TYPE_CHECKING:
from xknx.xknx import XKNX
class RemoteValueRaw(RemoteValue[int]):
"""Abstraction for raw values."""
def __init__(
self,
xknx: XKNX,
payload_length: int,
group_address: GroupAddressesType = None,
group_address_state: GroupAddressesType = None,
sync_state: bool | int | float | str = True,
device_name: str | None = None,
feature_name: str = "Raw",
after_update_cb: RVCallbackType[int] | None = None,
) -> None:
"""Initialize RemoteValueRaw class."""
self.payload_length = payload_length
super().__init__(
xknx,
group_address,
group_address_state,
sync_state=sync_state,
device_name=device_name,
feature_name=feature_name,
after_update_cb=after_update_cb,
)
def to_knx(self, value: int) -> DPTArray | DPTBinary:
"""Convert value to payload."""
if self.payload_length == 0:
try:
return DPTBinary(value)
except TypeError as err:
raise ConversionError(
"Could not init DPTBinary", value=str(value)
) from err
try:
return DPTArray(value.to_bytes(length=self.payload_length, byteorder="big"))
except (AttributeError, OverflowError) as err:
raise ConversionError("Could not init DPTArray", value=str(value)) from err
def from_knx(self, payload: DPTArray | DPTBinary) -> int:
"""Convert current payload to value."""
if isinstance(payload, DPTBinary) and self.payload_length == 0:
return payload.value
if isinstance(payload, DPTArray) and len(payload.value) == self.payload_length:
try:
return int.from_bytes(payload.value, byteorder="big")
except ValueError as err:
raise ConversionError(
"Could not parse payload", payload=payload
) from err
raise CouldNotParseTelegram("Payload invalid", payload=str(payload))
|