File: header.py

package info (click to toggle)
python-xknx 3.6.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,012 kB
  • sloc: python: 39,710; javascript: 8,556; makefile: 27; sh: 12
file content (69 lines) | stat: -rw-r--r-- 2,640 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
"""Module for serialization and deserialization of KNX/IP Header."""

from __future__ import annotations

from typing import Final

from xknx.exceptions import CouldNotParseKNXIP, IncompleteKNXIPFrame

from .body import KNXIPBody
from .knxip_enum import KNXIPServiceType


class KNXIPHeader:
    """Class for serialization and deserialization of KNX/IP Header.

    The serialized header is 6 bytes long:

    * byte 0: header length (always ``HEADERLENGTH``)
    * byte 1: protocol version (always ``PROTOCOLVERSION``)
    * bytes 2-3: service type identifier, big-endian
    * bytes 4-5: total length, big-endian

    Instances compare equal when all their instance attributes are equal.
    """

    HEADERLENGTH: Final = 0x06
    PROTOCOLVERSION: Final = 0x10  # The only valid protocol version at this time is 1.0

    def __init__(self) -> None:
        """Initialize KNXIPHeader class."""
        self.service_type_ident = KNXIPServiceType.ROUTING_INDICATION
        self.total_length = 0  # to be set later

    def from_knx(self, data: bytes) -> int:
        """Parse/deserialize from KNX/IP raw data.

        Read the first ``HEADERLENGTH`` bytes of `data` and update
        `total_length` and `service_type_ident` accordingly.

        Return ``HEADERLENGTH``, the number of bytes read from the start
        of `data`.

        Raise `IncompleteKNXIPFrame` if `data` is shorter than a header.
        Raise `CouldNotParseKNXIP` if the header length byte, the protocol
        version byte or the service type identifier is not valid.
        """
        if len(data) < KNXIPHeader.HEADERLENGTH:
            raise IncompleteKNXIPFrame("wrong connection header length")
        if data[0] != KNXIPHeader.HEADERLENGTH:
            raise CouldNotParseKNXIP("wrong connection header length")
        # set immediately, as we need it for tcp stream parsing before raising exception
        self.total_length = int.from_bytes(data[4:6], "big")
        if data[1] != KNXIPHeader.PROTOCOLVERSION:
            raise CouldNotParseKNXIP("wrong protocol version")

        try:
            self.service_type_ident = KNXIPServiceType(
                int.from_bytes(data[2:4], "big")
            )
        except ValueError:
            # `from None` hides the enum's ValueError from the traceback
            raise CouldNotParseKNXIP(
                f"KNXIPServiceType unknown: 0x{data[2:4].hex()}"
            ) from None
        return KNXIPHeader.HEADERLENGTH

    def set_length(self, body: KNXIPBody) -> None:
        """Set length of full KNX/IP packet from body + fixed header length.

        Raise `TypeError` if `body` is not a `KNXIPBody` instance.
        """
        if not isinstance(body, KNXIPBody):
            raise TypeError()
        self.total_length = KNXIPHeader.HEADERLENGTH + body.calculated_length()

    def to_knx(self) -> bytes:
        """Serialize to KNX/IP raw data.

        Return the 6 header bytes: header length, protocol version, then
        the service type identifier and the total length as 2-byte
        big-endian values.
        """
        return (
            bytes((KNXIPHeader.HEADERLENGTH, KNXIPHeader.PROTOCOLVERSION))
            + self.service_type_ident.value.to_bytes(2, "big")
            + self.total_length.to_bytes(2, "big")
        )

    def __repr__(self) -> str:
        """Return object as readable string."""
        return (
            "<KNXIPHeader "
            f'HeaderLength="{KNXIPHeader.HEADERLENGTH}" '
            f'ProtocolVersion="{KNXIPHeader.PROTOCOLVERSION}" '
            f'KNXIPServiceType="{self.service_type_ident.name}" '
            f'TotalLength="{self.total_length}" />'
        )

    def __eq__(self, other: object) -> bool:
        """Equal operator.

        Two headers are equal when their instance attributes are equal.
        For any object that is not a `KNXIPHeader`, `NotImplemented` is
        returned so that Python falls back to its default comparison
        instead of failing on objects without a ``__dict__``.
        """
        if not isinstance(other, KNXIPHeader):
            return NotImplemented
        return self.__dict__ == other.__dict__