File: remote_value_setpoint_shift.py

package info (click to toggle)
python-xknx 3.6.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,012 kB
  • sloc: python: 39,710; javascript: 8,556; makefile: 27; sh: 12
file content (89 lines) | stat: -rw-r--r-- 3,151 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
"""
Module for managing setpoint shifting.

DPT 6.010.
"""

from __future__ import annotations

from enum import Enum
from typing import TYPE_CHECKING

from xknx.dpt import DPTArray, DPTBinary, DPTTemperature, DPTValue1Count
from xknx.exceptions import ConversionError, CouldNotParseTelegram

from .remote_value import GroupAddressesType, RemoteValue, RVCallbackType

if TYPE_CHECKING:
    from xknx.xknx import XKNX


class SetpointShiftMode(Enum):
    """Enum for setting the setpoint shift mode.

    Each member's value is the DPT class used to encode and decode the
    setpoint shift payload for that mode.
    """

    # DPT 6.010: the shift is transmitted as a count of shift steps.
    DPT6010 = DPTValue1Count
    # DPT 9.002: the shift is transmitted via DPTTemperature without step scaling.
    DPT9002 = DPTTemperature

class RemoteValueSetpointShift(RemoteValue[float]):
    """Abstraction for a remote setpoint shift value.

    Two payload encodings are supported:

    * DPT 6.010 (``DPTValue1Count``): the payload is a number of steps; the
      shift equals ``steps * setpoint_shift_step``.
    * DPT 9.002 (``DPTTemperature``): the payload carries the shift value
      itself, no step scaling is applied.

    When no mode is configured, the encoding is detected from the length of
    the first payload passed to ``from_knx``.
    """

    def __init__(
        self,
        xknx: XKNX,
        group_address: GroupAddressesType = None,
        group_address_state: GroupAddressesType = None,
        sync_state: bool | int | float | str = True,
        device_name: str | None = None,
        after_update_cb: RVCallbackType[float] | None = None,
        setpoint_shift_mode: SetpointShiftMode | None = None,
        setpoint_shift_step: float = 0.1,
    ) -> None:
        """Initialize RemoteValueSetpointShift class.

        Args:
            xknx: XKNX instance, forwarded to ``RemoteValue``.
            group_address: forwarded to ``RemoteValue``.
            group_address_state: forwarded to ``RemoteValue``.
            sync_state: forwarded to ``RemoteValue``.
            device_name: forwarded to ``RemoteValue``.
            after_update_cb: forwarded to ``RemoteValue``.
            setpoint_shift_mode: payload encoding to use; ``None`` defers the
                decision until the first payload is decoded.
            setpoint_shift_step: size of one step, used to scale values in
                DPT 6.010 mode only.
        """
        super().__init__(
            xknx,
            group_address,
            group_address_state,
            sync_state=sync_state,
            device_name=device_name,
            feature_name="Setpoint shift value",
            after_update_cb=after_update_cb,
        )

        # None means "not yet known"; from_knx fills it in on first decode.
        self._internal_dpt_class: type[DPTValue1Count | DPTTemperature] | None = (
            setpoint_shift_mode.value if setpoint_shift_mode is not None else None
        )
        self.setpoint_shift_step = setpoint_shift_step

    def to_knx(self, value: float) -> DPTArray:
        """Convert value to payload.

        In DPT 6.010 mode the value is converted to the nearest whole number
        of ``setpoint_shift_step`` increments before encoding.

        Raises:
            ConversionError: if the DPT class is not known yet (no mode was
                configured and no payload has been decoded so far).
        """
        if self._internal_dpt_class is None:
            raise ConversionError(
                f"Setpoint shift DPT not initialized for {self.device_name}"
            )
        if self._internal_dpt_class == DPTValue1Count:
            # round() rather than int(): binary float division lands just
            # below the intended integer for many inputs (0.3 / 0.1 ==
            # 2.9999999999999996), and int() would truncate that to one step
            # too few.
            converted_value = round(value / self.setpoint_shift_step)
            return DPTValue1Count.to_knx(converted_value)
        return DPTTemperature.to_knx(value)

    def from_knx(self, payload: DPTArray | DPTBinary) -> float:
        """Convert current payload to value.

        If the DPT class is still unknown it is determined from the payload
        and remembered for subsequent calls.

        Raises:
            CouldNotParseTelegram: if the DPT class has to be determined and
                the payload matches neither supported encoding.
        """
        if self._internal_dpt_class is None:
            self._internal_dpt_class = self._determine_dpt_class(payload)

        payload_value = self._internal_dpt_class.from_knx(payload)
        if self._internal_dpt_class == DPTValue1Count:
            # Payload is a step count; scale it back to the shift value.
            return payload_value * self.setpoint_shift_step
        return payload_value

    def _determine_dpt_class(
        self, payload: DPTArray | DPTBinary
    ) -> type[DPTValue1Count | DPTTemperature]:
        """Return the DPT class whose payload length matches the payload.

        Raises:
            CouldNotParseTelegram: if the payload is not a ``DPTArray`` or its
                length matches neither ``DPTTemperature`` nor
                ``DPTValue1Count``.
        """
        if isinstance(payload, DPTArray):
            payload_length = len(payload.value)
            if payload_length == DPTTemperature.payload_length:
                return DPTTemperature
            if payload_length == DPTValue1Count.payload_length:
                return DPTValue1Count
        raise CouldNotParseTelegram("Payload invalid", payload=str(payload))