File: coolmasternet.py

package info (click to toggle)
pycoolmasternet-async 0.2.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 92 kB
  • sloc: python: 253; makefile: 2
file content (252 lines) | stat: -rw-r--r-- 8,202 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
import asyncio
import re

# Operating modes accepted by CoolMasterNetUnit.set_mode.
_MODES = [
    "auto",
    "cool",
    "dry",
    "fan",
    "heat",
]

# One-character swing codes exchanged with the bridge, paired with the
# readable names exposed to callers. Declaration order is preserved.
_SWING_CHAR_TO_NAME = dict(
    [
        ("a", "auto"),
        ("h", "horizontal"),
        ("3", "30"),
        ("4", "45"),
        ("6", "60"),
        ("v", "vertical"),
        ("x", "stop"),
    ]
)

# Reverse lookup: readable swing name -> one-character code.
_SWING_NAME_TO_CHAR = {
    name: char for char, name in _SWING_CHAR_TO_NAME.items()
}

# Readable swing names, in declaration order; CoolMasterNetUnit.set_swing
# validates its argument against this list.
SWING_MODES = [*_SWING_CHAR_TO_NAME.values()]

class CoolMasterNet():
    """A connection to a coolmasternet bridge."""
    def __init__(self, host, port=10102, read_timeout=1, swing_support=False):
        """Initialize this CoolMasterNet instance to connect to a particular
        host at a particular port."""
        self._host = host
        self._port = port
        self._read_timeout = read_timeout
        self._swing_support = swing_support
        self._status_cmd = None
        self._concurrent_reads = asyncio.Semaphore(3)

    async def _make_request(self, request):
        """Send a request to the CoolMasterNet and returns the response."""
        async with self._concurrent_reads:

            reader, writer = await asyncio.open_connection(self._host, self._port)

            try:
                writer.write((request + "\n").encode("ascii"))
                response = await asyncio.wait_for(reader.readuntil(b"\n>"), self._read_timeout)

                data = response.decode("ascii")

                if data.startswith(">"):
                    data = data[1:]

                if data.endswith("\n>"):
                    data = data[:-1]

                if data.endswith("OK\r\n"):
                    data = data[:-4]

                if "Unknown command" in data:
                    cmd = request.split(" ", 1)[0]
                    raise ValueError(f"Command '{cmd}' is not supported")

                return data
            finally:
                writer.close()
                await writer.wait_closed()

    async def info(self):
        """Get the general info the this CoolMasterNet."""
        raw = await self._make_request("set")
        lines = raw.strip().split("\r\n")
        key_values = [re.split(r"\s*:\s*", line, 1) for line in lines]
        return dict(key_values)

    async def _status(self, status_cmd=None, unit_id=None):
        """Fetch the status of all units or a single one, falls back to legacy
        format if necessary"""

        if status_cmd is None:
            cmds = ['ls2', 'stat2']
        else:
            cmds = [status_cmd]

        for cmd in cmds:
            try:
                final_cmd = f"{cmd} {unit_id}" if unit_id is not None else cmd
                status_lines = (await self._make_request(final_cmd)).strip().split("\r\n")
                break
            except ValueError:
                continue
        else:
            raise Exception("failed to execute status commands")

        return cmd, status_lines

    async def status(self):
        """Return a list of CoolMasterNetUnit objects with current status."""

        self._status_cmd, status_lines = await self._status(self._status_cmd)

        return {
            key: unit
            for unit, key in await asyncio.gather(
                *(CoolMasterNetUnit.create(
                    self, line.split(" ", 1)[0], line, self._status_cmd)
                    for line in status_lines
                )
            )
        }


class CoolMasterNetUnit:
    """An immutable snapshot of a unit.

    The snapshot is parsed once from a raw status line. The setter methods
    send a command to the bridge and return a NEW snapshot obtained through
    refresh(); the instance they are called on is not modified.
    """

    def __init__(self, bridge, unit_id, raw, swing_raw, status_cmd):
        """Initialize a unit snapshot.

        Args:
            bridge: The CoolMasterNet instance used for follow-up requests.
            unit_id: Identifier of the unit.
            raw: Raw status line describing the unit.
            swing_raw: Raw swing code reported for the unit, or "" when
                swing was not queried.
            status_cmd: Status command that produced ``raw``.

        Raises:
            ConnectionError: ``raw`` does not have 8 or 9 fields.
        """
        self._raw = raw
        self._status_cmd = status_cmd
        self._swing_raw = swing_raw
        self._unit_id = unit_id
        self._bridge = bridge
        self._parse()

    @classmethod
    async def create(cls, bridge, unit_id, raw=None, status_cmd=None):
        """Build a snapshot of a unit, querying the bridge for missing data.

        When ``raw`` or ``status_cmd`` is missing, the unit's status line is
        fetched from the bridge. The swing code is queried only when the
        bridge was created with swing support.

        Returns:
            A ``(unit, unit_id)`` tuple.
        """
        if raw is None or status_cmd is None:
            status_cmd, status_lines = await bridge._status(status_cmd, unit_id)
            raw = status_lines[0]

        if bridge._swing_support:
            swing_raw = (await bridge._make_request(f"query {unit_id} s")).strip()
        else:
            swing_raw = ""

        return CoolMasterNetUnit(bridge, unit_id, raw, swing_raw, status_cmd), unit_id

    @staticmethod
    def _parse_temperature(field):
        """Convert a temperature field such as "25.5C" or "25,5C" to a float.

        The last character (the unit letter) is dropped and a decimal comma
        is accepted in place of a decimal point.
        """
        return float(field[:-1].replace(",", "."))

    def _parse(self):
        """Populate the snapshot attributes from the raw status line.

        Raises:
            ConnectionError: The line does not split into 8 or 9 fields.
        """
        fields = re.split(r"\s+", self._raw.strip())

        if len(fields) not in (8, 9):
            raise ConnectionError("Unexpected status line format: " + str(fields))

        self._is_on = fields[1] == "ON"
        # The unit letter is the last character of the set-point field.
        self._temperature_unit = "imperial" if fields[2][-1] == "F" else "celsius"
        # Both temperature fields go through the same conversion so that a
        # decimal comma is accepted for the set-point as well.
        self._thermostat = self._parse_temperature(fields[2])
        self._temperature = self._parse_temperature(fields[3])
        self._fan_speed = fields[4].lower()
        self._mode = fields[5].lower()
        self._error_code = fields[6] if fields[6] != "OK" else None
        self._clean_filter = fields[7] in ("#", "1")
        # Unknown or empty swing codes map to None.
        self._swing = _SWING_CHAR_TO_NAME.get(self._swing_raw)

    async def _make_unit_request(self, request):
        """Send ``request`` to the bridge with "UID" replaced by this unit's id."""
        return await self._bridge._make_request(request.replace("UID", self._unit_id))

    async def refresh(self):
        """Refresh the data from CoolMasterNet and return it as a new instance."""
        return (await CoolMasterNetUnit.create(self._bridge, self._unit_id))[0]

    @property
    def unit_id(self):
        """The unit id."""
        return self._unit_id

    @property
    def is_on(self):
        """Is the unit on."""
        return self._is_on

    @property
    def thermostat(self):
        """The target temperature."""
        return self._thermostat

    @property
    def temperature(self):
        """The current temperature."""
        return self._temperature

    @property
    def fan_speed(self):
        """The fan speed."""
        return self._fan_speed

    @property
    def mode(self):
        """The current mode (e.g. heat, cool)."""
        return self._mode

    @property
    def error_code(self):
        """Error code on error, otherwise None."""
        return self._error_code

    @property
    def clean_filter(self):
        """True when the air filter needs to be cleaned."""
        return self._clean_filter

    @property
    def swing(self):
        """The current swing mode (e.g. horizontal), or None when unknown."""
        return self._swing

    @property
    def temperature_unit(self):
        """The temperature unit: "imperial" or "celsius"."""
        return self._temperature_unit

    async def set_fan_speed(self, value):
        """Set the fan speed and return a refreshed snapshot."""
        await self._make_unit_request(f"fspeed UID {value}")
        return await self.refresh()

    async def set_mode(self, value):
        """Set the mode and return a refreshed snapshot.

        Raises:
            ValueError: ``value`` is not one of the supported modes.
        """
        if value not in _MODES:
            raise ValueError(
                f"Unrecognized mode {value}. Valid values: {' '.join(_MODES)}"
            )

        await self._make_unit_request(value + " UID")
        return await self.refresh()

    async def set_thermostat(self, value):
        """Set the target temperature (rounded to one decimal) and return a
        refreshed snapshot."""
        rounded = round(value, 1)
        await self._make_unit_request(f"temp UID {rounded}")
        return await self.refresh()

    async def set_swing(self, value):
        """Set the swing mode and return a refreshed snapshot.

        Raises:
            ValueError: ``value`` is not a known swing mode, or the bridge
                answers that the unit does not support it.
        """
        if value not in SWING_MODES:
            raise ValueError(
                f"Unrecognized swing mode {value}. Valid values: {', '.join(SWING_MODES)}"
            )

        return_value = await self._make_unit_request(f"swing UID {_SWING_NAME_TO_CHAR[value]}")
        if return_value.startswith("Unsupported Feature"):
            raise ValueError(
                f"Unit {self._unit_id} doesn't support swing mode {value}."
            )

        return await self.refresh()

    async def turn_on(self):
        """Turn a unit on and return a refreshed snapshot."""
        await self._make_unit_request("on UID")
        return await self.refresh()

    async def turn_off(self):
        """Turn a unit off and return a refreshed snapshot."""
        await self._make_unit_request("off UID")
        return await self.refresh()

    async def reset_filter(self):
        """Report that the air filter was cleaned and reset the timer.

        Returns a refreshed snapshot.
        """
        await self._make_unit_request("filt UID")
        return await self.refresh()

    async def feed(self, value):
        """Provides ambient temperature hint to the unit.

        The value is rounded to one decimal. Unlike the setters, this method
        does not refresh the snapshot and returns None.
        """
        rounded = round(value, 1)
        await self._make_unit_request(f"feed UID {rounded}")