1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236
|
import base64
import re
from typing import Callable, Set, Tuple
import click
from construct import (
Adapter,
Array,
BitsInteger,
BitStruct,
Computed,
Const,
Int16ub,
Int16ul,
Int32ul,
Rebuild,
Struct,
len_,
this,
)
from .click_common import command, format_output
from .device import Device
from .exceptions import DeviceException
class ChuangmiIrException(DeviceException):
pass
class ChuangmiIr(Device):
"""Main class representing Chuangmi IR Remote Controller."""
_supported_models = [
"chuangmi.ir.v2",
"chuangmi.remote.v2",
"chuangmi-remote-h102a03", # maybe?
]
PRONTO_RE = re.compile(r"^([\da-f]{4}\s?){3,}([\da-f]{4})$", re.IGNORECASE)
@command(
click.argument("key", type=int),
default_output=format_output("Learning command into storage key {key}"),
)
def learn(self, key: int = 1):
"""Learn an infrared command.
:param int key: Storage slot, must be between 1 and 1000000
"""
if key < 1 or key > 1000000:
raise ChuangmiIrException("Invalid storage slot.")
return self.send("miIO.ir_learn", {"key": str(key)})
@command(
click.argument("key", type=int),
default_output=format_output("Reading infrared command from storage key {key}"),
)
def read(self, key: int = 1):
"""Read a learned command.
Positive response (chuangmi.ir.v2):
{'key': '1', 'code': 'Z6WPAasBAAA3BQAA4AwJAEA....AAABAAEBAQAAAQAA=='}
Negative response (chuangmi.ir.v2):
{'error': {'code': -5002, 'message': 'no code for this key'}, 'id': 5}
Negative response (chuangmi.ir.v2):
{'error': {'code': -5003, 'message': 'learn timeout'}, 'id': 17}
:param int key: Slot to read from
"""
if key < 1 or key > 1000000:
raise ChuangmiIrException("Invalid storage slot.")
return self.send("miIO.ir_read", {"key": str(key)})
def play_raw(self, command: str, frequency: int = 38400, length: int = -1):
"""Play a captured command.
:param str command: Command to execute
:param int frequency: Execution frequency
:param int length: Length of the command. -1 means not sending the length parameter.
"""
if length < 0:
return self.send("miIO.ir_play", {"freq": frequency, "code": command})
else:
return self.send(
"miIO.ir_play", {"freq": frequency, "code": command, "length": length}
)
def play_pronto(self, pronto: str, repeats: int = 1, length: int = -1):
"""Play a Pronto Hex encoded IR command. Supports only raw Pronto format,
starting with 0000.
:param str pronto: Pronto Hex string.
:param int repeats: Number of extra signal repeats.
:param int length: Length of the command. -1 means not sending the length parameter.
"""
command, frequency = self.pronto_to_raw(pronto, repeats)
return self.play_raw(command, frequency, length)
@classmethod
def pronto_to_raw(cls, pronto: str, repeats: int = 1) -> Tuple[str, int]:
"""Play a Pronto Hex encoded IR command. Supports only raw Pronto format,
starting with 0000.
:param str pronto: Pronto Hex string.
:param int repeats: Number of extra signal repeats.
"""
if repeats < 0:
raise ChuangmiIrException("Invalid repeats value")
try:
pronto_data = Pronto.parse(bytearray.fromhex(pronto))
except Exception as ex:
raise ChuangmiIrException("Invalid Pronto command") from ex
if len(pronto_data.intro) == 0:
repeats += 1
times: Set[int] = set()
for pair in pronto_data.intro + pronto_data.repeat * (1 if repeats else 0):
times.add(pair.pulse)
times.add(pair.gap)
times_sorted = sorted(times)
times_map = {t: idx for idx, t in enumerate(times_sorted)}
edge_pairs = []
for pair in pronto_data.intro + pronto_data.repeat * repeats:
edge_pairs.append(
{"pulse": times_map[pair.pulse], "gap": times_map[pair.gap]}
)
signal_code = base64.b64encode(
ChuangmiIrSignal.build(
{
"times_index": times_sorted + [0] * (16 - len(times)),
"edge_pairs": edge_pairs,
}
)
).decode()
return signal_code, int(round(pronto_data.frequency))
@command(
click.argument("command", type=str),
default_output=format_output("Playing the supplied command"),
)
def play(self, command: str):
"""Plays a command in one of the supported formats."""
if ":" not in command:
if self.PRONTO_RE.match(command):
command_type = "pronto"
else:
command_type = "raw"
command_args = []
else:
command_type, command, *command_args = command.split(":")
arg_types = [int, int]
if len(command_args) > len(arg_types):
raise ChuangmiIrException("Invalid command arguments count")
if command_type not in ["raw", "pronto"]:
raise ChuangmiIrException("Invalid command type")
play_method: Callable
if command_type == "raw":
play_method = self.play_raw
elif command_type == "pronto":
play_method = self.play_pronto
try:
converted_command_args = [t(v) for v, t in zip(command_args, arg_types)]
except Exception as ex:
raise ChuangmiIrException("Invalid command arguments") from ex
return play_method(command, *converted_command_args)
@command(
click.argument("indicator_led", type=bool),
default_output=format_output(
lambda indicator_led: "Turning on indicator LED"
if indicator_led
else "Turning off indicator LED"
),
)
def set_indicator_led(self, indicator_led: bool):
"""Set the indicator led on/off."""
if indicator_led:
return self.send("set_indicatorLamp", ["on"])
else:
return self.send("set_indicatorLamp", ["off"])
@command(default_output=format_output("Indicator LED status: {result}"))
def get_indicator_led(self):
"""Get the indicator led status."""
return self.send("get_indicatorLamp")
class ProntoPulseAdapter(Adapter):
def _decode(self, obj, context, path):
return int(obj * context._.modulation_period)
def _encode(self, obj, context, path):
raise RuntimeError("Not implemented")
ChuangmiIrSignal = Struct(
Const(0xA567, Int16ul),
"edge_count" / Rebuild(Int16ul, len_(this.edge_pairs) * 2 - 1),
"times_index" / Array(16, Int32ul),
"edge_pairs"
/ Array(
(this.edge_count + 1) // 2,
BitStruct("gap" / BitsInteger(4), "pulse" / BitsInteger(4)),
),
)
ProntoBurstPair = Struct(
"pulse" / ProntoPulseAdapter(Int16ub), "gap" / ProntoPulseAdapter(Int16ub)
)
Pronto = Struct(
Const(0, Int16ub),
"_ticks" / Int16ub,
"modulation_period" / Computed(this._ticks * 0.241246),
"frequency" / Computed(1000000 / this.modulation_period),
"intro_len" / Int16ub,
"repeat_len" / Int16ub,
"intro" / Array(this.intro_len, ProntoBurstPair),
"repeat" / Array(this.repeat_len, ProntoBurstPair),
)
|