File: group_address_dpt.py

package info (click to toggle)
python-xknx 3.14.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,064 kB
  • sloc: python: 40,895; javascript: 8,556; makefile: 32; sh: 12
file content (90 lines) | stat: -rw-r--r-- 3,528 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
"""Group address data point type table."""

from __future__ import annotations

from collections.abc import Mapping
import logging

from xknx.dpt.dpt import DPTBase
from xknx.exceptions import ConversionError, CouldNotParseAddress, CouldNotParseTelegram
from xknx.telegram import Telegram, TelegramDecodedData
from xknx.telegram.address import (
    DeviceAddressableType,
    DeviceGroupAddress,
    GroupAddress,
    InternalGroupAddress,
    parse_device_group_address,
)
from xknx.telegram.apci import GroupValueResponse, GroupValueWrite
from xknx.typing import DPTParsable

# Logger for group address DPT assignment problems and payload decoding errors.
_GA_DPT_LOGGER = logging.getLogger("xknx.ga_dpt")


class GroupAddressDPT:
    """Table of group address -> DPT transcoder used to decode telegrams eagerly."""

    __slots__ = ("_ga_dpts", "ga_decoding_error")

    def __init__(self) -> None:
        """Create an empty table without any recorded decoding errors."""
        # Keyed by the raw address value (int | str): faster than keying
        # the dict by DeviceGroupAddress instances.
        self._ga_dpts: dict[int | str, type[DPTBase]] = {}
        # Destination addresses for which a decoding error was already logged.
        self.ga_decoding_error: set[GroupAddress | InternalGroupAddress] = set()

    def set(
        self,
        ga_dpt: Mapping[DeviceAddressableType, DPTParsable],
    ) -> None:
        """Register a transcoder for each group address in `ga_dpt`.

        Addresses that can not be parsed are skipped and logged as a warning.
        DPTs that resolve to no transcoder are skipped as well; they are
        reported together in one debug message after all entries are processed.
        Existing entries for other addresses are kept.
        """
        unresolved: set[str] = set()
        for raw_address, dpt_spec in ga_dpt.items():
            try:
                parsed = parse_device_group_address(raw_address)
            except CouldNotParseAddress as err:
                _GA_DPT_LOGGER.warning("Invalid group address %s: %s", raw_address, err)
                continue
            transcoder = DPTBase.parse_transcoder(dpt_spec)
            if transcoder is not None:
                self._ga_dpts[parsed.raw] = transcoder
            else:
                # store the repr so unhashable DPT definitions (dict) fit in a set
                unresolved.add(repr(dpt_spec))
        if unresolved:
            _GA_DPT_LOGGER.debug("No transcoder found for DPTs: %s", unresolved)

    def get(self, address: DeviceGroupAddress) -> type[DPTBase] | None:
        """Return the transcoder registered for `address` or None if there is none."""
        raw_key = address.raw
        return self._ga_dpts.get(raw_key)

    def clear(self) -> None:
        """Drop all address -> transcoder assignments."""
        # Rebinds a fresh dict; `ga_decoding_error` is left as it is.
        self._ga_dpts = {}

    def set_decoded_data(self, telegram: Telegram) -> None:
        """Decode the payload of `telegram` and store the result on the telegram.

        Nothing happens if the telegram already carries decoded data, if its
        payload is neither GroupValueWrite nor GroupValueResponse, or if no
        transcoder is registered for its destination address.

        A decoding failure is logged - as warning the first time it occurs for
        a destination address, as debug for every further occurrence - and
        leaves `telegram.decoded_data` untouched.
        """
        if telegram.decoded_data is not None:
            return
        payload = telegram.payload
        if not isinstance(payload, GroupValueWrite | GroupValueResponse):
            return
        destination = telegram.destination_address
        # GroupValueWrite and GroupValueResponse can not have IndividualAddress
        assert isinstance(destination, GroupAddress | InternalGroupAddress)
        transcoder = self.get(destination)
        if transcoder is None:
            return
        try:
            value = transcoder.from_knx(payload.value)
        except (CouldNotParseTelegram, ConversionError) as err:
            # Choose the log level before the address is recorded as failing.
            log = (
                _GA_DPT_LOGGER.debug
                if destination in self.ga_decoding_error
                else _GA_DPT_LOGGER.warning
            )
            self.ga_decoding_error.add(destination)
            log(
                "DPT decoding error. Telegram from %s to %s with payload %s can't be decoded by %s: %s",
                telegram.source_address,
                destination,
                payload.value,
                transcoder.dpt_name(),
                err,
            )
        else:
            telegram.decoded_data = TelegramDecodedData(transcoder, value)