File: bond.py

package info (click to toggle)
python-bond-async 0.2.1-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 204 kB
  • sloc: python: 1,537; makefile: 4
file content (224 lines) | stat: -rw-r--r-- 9,137 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
"""Bond Local API wrapper."""
import random
import uuid
from typing import Any, Callable, List, Optional
from xmlrpc.client import Boolean

import orjson
from aiohttp import ClientSession, ClientTimeout
from aiohttp.client_exceptions import ClientOSError, ServerDisconnectedError

from bond_async.bond_type import BondType
from .action import Action
from .requestor_uuid import RequestorUUID


class Bond:
    """Bond API."""

    def __init__(
            self,
            host: str,
            token: str,
            requestor_uuid: RequestorUUID = RequestorUUID.ANONYMOUS,
            *,
            session: Optional[ClientSession] = None,
            timeout: Optional[ClientTimeout] = None,
    ):
        """Initialize Bond with provided host and token."""
        if not requestor_uuid.is_allowed():
            raise ValueError(f"Requestor UUID {requestor_uuid} is not allowed. Please use a requestor UUID with a "
                             f"number greater than 0xA0.")
        self._requestor_uuid = requestor_uuid
        self._session_uuid = uuid.uuid4().hex[:4]
        self._host = host
        self._api_kwargs = {"headers": {"BOND-Token": token}}
        if timeout:
            self._api_kwargs["timeout"] = timeout
        self._session = session

    async def version(self) -> dict:
        """Return the version of Bond reported by API."""
        return await self.__get("/v2/sys/version")

    async def bond_type(self) -> BondType:
        """Return the BondType based on the serial number reported by API."""
        version = await self.version()
        return BondType.from_serial(version["bondid"])

    async def token(self) -> dict:
        """Return the token after power rest or proof of ownership event."""
        return await self.__get("/v2/token")

    async def bridge(self) -> dict:
        """Return the name and location of the bridge."""
        return await self.__get("/v2/bridge")

    async def set_bluelight_brightness(self, brightness: int) -> None:
        """Set the brightness of the blue light on the bridge. Accepts values from 0 to 255."""
        if brightness < 0 or brightness > 255:
            raise ValueError("Brightness must be between 0 and 255")
        await self.__patch("/v2/bridge", {"bluelight": brightness})

    async def devices(self) -> List[str]:
        """Return the list of available device IDs reported by API."""
        json = await self.__get("/v2/devices")
        return [key for key in json if not key.startswith("_") and isinstance(json[key], dict)]

    async def device(self, device_id: str) -> dict:
        """Return main device metadata reported by API."""
        return await self.__get(f"/v2/devices/{device_id}")

    async def device_properties(self, device_id: str) -> dict:
        """Return device properties reported by API."""
        return await self.__get(f"/v2/devices/{device_id}/properties")

    async def device_state(self, device_id: str) -> dict:
        """Return current device state reported by API."""
        return await self.__get(f"/v2/devices/{device_id}/state")

    async def device_skeds(self, device_id: str) -> dict:
        """Return current device schedules reported by API."""
        return await self.__get(f"/v2/devices/{device_id}/skeds")

    async def action(self, device_id: str, action: Action) -> None:
        """Execute given action for a given device."""
        if action.name == Action.SET_STATE_BELIEF:
            path = f"/v2/devices/{device_id}/state"

            async def patch(session: ClientSession) -> None:
                self._api_kwargs["headers"]["BOND-UUID"] = self.__create_message_id()
                async with session.patch(
                        f"http://{self._host}{path}",
                        **self._api_kwargs,
                        json=action.argument,
                ) as response:
                    response.raise_for_status()

            await self.__call(patch)
        else:
            path = f"/v2/devices/{device_id}/actions/{action.name}"

            async def put(session: ClientSession) -> None:
                self._api_kwargs["headers"]["BOND-UUID"] = self.__create_message_id()
                async with session.put(
                        f"http://{self._host}{path}",
                        **self._api_kwargs,
                        json=action.argument,
                ) as response:
                    response.raise_for_status()

            await self.__call(put)

    async def supports_groups(self) -> Boolean:
        """Return 'True' if the Bond supports the Groups feature."""
        json = await self.__get("/v2/")
        return "groups" in json

    async def groups(self) -> List[str]:
        """Return the list of available group IDs reported by API."""
        json = await self.__get("/v2/groups")
        return [key for key in json if not key.startswith("_") and type(json[key]) is dict]

    async def group(self, group_id: str) -> dict:
        """Return main group metadata reported by API."""
        return await self.__get(f"/v2/groups/{group_id}")

    async def group_properties(self, group_id: str) -> dict:
        """Return group properties reported by API."""
        return await self.__get(f"/v2/groups/{group_id}/properties")

    async def group_state(self, group_id: str) -> dict:
        """Return current group state reported by API."""
        return await self.__get(f"/v2/groups/{group_id}/state")

    async def group_skeds(self, group_id: str) -> dict:
        """Return current group schedules reported by API."""
        return await self.__get(f"/v2/groups/{group_id}/skeds")

    async def group_action(self, group_id: str, action: Action) -> None:
        """Execute given action for a given group."""
        if action.name == Action.SET_STATE_BELIEF:
            path = f"/v2/groups/{group_id}/state"

            async def patch(session: ClientSession) -> None:
                self._api_kwargs["headers"]["BOND-UUID"] = self.__create_message_id()
                async with session.patch(
                        f"http://{self._host}{path}",
                        **self._api_kwargs,
                        json=action.argument,
                ) as response:
                    response.raise_for_status()

            await self.__call(patch)
        else:
            path = f"/v2/groups/{group_id}/actions/{action.name}"

            async def put(session: ClientSession) -> None:
                self._api_kwargs["headers"]["BOND-UUID"] = self.__create_message_id()
                async with session.put(
                        f"http://{self._host}{path}",
                        **self._api_kwargs,
                        json=action.argument,
                ) as response:
                    response.raise_for_status()

            await self.__call(put)

    async def __get(self, path) -> dict:
        async def get(session: ClientSession) -> dict:
            self._api_kwargs["headers"]["BOND-UUID"] = self.__create_message_id()
            async with session.get(
                    f"http://{self._host}{path}", **self._api_kwargs
            ) as response:
                response.raise_for_status()
                return await response.json(loads=orjson.loads)

        return await self.__call(get)

    async def __patch(self, path, json) -> None:
        async def patch(session: ClientSession) -> None:
            self._api_kwargs["headers"]["BOND-UUID"] = self.__create_message_id()
            async with session.patch(
                f"http://{self._host}{path}",
                **self._api_kwargs,
                json=json,
            ) as response:
                response.raise_for_status()

        await self.__call(patch)

    async def __put(self, path, json) -> None:
        async def put(session: ClientSession) -> None:
            self._api_kwargs["headers"]["BOND-UUID"] = self.__create_message_id()
            async with session.put(
                f"http://{self._host}{path}",
                **self._api_kwargs,
                json=json,
            ) as response:
                response.raise_for_status()

        await self.__call(put)

    async def __call(self, handler: Callable[[ClientSession], Any]):
        if not self._session:
            async with ClientSession() as request_session:
                return await handler(request_session)
        else:
            try:
                return await handler(self._session)
            except (ClientOSError, ServerDisconnectedError):
                # bond has a short connection close time
                # so we need to retry if we idled for a bit
                return await handler(self._session)

    def __create_message_id(self) -> str:
        """Create a unique hex message ID.
        The first 2 characters is the requestor_uuid (in hex),
        the next 4 characters are always the same for a session,
        and the last 10 characters are random."""
        return (
            f"{self._requestor_uuid.hex_value()}"
            f"{self._session_uuid}"
            f"{random.randint(0, 0xFFFFFFFF):010x}"
        ).lower()