File: hpai.py

package info (click to toggle)
python-xknx 3.6.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,012 kB
  • sloc: python: 39,710; javascript: 8,556; makefile: 27; sh: 12
file content (85 lines) | stat: -rw-r--r-- 2,838 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
"""
Module for serialization and deserialization of KNX HPAI (Host Protocol Address Information).

An HPAI contains a host protocol code, an IPv4 address and a port.
"""

from __future__ import annotations

import socket

from xknx.exceptions import ConversionError, CouldNotParseKNXIP

from .knxip_enum import HostProtocol


class HPAI:
    """Represent a KNX/IP HPAI (Host Protocol Address Information) structure.

    An HPAI carries a host protocol code, an IPv4 address and a port. Serialized
    it is ``LENGTH`` bytes long: structure length (1 byte), host protocol code
    (1 byte), IPv4 address (4 bytes) and port (2 bytes, big-endian).
    """

    # Size of the serialized structure in bytes; also the value of its first byte.
    LENGTH = 0x08

    def __init__(
        self,
        ip_addr: str = "0.0.0.0",
        port: int = 0,
        protocol: HostProtocol = HostProtocol.IPV4_UDP,
    ) -> None:
        """Initialize HPAI object.

        Arguments are stored as given; they are only validated when the object
        is serialized with `to_knx()`.
        """
        self.ip_addr = ip_addr
        self.port = port
        self.protocol = protocol

    @property
    def route_back(self) -> bool:
        """Return True if HPAI is a route back address information."""
        return self.ip_addr == "0.0.0.0"

    @property
    def addr_tuple(self) -> tuple[str, int]:
        """Return tuple of ip address and port."""
        return self.ip_addr, self.port

    def from_knx(self, raw: bytes) -> int:
        """Parse/deserialize from KNX/IP raw data.

        Only the first ``LENGTH`` bytes of `raw` are read; anything after them
        is ignored.

        Return the number of bytes consumed (always ``LENGTH``).

        Raise CouldNotParseKNXIP if `raw` is shorter than ``LENGTH``, if its
        length byte is not ``LENGTH`` or if the host protocol code is not a
        valid HostProtocol value.
        """
        # The length check comes first so that raw[0] is never read from empty data.
        if len(raw) < HPAI.LENGTH or raw[0] != HPAI.LENGTH:
            raise CouldNotParseKNXIP("wrong HPAI length")
        try:
            self.protocol = HostProtocol(raw[1])
        except ValueError as err:
            raise CouldNotParseKNXIP("unsupported host protocol code") from err
        self.ip_addr = socket.inet_ntoa(raw[2:6])
        self.port = int.from_bytes(raw[6:8], "big")
        return HPAI.LENGTH

    def to_knx(self) -> bytes:
        """Serialize to KNX/IP raw data.

        Return ``LENGTH`` bytes: length byte, host protocol code, IPv4 address
        and big-endian port.

        Raise ConversionError if the host protocol, the address or the port
        can not be serialized.
        """
        # Each field is converted in its own try block so that a failure is
        # always reported against the field that actually caused it.
        try:
            header = bytes((HPAI.LENGTH, self.protocol.value))
        except (AttributeError, TypeError) as err:
            # AttributeError for objects without `.value`; TypeError for non-int values
            raise ConversionError(
                f"Host protocol is not valid: {self.protocol}"
            ) from err
        try:
            address = socket.inet_aton(self.ip_addr)
        except (OSError, TypeError) as err:
            # OSError for invalid address strings; TypeError for non-strings
            raise ConversionError(f"Invalid IPv4 address: {self.ip_addr}") from err
        try:
            port = self.port.to_bytes(2, "big")
        except (OverflowError, AttributeError) as err:
            # OverflowError for int < 0 or int > 65535; AttributeError for non-integers
            raise ConversionError(f"Port is not valid: {self.port}") from err
        return header + address + port

    def __repr__(self) -> str:
        """Representation of object."""
        return f"HPAI('{self.ip_addr}', {self.port}, {self.protocol})"

    def __str__(self) -> str:
        """Return object as readable string.

        The suffix is the lower-cased last three characters of the protocol
        member name, e.g. ``udp`` for ``IPV4_UDP``.
        """
        return f"{self.ip_addr}:{self.port}/{self.protocol.name[-3:].lower()}"

    def __eq__(self, other: object) -> bool:
        """Equal operator.

        Two HPAI objects are equal if all their instance attributes are equal.
        """
        if not isinstance(other, HPAI):
            # Let Python try the reflected comparison and fall back to False
            # instead of failing on objects that have no `__dict__`.
            return NotImplemented
        return self.__dict__ == other.__dict__

    def __hash__(self) -> int:
        """Hash function."""
        return hash((self.ip_addr, self.port, self.protocol))