File: airtouch5_client.py

package info (click to toggle)
python-airtouch5py 0.2.10-1.1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,052 kB
  • sloc: python: 1,748; makefile: 5
file content (146 lines) | stat: -rw-r--r-- 4,986 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import asyncio
import binascii
import logging
from enum import Enum

from airtouch5py.data_packet_factory import DataPacketFactory

from airtouch5py.packet_encoder import PacketEncoder
from airtouch5py.packet_reader import PacketReader

from airtouch5py.packets.datapacket import DataPacket

# Module-level logger, named after this module; used for all client log output below.
_LOGGER = logging.getLogger(__name__)


class Airtouch5ConnectionStateChange(Enum):
    """Connection state markers that Airtouch5Client puts on its packets_received queue."""

    # Queued by Airtouch5Client.connect() once the socket has been opened.
    CONNECTED = 1
    # Queued by Airtouch5Client.disconnect() when an open connection was actually closed.
    DISCONNECTED = 2


class Airtouch5Client:
    """
    The "Raw" (Hard mode) Airtouch 5 Client.
    You should probably use Airtouch5SimpleClient instead.

    Usage:
    Construct, call connect.
    Wait on packets_received to receive a CONNECTED message.
    Wait on updates from packets_received, use send_packet to send packets.

    Call disconnect to disconnect.

    When a DISCONNECTED message is received, call connect to reconnect.
    """

    # Shared by all instances; this class only ever calls encode() on it.
    _encoder = PacketEncoder()
    # Created per instance in __init__ (see the note there).
    _packet_reader: PacketReader

    ip: str
    packets_received: asyncio.Queue[Airtouch5ConnectionStateChange | DataPacket]

    _reader: asyncio.StreamReader | None
    _writer: asyncio.StreamWriter | None
    # Background task running _read_packets(); None while no reader is running.
    _reader_task: asyncio.Task[None] | None

    _disconnect_lock: asyncio.Lock

    def __init__(self, ip: str) -> None:
        """
        Remember the address of the airtouch 5 and set up the disconnected state.
        No network I/O happens here; call connect to open the socket.
        """
        self.ip = ip
        self.packets_received = asyncio.Queue()
        self.data_packet_factory = DataPacketFactory()
        # One PacketReader per client, so that anything the reader keeps between
        # read() calls is never shared between clients.
        # NOTE(review): PacketReader's implementation is not visible from this
        # file; presumably it buffers partial packets - confirm.
        self._packet_reader = PacketReader()

        # Neither flag is read or updated by any method of this class; they are
        # kept so that existing attribute access keeps working.
        self._connected = False
        self._should_be_connected = False
        self._reader, self._writer = None, None
        # Must exist before the first connect(): disconnect() inspects it, and
        # may be called before (or after a failed) connect().
        self._reader_task = None
        self._disconnect_lock = asyncio.Lock()

    async def connect(self) -> None:
        """
        Connect to the airtouch 5 (TCP port 9005).
        Times out after 5 seconds.
        Throws if we fail to connect.
        Otherwise puts a connected message in the queue and starts up the reader task.
        """

        # Clear the queue before we connect (Might be dangling stuff in it from a previous connection)
        while not self.packets_received.empty():
            self.packets_received.get_nowait()

        _LOGGER.info("Connecting to %s:9005", self.ip)
        self._reader, self._writer = await asyncio.wait_for(
            asyncio.open_connection(self.ip, 9005), 5
        )
        _LOGGER.info("Connected to %s:9005", self.ip)

        self.packets_received.put_nowait(Airtouch5ConnectionStateChange.CONNECTED)
        self._reader_task = asyncio.create_task(self._read_packets())

    async def disconnect(self) -> None:
        """
        Disconnect the socket if it is connected and cancel the reader task if one is registered.
        Safe to call when not connected (including before the first connect), in which case nothing happens.
        Pushes a DISCONNECTED message in to the queue if we actually disconnected (were connected before)
        """
        # The lock serialises concurrent disconnects (e.g. from the reader task
        # and from send_packet).
        async with self._disconnect_lock:
            did_disconnect = False
            if self._writer is not None:
                did_disconnect = True
                self._writer.close()
                try:
                    await self._writer.wait_closed()
                except Exception:
                    # Ignore exceptions when closing
                    _LOGGER.debug("Exception when closing writer", exc_info=True)

            # The reader task clears this reference itself before calling
            # disconnect, so it never cancels itself here.
            if self._reader_task is not None:
                self._reader_task.cancel()
                self._reader_task = None

            self._writer, self._reader = None, None
            if did_disconnect:
                self.packets_received.put_nowait(
                    Airtouch5ConnectionStateChange.DISCONNECTED
                )

    async def _read_packets(self) -> None:
        """
        Continuously read packets from the socket and put them in the queue.
        If we detect the socket has died, disconnect it
        Throws if there is no reader (i.e. connect has not been called).
        """
        reader = self._reader
        if reader is None:
            raise Exception("Reader is None")

        try:
            while not reader.at_eof():
                read = await reader.read(1024)
                _LOGGER.debug("Received data: %s", binascii.hexlify(read))
                packets = self._packet_reader.read(read)
                for packet in packets:
                    self.packets_received.put_nowait(packet)
        except Exception as e:
            # Any read/parse failure ends the loop; the connection is torn down below.
            _LOGGER.error("Exception in reader task: %s", e)

        # If we've finished reading for some reason, we should disconnect
        # Clear our task first so we don't get cancelled
        self._reader_task = None
        await self.disconnect()

    async def send_packet(self, packet: DataPacket) -> None:
        """
        Send the given packet to the airtouch 5.
        Throws if we aren't connected or if there is a connection issue
        (in the latter case the client is disconnected before the error is re-raised).
        """
        writer = self._writer
        if writer is None:
            raise Exception("Writer is None")

        try:
            data = self._encoder.encode(packet)
            _LOGGER.debug("Sending data: %s", binascii.hexlify(data))
            writer.write(data)
            await writer.drain()
        except Exception as e:
            _LOGGER.error("Exception when sending packet: %s", e)
            await self.disconnect()
            # Bare raise re-raises the active exception with its original traceback.
            raise