File: __init__.py

package info (click to toggle)
python-aiohue 4.8.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 612 kB
  • sloc: python: 4,444; sh: 30; makefile: 5
file content (182 lines) | stat: -rw-r--r-- 6,145 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
"""AIOHue implementation for HueBridge over V1/legacy api."""

from __future__ import annotations

import asyncio
import logging
from types import TracebackType

import aiohttp
from asyncio_throttle import Throttler

from aiohue.errors import BridgeBusy, Unauthorized, raise_from_error

from .config import Config
from .groups import Groups
from .lights import Lights
from .scenes import Scenes
from .sensors import Sensors

# Maximum number of attempts (the first one included) for a single request
# that keeps being answered with a 503 or 429 (bridge overload/rate limit).
MAX_RETRIES = 25
# Passed as the Throttler's `rate_limit`: number of requests per THROTTLE_TIMESPAN.
THROTTLE_CONCURRENT_REQUESTS = 1  # how many concurrent requests to the bridge
# Passed as the Throttler's `period`.
THROTTLE_TIMESPAN = 0.25  # timespan/period (in seconds) for the rate limiting


class HueBridgeV1:
    """Control a Hue bridge with the legacy/V1 API.

    The instance can be used as an async context manager: entering it calls
    `initialize()` and leaving it calls `close()`.
    """

    def __init__(
        self,
        host: str,
        app_key: str,
        websession: aiohttp.ClientSession | None = None,
    ) -> None:
        """
        Initialize the Bridge instance.

        Parameters:
            `host`: the hostname or IP-address of the bridge as string.
            `app_key`: provide the hue appkey/username for authentication.
            `websession`: optionally provide an aiohttp ClientSession.
        """
        self._host = host
        self._app_key = app_key
        self._websession = websession
        # Remember who owns the session: only a session we created ourselves
        # is closed in `close()`.
        self._websession_provided = websession is not None

        self.logger = logging.getLogger(f"{__package__}[{host}]")
        # all api controllers, populated by `initialize()`
        self._config: Config | None = None
        self._devices = None
        self._lights: Lights | None = None
        self._scenes: Scenes | None = None
        self._groups: Groups | None = None
        self._sensors: Sensors | None = None
        # Setup the Throttler/rate limiter for requests to the bridge.
        self._throttler = Throttler(
            rate_limit=THROTTLE_CONCURRENT_REQUESTS, period=THROTTLE_TIMESPAN
        )

    @property
    def bridge_id(self) -> str | None:
        """Return the ID of the bridge we're currently connected to."""
        return self._config.bridge_id if self._config else None

    @property
    def host(self) -> str:
        """Return the hostname of the bridge."""
        return self._host

    @property
    def config(self) -> Config | None:
        """Get the bridge config (None until `initialize()` has run)."""
        return self._config

    @property
    def lights(self) -> Lights | None:
        """Get the light resources (None until `initialize()` has run)."""
        return self._lights

    @property
    def scenes(self) -> Scenes | None:
        """Get the scene resources (None if not fetched/not reported)."""
        return self._scenes

    @property
    def groups(self) -> Groups | None:
        """Get the group resources (None until `initialize()` has run)."""
        return self._groups

    @property
    def sensors(self) -> Sensors | None:
        """Get the sensor resources (None if not fetched/not reported)."""
        return self._sensors

    async def initialize(self) -> None:
        """Initialize the connection to the bridge and fetch all data."""
        # An empty endpoint fetches the complete state in a single request.
        result = await self.request("get", "")
        self._config = Config(result.pop("config"), self.request)
        self._groups = Groups(self.logger, result.pop("groups"), self.request)
        self._lights = Lights(self.logger, result.pop("lights"), self.request)
        # scenes and sensors are optional in the response
        if "scenes" in result:
            self._scenes = Scenes(self.logger, result.pop("scenes"), self.request)
        if "sensors" in result:
            self._sensors = Sensors(self.logger, result.pop("sensors"), self.request)
        self.logger.debug("Unused result: %s", result)

    async def close(self) -> None:
        """Close connection and cleanup."""
        # Only close a session we created ourselves. It is created lazily in
        # `request()`, so it is still None if no request was ever made.
        if not self._websession_provided and self._websession is not None:
            await self._websession.close()
        self.logger.info("Connection to bridge closed.")

    async def request(self, method, endpoint, json=None):
        """
        Make request on the api and return response data.

        Parameters:
            `method`: the HTTP method, handed to the aiohttp session.
            `endpoint`: path below `/api/<app_key>/` on the bridge.
            `json`: optional JSON body to send with the request.

        Returns the decoded JSON body of the response.

        Raises `Unauthorized` on a 403 response, `BridgeBusy` when every
        attempt was answered with 503/429, whatever `raise_for_status()`
        raises for other error statuses, and whatever `_raise_on_error`
        raises for an error reported in the response body.
        """
        if self._websession is None:
            self._websession = aiohttp.ClientSession()

        # Old bridges and (most) emulators only use `http`
        url = f"http://{self.host}/api/{self._app_key}/{endpoint}"
        # The bridge will rate limit if we send more requests than about 2-5 per
        # second and other apps/services are hitting the Hue bridge too, so we
        # retry with an increasing delay when the bridge reports it is busy.
        # NOTE(review): `self._throttler` is created in `__init__` but is not
        # acquired here, so requests are currently not throttled client-side.
        for attempt in range(1, MAX_RETRIES + 1):
            if attempt > 1:
                # linear back-off: 0.5s, 0.75s, 1.0s, ...
                retry_wait = 0.25 * attempt
                self.logger.debug(
                    "Got 503 or 429 error from Hue bridge, retry request in %s seconds",
                    retry_wait,
                )
                await asyncio.sleep(retry_wait)

            async with self._websession.request(method, url, json=json) as resp:
                # 503 means the service is temporarily unavailable and 429 means
                # the bridge is rate limiting/overloaded: back off and retry.
                if resp.status in (429, 503):
                    continue
                if resp.status == 403:
                    raise Unauthorized
                # raise on all other error status codes
                resp.raise_for_status()
                data = await resp.json()
                _raise_on_error(data)
                return data

        raise BridgeBusy(
            f"{MAX_RETRIES} requests to the bridge failed, "
            "its probably overloaded. Giving up."
        )

    async def __aenter__(self) -> HueBridgeV1:
        """Return Context manager."""
        try:
            await self.initialize()
        except BaseException:
            # `__aexit__` is not called when `__aenter__` fails, so release
            # the session here before propagating the error.
            await self.close()
            raise
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        """Exit context manager."""
        await self.close()
        # A falsy return value lets an in-flight exception propagate unchanged.
        return None


def _raise_on_error(data):
    """
    Check response for error message.

    `data` is the decoded JSON response. When it is a list, only its first
    item is inspected; an empty list is treated as "no error". If the
    inspected item is a dict with an "error" key, that value is handed to
    `raise_from_error`. Any other input is ignored.
    """
    if isinstance(data, list):
        if not data:
            # nothing to inspect; avoid an IndexError on data[0]
            return
        data = data[0]

    if isinstance(data, dict) and "error" in data:
        raise_from_error(data["error"])