File: packet_reader.py

package info (click to toggle)
python-airtouch5py 0.2.10-1.1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,052 kB
  • sloc: python: 1,748; makefile: 5
file content (62 lines) | stat: -rw-r--r-- 2,199 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import logging

from airtouch5py.packet_decoder import PacketDecoder
from airtouch5py.packets.datapacket import DataPacket

_LOGGER = logging.getLogger(__name__)

# Header (4) + Address (2) + Message Id (1) + Message type (1) + Data length (2) + check bytes (2)
MINIMUM_PACKET_LENGTH = 12
HEADER = b"\x55\x55\x55\xAA"


class PacketReader:
    """
    Provide the bytes received from the airtouch 5 socket, and this class will find the packets, decode them, and return them.
    """

    _buffer: bytearray
    _packet_decoder = PacketDecoder()

    def __init__(self):
        self._buffer = bytearray()

    def read(self, data: bytes) -> list[DataPacket]:
        """
        Read the data and return a list of packets.
        """
        self._buffer.extend(data)

        packets = []

        # h. Redundant bytes in message
        # To prevent the message from containing the same data as header, a 00 is inserted after every three
        # consecutive 0x55s in the message. The inserted 00 is redundant bytes
        # ^^ In testing this actually doesn't happen. I named a zone UUUUUUU and it didn't insert any 00s

        while len(self._buffer) >= MINIMUM_PACKET_LENGTH:
            # Seek until we find the header
            while len(self._buffer) > 0 and not self._buffer.startswith(HEADER):
                self._buffer.pop(0)

            # If we don't have enough data for a packet, then we can't do anything
            if len(self._buffer) < MINIMUM_PACKET_LENGTH:
                break

            # Check if we have a full packet (Data length is now bytes 9-10)
            data_length = int.from_bytes(self._buffer[8:10], byteorder="big")
            packet_length = data_length + MINIMUM_PACKET_LENGTH
            if len(self._buffer) < packet_length:
                break

            # Decode the packet
            try:
                packet = self._packet_decoder.decode(self._buffer[:packet_length])
                packets.append(packet)
            except Exception as e:
                _LOGGER.debug(f"Error decoding packet: {e}")

            # remove the packet from the buffer
            self._buffer = self._buffer[packet_length:]

        return packets