1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224
|
"""Bond Local API wrapper."""
import random
import uuid
from typing import Any, Callable, List, Optional
from xmlrpc.client import Boolean
import orjson
from aiohttp import ClientSession, ClientTimeout
from aiohttp.client_exceptions import ClientOSError, ServerDisconnectedError
from bond_async.bond_type import BondType
from .action import Action
from .requestor_uuid import RequestorUUID
class Bond:
    """Asynchronous client for the Bond Local HTTP API (``/v2`` endpoints).

    Every request is sent to ``http://{host}{path}`` with the ``BOND-Token``
    header set to the configured token and a freshly generated ``BOND-UUID``
    header identifying the message.
    """

    def __init__(
        self,
        host: str,
        token: str,
        requestor_uuid: RequestorUUID = RequestorUUID.ANONYMOUS,
        *,
        session: Optional[ClientSession] = None,
        timeout: Optional[ClientTimeout] = None,
    ):
        """Initialize Bond with provided host and token.

        Args:
            host: Host (name or address) used to build the request URLs.
            token: Value sent in the ``BOND-Token`` header.
            requestor_uuid: Identifies the requestor; used as the prefix of
                every generated message ID.
            session: Optional shared ``ClientSession``. When omitted, a
                temporary session is opened for each request.
            timeout: Optional timeout passed through to every request.

        Raises:
            ValueError: If ``requestor_uuid.is_allowed()`` is false.
        """
        if not requestor_uuid.is_allowed():
            raise ValueError(f"Requestor UUID {requestor_uuid} is not allowed. Please use a requestor UUID with a "
                             f"number greater than 0xA0.")
        self._requestor_uuid = requestor_uuid
        # Four hex characters that stay constant for the lifetime of this object.
        self._session_uuid = uuid.uuid4().hex[:4]
        self._host = host
        # Keyword arguments shared by all requests; the BOND-UUID header is
        # refreshed in place right before each request is issued.
        self._api_kwargs = {"headers": {"BOND-Token": token}}
        if timeout:
            self._api_kwargs["timeout"] = timeout
        self._session = session

    async def version(self) -> dict:
        """Return the version of Bond reported by API."""
        return await self.__get("/v2/sys/version")

    async def bond_type(self) -> BondType:
        """Return the BondType based on the serial number reported by API."""
        version = await self.version()
        return BondType.from_serial(version["bondid"])

    async def token(self) -> dict:
        """Return the token after power reset or proof of ownership event."""
        return await self.__get("/v2/token")

    async def bridge(self) -> dict:
        """Return the name and location of the bridge."""
        return await self.__get("/v2/bridge")

    async def set_bluelight_brightness(self, brightness: int) -> None:
        """Set the brightness of the blue light on the bridge.

        Args:
            brightness: Value from 0 to 255 (inclusive).

        Raises:
            ValueError: If ``brightness`` is outside the 0-255 range.
        """
        if not 0 <= brightness <= 255:
            raise ValueError("Brightness must be between 0 and 255")
        await self.__patch("/v2/bridge", {"bluelight": brightness})

    async def devices(self) -> List[str]:
        """Return the list of available device IDs reported by API."""
        return self.__object_ids(await self.__get("/v2/devices"))

    async def device(self, device_id: str) -> dict:
        """Return main device metadata reported by API."""
        return await self.__get(f"/v2/devices/{device_id}")

    async def device_properties(self, device_id: str) -> dict:
        """Return device properties reported by API."""
        return await self.__get(f"/v2/devices/{device_id}/properties")

    async def device_state(self, device_id: str) -> dict:
        """Return current device state reported by API."""
        return await self.__get(f"/v2/devices/{device_id}/state")

    async def device_skeds(self, device_id: str) -> dict:
        """Return current device schedules reported by API."""
        return await self.__get(f"/v2/devices/{device_id}/skeds")

    async def action(self, device_id: str, action: Action) -> None:
        """Execute given action for a given device."""
        await self.__execute_action(f"/v2/devices/{device_id}", action)

    async def supports_groups(self) -> bool:
        """Return 'True' if the Bond supports the Groups feature."""
        root = await self.__get("/v2/")
        return "groups" in root

    async def groups(self) -> List[str]:
        """Return the list of available group IDs reported by API."""
        return self.__object_ids(await self.__get("/v2/groups"))

    async def group(self, group_id: str) -> dict:
        """Return main group metadata reported by API."""
        return await self.__get(f"/v2/groups/{group_id}")

    async def group_properties(self, group_id: str) -> dict:
        """Return group properties reported by API."""
        return await self.__get(f"/v2/groups/{group_id}/properties")

    async def group_state(self, group_id: str) -> dict:
        """Return current group state reported by API."""
        return await self.__get(f"/v2/groups/{group_id}/state")

    async def group_skeds(self, group_id: str) -> dict:
        """Return current group schedules reported by API."""
        return await self.__get(f"/v2/groups/{group_id}/skeds")

    async def group_action(self, group_id: str, action: Action) -> None:
        """Execute given action for a given group."""
        await self.__execute_action(f"/v2/groups/{group_id}", action)

    @staticmethod
    def __object_ids(payload: dict) -> List[str]:
        """Return the keys of ``payload`` that name nested objects.

        Keys starting with ``_`` and keys whose value is not a dict are skipped.
        """
        return [
            key
            for key, value in payload.items()
            if not key.startswith("_") and isinstance(value, dict)
        ]

    async def __execute_action(self, target_path: str, action: Action) -> None:
        """Send ``action`` to the device or group rooted at ``target_path``.

        A state-belief action is sent as a PATCH to ``{target_path}/state``;
        any other action is sent as a PUT to
        ``{target_path}/actions/{action.name}``. In both cases
        ``action.argument`` is the JSON body.
        """
        if action.name == Action.SET_STATE_BELIEF:
            await self.__patch(f"{target_path}/state", action.argument)
        else:
            await self.__put(
                f"{target_path}/actions/{action.name}", action.argument
            )

    async def __get(self, path: str) -> dict:
        """GET ``path`` and return the decoded JSON body.

        Raises whatever ``response.raise_for_status()`` raises on an error status.
        """

        async def get(session: ClientSession) -> dict:
            self._api_kwargs["headers"]["BOND-UUID"] = self.__create_message_id()
            async with session.get(
                f"http://{self._host}{path}", **self._api_kwargs
            ) as response:
                response.raise_for_status()
                return await response.json(loads=orjson.loads)

        return await self.__call(get)

    async def __patch(self, path: str, json: Any) -> None:
        """PATCH ``path`` with ``json`` as the request body."""

        async def patch(session: ClientSession) -> None:
            self._api_kwargs["headers"]["BOND-UUID"] = self.__create_message_id()
            async with session.patch(
                f"http://{self._host}{path}",
                **self._api_kwargs,
                json=json,
            ) as response:
                response.raise_for_status()

        await self.__call(patch)

    async def __put(self, path: str, json: Any) -> None:
        """PUT ``json`` as the request body to ``path``."""

        async def put(session: ClientSession) -> None:
            self._api_kwargs["headers"]["BOND-UUID"] = self.__create_message_id()
            async with session.put(
                f"http://{self._host}{path}",
                **self._api_kwargs,
                json=json,
            ) as response:
                response.raise_for_status()

        await self.__call(put)

    async def __call(self, handler: Callable[[ClientSession], Any]):
        """Run ``handler`` with a session and return its result.

        Without a configured session, a temporary ``ClientSession`` is opened
        for this single call. With a configured session, the handler is retried
        exactly once on ``ClientOSError`` / ``ServerDisconnectedError``; a
        failure of the retry propagates to the caller.
        """
        if not self._session:
            async with ClientSession() as request_session:
                return await handler(request_session)
        else:
            try:
                return await handler(self._session)
            except (ClientOSError, ServerDisconnectedError):
                # bond has a short connection close time
                # so we need to retry if we idled for a bit
                return await handler(self._session)

    def __create_message_id(self) -> str:
        """Create a lowercase hex message ID for the ``BOND-UUID`` header.

        The ID is the concatenation of:

        * ``requestor_uuid.hex_value()`` (expected to be 2 hex characters --
          TODO confirm against ``RequestorUUID``),
        * the 4-character session ID, constant for this object,
        * a random 32-bit value zero-padded to 10 hex characters (so the
          first two of these are always ``00``).
        """
        return (
            f"{self._requestor_uuid.hex_value()}"
            f"{self._session_uuid}"
            f"{random.randint(0, 0xFFFFFFFF):010x}"
        ).lower()
|