File: __init__.py

package info (click to toggle)
python-advantage-air 0.4.4-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 80 kB
  • sloc: python: 171; makefile: 4
file content (165 lines) | stat: -rw-r--r-- 5,939 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import json
import asyncio
import aiohttp
import collections.abc

ON = "on"
OFF = "off"

def update(d, u):
    for k, v in u.items():
        if isinstance(v, collections.abc.Mapping):
            d[k] = update(d.get(k, {}), v)
        else:
            d[k] = v
    return d


class ApiError(Exception):
    """AdvantageAir Error"""


class advantage_air:
    """AdvantageAir Connection"""

    def __init__(self, ip: str, port: int=2025, session: aiohttp.ClientSession=None, retry: int=5):
        if session is None:
            session = aiohttp.ClientSession()

        self.ip = ip
        self.port = port
        self.session = session
        self.retry = retry

        self.aircon = self._aircon(ip, port, session, retry)
        self.lights = self._lights(ip, port, session, retry)
        self.things = self._things(ip, port, session, retry)

    async def async_get(self, retry:int =None):
        retry = retry or self.retry
        data = {}
        count = 0
        error = None
        while count < retry:
            count += 1
            try:
                async with self.session.get(
                    f"http://{self.ip}:{self.port}/getSystemData",
                    timeout=aiohttp.ClientTimeout(total=4),
                ) as resp:
                    assert resp.status == 200
                    data = await resp.json(content_type=None)
                    if "aircons" in data:
                        return data
            except (
                aiohttp.ClientError,
                aiohttp.ClientConnectorError,
                aiohttp.client_exceptions.ServerDisconnectedError,
                ConnectionResetError,
            ) as err:
                error = err
            except asyncio.TimeoutError:
                error = "Connection timed out."
            except AssertionError:
                error = "Response status not 200."
                break
            except SyntaxError as err:
                error = "Invalid response"
                break

            await asyncio.sleep(1)
        raise ApiError(
            f"No valid response after {count} failed attempt{['','s'][count>1]}. Last error was: {error}"
        )

    class _endpoint:
        def __init__(self, ip, port, session, retry) -> None:
            self.ip = ip
            self.port = port
            self.session = session
            self.retry = retry
            self.changes = {}
            self.lock = asyncio.Lock()

        async def async_update(self, change: dict[str, any]) -> bool:
            """Merge changes with queue and send when possible, returning True when done"""

            self.changes = update(self.changes, change)
            if self.lock.locked():
                return False
            async with self.lock:
                while self.changes:
                    # Allow any addition changes from the event loop to be collected
                    await asyncio.sleep(0)
                    # Collect all changes
                    payload = self.changes
                    self.changes = {}
                    try:
                        async with self.session.get(
                            f"http://{self.ip}:{self.port}/{self.endpoint}",
                            params={"json": json.dumps(payload)},
                            timeout=aiohttp.ClientTimeout(total=4),
                        ) as resp:
                            data = await resp.json(content_type=None)
                        if data["ack"] == False:
                            raise ApiError(data["reason"])
                    except (
                        aiohttp.client_exceptions.ServerDisconnectedError,
                        ConnectionResetError,
                    ) as err:
                        # Recoverable error, reinsert the changes and try again in a second
                        self.changes = update(self.changes, payload)
                        await asyncio.sleep(1)
                    except aiohttp.ClientError as err:
                        raise ApiError(err)
                    except asyncio.TimeoutError:
                        raise ApiError("Connection timed out.")
                    except AssertionError:
                        raise ApiError("Response status not 200.")
                    except SyntaxError as err:
                        raise ApiError("Invalid response")
            return True

    class _aircon(_endpoint):
        """Aircon endpoint"""

        endpoint = "setAircon"

        async def async_update_ac(self, ac: str, body: dict) -> bool:
            """Update an aircon"""
            return await self.async_update({ac: {"info": body}})

        async def async_update_zone(self, ac: str, zone: str, body: dict) -> bool:
            """Update a zone"""
            return await self.async_update({ac: {"zones": {zone: body}}})

    class _lights(_endpoint):
        """Lights endpoint"""

        endpoint = "setLights"

        async def async_update_state(self, id: str, state: str|bool) -> bool:
            """Update a lights state"""
            if state == True:
                state = ON
            elif state == False:
                state = OFF
            return await self.async_update({id: {"id": id, "state": state}})

        async def async_update_value(self, id: str, value: int) -> bool:
            """Update a lights state and value"""
            return await self.async_update({id: {"id": id, "state": (ON if value > 0 else OFF), "value": value}})

    class _things(_endpoint):
        """Things endpoint"""

        endpoint = "setThings"

        async def async_update_value(self, id: str, value: int|bool) -> bool:
            """Update a things state and value"""
            if value == True:
                value = 100
            elif value == False:
                value = 0
                
            return await self.async_update({id: {"id": id, "value": value}})