File: raw_value.py

package info (click to toggle)
python-xknx 3.6.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,012 kB
  • sloc: python: 39,710; javascript: 8,556; makefile: 27; sh: 12
file content (89 lines) | stat: -rw-r--r-- 2,936 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
"""
Module for managing a raw value via KNX.

It provides functionality for

* reading the current value from KNX bus or providing a value to the bus.
* send local state changes to KNX bus.
* watching for state updates from KNX bus.
"""

from __future__ import annotations

from collections.abc import Iterator
from typing import TYPE_CHECKING

from xknx.remote_value import GroupAddressesType, RemoteValueRaw

from .device import Device, DeviceCallbackType

if TYPE_CHECKING:
    from xknx.telegram import Telegram
    from xknx.xknx import XKNX


class RawValue(Device):
    """Class for managing a raw value.

    All bus handling is delegated to a single ``RemoteValueRaw`` instance
    stored in ``self.remote_value``.
    """

    def __init__(
        self,
        xknx: XKNX,
        name: str,
        payload_length: int,
        group_address: GroupAddressesType = None,
        group_address_state: GroupAddressesType = None,
        respond_to_read: bool = False,
        sync_state: bool | int | float | str = True,
        always_callback: bool = False,
        device_updated_cb: DeviceCallbackType[RawValue] | None = None,
    ) -> None:
        """Initialize RawValue class.

        ``xknx``, ``name`` and ``device_updated_cb`` are handed to the base
        ``Device``. ``payload_length``, ``group_address``,
        ``group_address_state`` and ``sync_state`` are forwarded unchanged to
        ``RemoteValueRaw``.

        ``respond_to_read`` enables answering read requests addressed to the
        remote value's group address (see ``process_group_read``).
        ``always_callback`` is passed along whenever a group write telegram is
        processed (see ``process_group_write``).
        """
        super().__init__(xknx, name, device_updated_cb)
        self.always_callback = always_callback
        self.respond_to_read = respond_to_read
        self.remote_value = RemoteValueRaw(
            xknx,
            payload_length=payload_length,
            group_address=group_address,
            group_address_state=group_address_state,
            sync_state=sync_state,
            device_name=self.name,
            after_update_cb=self.after_update,
        )

    def _iter_remote_values(self) -> Iterator[RemoteValueRaw]:
        """Iterate the devices RemoteValue classes."""
        yield self.remote_value

    @property
    def last_telegram(self) -> Telegram | None:
        """Return the last telegram received from the RemoteValue."""
        return self.remote_value.telegram

    def process_group_write(self, telegram: Telegram) -> None:
        """Process incoming and outgoing GROUP WRITE telegram."""
        self.remote_value.process(telegram, always_callback=self.always_callback)

    def process_group_read(self, telegram: Telegram) -> None:
        """Process incoming GroupValueRead telegrams.

        A response is only sent when ``respond_to_read`` is enabled and the
        telegram is addressed to the remote value's ``group_address``.
        """
        if (
            self.respond_to_read
            and telegram.destination_address == self.remote_value.group_address
        ):
            self.remote_value.respond()

    async def set(self, value: int) -> None:
        """Set new value."""
        # The remote value's ``set`` is called without ``await``; this method
        # is nevertheless a coroutine and must be awaited by callers.
        self.remote_value.set(value)

    def resolve_state(self) -> int | None:
        """Return the current value of the remote value.

        The value is an integer, or ``None`` if the remote value holds none.
        """
        return self.remote_value.value

    def __str__(self) -> str:
        """Return object as readable string."""
        return (
            f'<RawValue name="{self.name}" '
            f"addresses={self.remote_value.group_addr_str()} "
            f"value={self.resolve_state()!r}/>"
        )