File: remote.py

package info (click to toggle)
python-broadlink 0.19.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 232 kB
  • sloc: python: 2,140; makefile: 6
file content (173 lines) | stat: -rw-r--r-- 4,789 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
"""Support for universal remotes."""
import struct
from typing import List, Optional, Tuple

from . import exceptions as e
from .device import Device


def pulses_to_data(pulses: List[int], tick: float = 32.84) -> bytes:
    """Encode a microsecond duration sequence as a Broadlink IR packet.

    Every duration is floor-divided by ``tick`` to get a tick count. Counts
    below 256 take one byte; larger counts are written as a 0x00 marker
    followed by the high byte and then the low byte.

    Args:
        pulses: Durations to encode, in microseconds.
        tick: Length of one tick in microseconds.

    Returns:
        A 4-byte header (0x26, 0x00, payload length as little-endian
        16-bit) followed by the encoded durations. The object returned is
        a ``bytearray``.

    Raises:
        ValueError: If a tick count or the payload length does not fit in
            the bytes available for it.

    NOTE(review): a duration shorter than one tick is written as a single
    0x00 byte, which data_to_pulses treats as the multi-byte marker --
    confirm whether such inputs can occur.
    """
    body = bytearray()
    for duration in pulses:
        ticks = int(duration // tick)
        high, low = ticks >> 8, ticks & 0xFF
        if high:
            # Does not fit in one byte: emit the marker and the high byte.
            body.extend((0, high))
        body.append(low)

    size = len(body)
    header = bytearray((0x26, 0x00, size & 0xFF, size >> 8))
    return header + body


def data_to_pulses(data: bytes, tick: float = 32.84) -> List[int]:
    """Parse a Broadlink packet into a microsecond duration sequence.

    The packet starts with a 4-byte header; bytes 2 and 3 hold the payload
    length as a little-endian 16-bit value. Each payload entry is a tick
    count: either a single non-zero byte, or a 0x00 marker followed by a
    two-byte big-endian value.

    Args:
        data: Raw packet, header included.
        tick: Length of one tick in microseconds.

    Returns:
        One duration per payload entry, truncated to an integer.

    Raises:
        ValueError: If the packet is shorter than its 4-byte header, or if
            a 0x00 marker is not followed by two more bytes.
    """
    if len(data) < 4:
        # Without this guard the header lookup below would surface a bare
        # IndexError instead of the ValueError used for malformed packets.
        raise ValueError("Malformed data.")

    result = []
    index = 4
    # Trust the declared length only as far as the data actually reaches.
    end = min(256 * data[0x03] + data[0x02] + 4, len(data))

    while index < end:
        chunk = data[index]
        index += 1

        if chunk == 0:
            # Marker byte: the real tick count is in the next two bytes.
            try:
                chunk = 256 * data[index] + data[index + 1]
            except IndexError as err:
                raise ValueError("Malformed data.") from err
            index += 2

        result.append(int(chunk * tick))

    return result


class rmmini(Device):
    """Controls a Broadlink RM mini 3."""

    TYPE = "RMMINI"

    def _send(self, command: int, data: bytes = b"") -> bytes:
        """Send a command to the device and return the decrypted reply body.

        The request is the command as a little-endian unsigned 32-bit
        integer followed by ``data``. The error field at bytes 0x22-0x23 of
        the response is passed to ``check_error``; the response from offset
        0x38 on is decrypted and its first four bytes are dropped.
        """
        request = struct.pack("<I", command) + data
        response = self.send_packet(0x6A, request)

        error_field = response[0x22:0x24]
        e.check_error(error_field)

        decrypted = self.decrypt(response[0x38:])
        return decrypted[0x4:]

    def update(self) -> None:
        """Refresh ``name`` and ``is_locked`` from the device."""
        state = self._send(0x1)

        # The name runs from offset 0x48 up to the first NUL byte.
        raw_name, _, _ = state[0x48:].partition(b"\x00")
        self.name = raw_name.decode()

        self.is_locked = state[0x87] != 0

    def send_data(self, data: bytes) -> None:
        """Transmit a code through the device (command 0x2)."""
        self._send(0x2, data)

    def enter_learning(self) -> None:
        """Put the device into infrared learning mode (command 0x3)."""
        self._send(0x3)

    def check_data(self) -> bytes:
        """Fetch the most recently captured code (command 0x4)."""
        captured = self._send(0x4)
        return captured


class rmpro(rmmini):
    """Controls a Broadlink RM pro."""

    TYPE = "RMPRO"

    def sweep_frequency(self) -> None:
        """Start a frequency sweep (command 0x19)."""
        self._send(0x19)

    def check_frequency(self) -> Tuple[bool, float]:
        """Report the outcome of a frequency sweep.

        Returns:
            A ``(is_found, frequency)`` pair. ``is_found`` is True when the
            first reply byte is non-zero; ``frequency`` is the little-endian
            unsigned 32-bit value in the next four bytes, divided by 1000.
        """
        reply = self._send(0x1A)
        found = reply[0] != 0
        (raw_frequency,) = struct.unpack("<I", reply[1:5])
        return found, raw_frequency / 1000.0

    def find_rf_packet(self, frequency: Optional[float] = None) -> None:
        """Enter radiofrequency learning mode.

        When ``frequency`` is given (and non-zero) it is multiplied by 1000,
        truncated to an integer and sent as a little-endian unsigned 32-bit
        value; otherwise the command is sent with an empty payload.
        """
        if frequency:
            body = bytearray(struct.pack("<I", int(frequency * 1000)))
        else:
            body = bytearray()
        self._send(0x1B, body)

    def cancel_sweep_frequency(self) -> None:
        """Abort a running frequency sweep (command 0x1E)."""
        self._send(0x1E)

    def check_sensors(self) -> dict:
        """Read the sensors and return them as a dict.

        The first two reply bytes are signed: the integer part of the
        temperature and its tenths.
        """
        reply = self._send(0x1)
        whole, tenths = struct.unpack("<bb", reply[:0x2])
        return {"temperature": whole + tenths / 10.0}

    def check_temperature(self) -> float:
        """Return the temperature reported by ``check_sensors``."""
        readings = self.check_sensors()
        return readings["temperature"]


class rmminib(rmmini):
    """Controls a Broadlink RM mini 3 (new firmware)."""

    TYPE = "RMMINIB"

    def _send(self, command: int, data: bytes = b"") -> bytes:
        """Send a length-prefixed command and return the reply body.

        The request is a little-endian unsigned 16-bit length
        (``len(data) + 4``), the command as a little-endian unsigned 32-bit
        integer, then ``data``. The decrypted reply starts with its own
        16-bit length; the returned slice runs from offset 6 up to that
        length plus 2.
        """
        header = struct.pack("<HI", len(data) + 4, command)
        response = self.send_packet(0x6A, header + data)

        error_field = response[0x22:0x24]
        e.check_error(error_field)

        decrypted = self.decrypt(response[0x38:])
        (declared_len,) = struct.unpack("<H", decrypted[:0x2])
        return decrypted[0x6:declared_len + 2]


class rm4mini(rmminib):
    """Controls a Broadlink RM4 mini."""

    TYPE = "RM4MINI"

    def check_sensors(self) -> dict:
        """Read the sensors and return temperature and humidity.

        The first two reply bytes are signed and give the integer part and
        the hundredths of the temperature; the next two bytes give the
        integer part and the hundredths of the humidity.
        """
        reply = self._send(0x24)
        temp_whole, temp_frac = struct.unpack("<bb", reply[:0x2])

        temperature = temp_whole + temp_frac / 100.0
        humidity = reply[0x2] + reply[0x3] / 100.0
        return {"temperature": temperature, "humidity": humidity}

    def check_temperature(self) -> float:
        """Return the temperature reported by ``check_sensors``."""
        readings = self.check_sensors()
        return readings["temperature"]

    def check_humidity(self) -> float:
        """Return the humidity reported by ``check_sensors``."""
        readings = self.check_sensors()
        return readings["humidity"]


class rm4pro(rm4mini, rmpro):
    """Controls a Broadlink RM4 pro.

    Combines rm4mini and rmpro. Because rm4mini comes first in the bases,
    its ``check_sensors`` (and the ``_send`` it inherits from rmminib) take
    precedence, while the frequency and RF learning methods come from
    rmpro.
    """

    # Device type identifier for this class.
    TYPE = "RM4PRO"


class rm(rmpro):
    """For backwards compatibility.

    Behaves exactly like rmpro; only the TYPE identifier differs.
    """

    # Device type identifier for this class.
    TYPE = "RM2"


class rm4(rm4pro):
    """For backwards compatibility.

    Behaves exactly like rm4pro; only the TYPE identifier differs.
    """

    # Device type identifier for this class.
    TYPE = "RM4"