File: __init__.py

package info (click to toggle)
python-aioskybell 23.12.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 304 kB
  • sloc: python: 1,422; makefile: 22
file content (297 lines) | stat: -rw-r--r-- 10,591 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
"""An asynchronous client for Skybell API.

Async spinoff of: https://github.com/MisterWil/skybellpy

Published under the MIT license - See LICENSE file for more details.

"Skybell" is a trademark owned by SkyBell Technologies, Inc, see
www.skybell.com for more information. I am in no way affiliated with Skybell.
"""
from __future__ import annotations

import asyncio
import logging
import os
from asyncio.exceptions import TimeoutError as Timeout
from typing import Any, Collection, cast

from aiohttp.client import ClientSession, ClientTimeout
from aiohttp.client_exceptions import ClientConnectorError, ClientError

from . import utils as UTILS
from .device import SkybellDevice
from .exceptions import SkybellAuthenticationException, SkybellException
from .helpers import const as CONST
from .helpers import errors as ERROR
from .helpers.models import DeviceTypeDict, EventTypeDict

_LOGGER = logging.getLogger(__name__)


class Skybell:  # pylint:disable=too-many-instance-attributes
    """Main Skybell class."""

    _close_session = False

    def __init__(  # pylint:disable=too-many-arguments
        self,
        username: str | None = None,
        password: str | None = None,
        auto_login: bool = False,
        get_devices: bool = False,
        cache_path: str = CONST.CACHE_PATH,
        disable_cache: bool = False,
        login_sleep: bool = True,
        session: ClientSession | None = None,
    ) -> None:
        """Initialize Skybell object."""
        self._auto_login = auto_login
        self._cache_path = cache_path
        self._devices: dict[str, SkybellDevice] = {}
        self._disable_cache = disable_cache
        self._get_devices = get_devices
        self._password = password
        if username is not None and self._cache_path == CONST.CACHE_PATH:
            self._cache_path = f"skybell_{username.replace('.', '')}.pickle"
        self._username = username
        if session is None:
            session = ClientSession()
            self._close_session = True
        self._session = session
        self._login_sleep = login_sleep
        self._user: dict[str, str] = {}

        # Create a new cache template
        self._cache: dict[str, str | dict[str, EventTypeDict]] = {
            CONST.APP_ID: UTILS.gen_id(),
            CONST.CLIENT_ID: UTILS.gen_id(),
            CONST.TOKEN: UTILS.gen_token(),
            CONST.ACCESS_TOKEN: "",
            CONST.DEVICES: {},
        }

    async def __aenter__(self) -> Skybell:
        """Async enter."""
        return self

    async def __aexit__(self, *exc_info: Any) -> None:
        """Async exit."""
        if self._session and self._close_session:
            await self._session.close()

    async def async_initialize(self) -> list[SkybellDevice]:
        """Initialize."""
        if not self._disable_cache:
            await self._async_load_cache()
        if (
            self._username is not None
            and self._password is not None
            and self._auto_login
        ):
            await self.async_login()
        self._user = await self.async_send_request(CONST.USERS_ME_URL)
        return await self.async_get_devices()

    async def async_login(
        self, username: str | None = None, password: str | None = None
    ) -> bool:
        """Execute Skybell login."""
        if username is not None:
            self._username = username
        if password is not None:
            self._password = password

        if self._username is None or self._password is None:
            raise SkybellAuthenticationException(
                self, f"{ERROR.USERNAME}: {ERROR.PASSWORD}"
            )

        await self.async_update_cache({CONST.ACCESS_TOKEN: ""})

        login_data: dict[str, str | int] = {
            "username": self._username,
            "password": self._password,
            "appId": cast(str, self.cache(CONST.APP_ID)),
            CONST.TOKEN: cast(str, self.cache(CONST.TOKEN)),
        }

        response = await self.async_send_request(
            CONST.LOGIN_URL, json=login_data, method=CONST.HTTPMethod.POST, retry=False
        )

        _LOGGER.debug("Login Response: %s", response)

        await self.async_update_cache(
            {CONST.ACCESS_TOKEN: response[CONST.ACCESS_TOKEN]}
        )

        if self._login_sleep:
            _LOGGER.info("Login successful, waiting 5 seconds...")
            await asyncio.sleep(5)
        else:
            _LOGGER.info("Login successful")

        return True

    async def async_logout(self) -> bool:
        """Explicit Skybell logout."""
        if len(self.cache(CONST.ACCESS_TOKEN)) > 0:
            # No explicit logout call as it doesn't seem to matter
            # if a logout happens without registering the app which
            # we aren't currently doing.
            if self._session and self._close_session:
                await self._session.close()
            self._devices = {}

            await self.async_update_cache({CONST.ACCESS_TOKEN: ""})

        return True

    async def async_get_devices(self, refresh: bool = False) -> list[SkybellDevice]:
        """Get all devices from Skybell."""
        if refresh or len(self._devices) == 0:
            _LOGGER.info("Updating all devices...")
            response = await self.async_send_request(CONST.DEVICES_URL)

            _LOGGER.debug("Get Devices Response: %s", response)

            for device_json in response:
                # Attempt to reuse an existing device
                device = self._devices.get(device_json[CONST.ID])

                # No existing device, create a new one
                if device:
                    await device.async_update({device_json[CONST.ID]: device_json})
                else:
                    device = SkybellDevice(device_json, self)
                    self._devices[device.device_id] = device

        return list(self._devices.values())

    async def async_get_device(
        self, device_id: str, refresh: bool = False
    ) -> SkybellDevice:
        """Get a single device."""
        if len(self._devices) == 0:
            await self.async_get_devices()
            refresh = False

        device = self._devices.get(device_id)

        if not device:
            raise SkybellException(self, "Device not found")
        if refresh:
            await device.async_update()

        return device

    @property
    def user_id(self) -> str:
        """Return logged in user id."""
        return self._user[CONST.ID]

    @property
    def user_first_name(self) -> str:
        """Return logged in user first name."""
        return self._user["firstName"]

    @property
    def user_last_name(self) -> str:
        """Return logged in user last name."""
        return self._user["lastName"]

    async def async_send_request(  # pylint:disable=too-many-arguments
        self,
        url: str,
        headers: dict[str, str] | None = None,
        method: CONST.HTTPMethod = CONST.HTTPMethod.GET,
        retry: bool = True,
        **kwargs: Any,
    ) -> Any:
        """Send requests to Skybell."""
        if len(self.cache(CONST.ACCESS_TOKEN)) == 0 and url != CONST.LOGIN_URL:
            await self.async_login()

        headers = headers if headers else {}
        if "cloud.myskybell.com" in url:
            if len(self.cache(CONST.ACCESS_TOKEN)) > 0:
                headers["Authorization"] = f"Bearer {self.cache(CONST.ACCESS_TOKEN)}"
            headers["content-type"] = "application/json"
            headers["accept"] = "*/*"
            headers["x-skybell-app-id"] = cast(str, self.cache(CONST.APP_ID))
            headers["x-skybell-client-id"] = cast(str, self.cache(CONST.CLIENT_ID))

        _LOGGER.debug("HTTP %s %s Request with headers: %s", method, url, headers)

        try:
            response = await self._session.request(
                method.value,
                url,
                headers=headers,
                timeout=ClientTimeout(30),
                **kwargs,
            )
            if response.status == 401:
                raise SkybellAuthenticationException(await response.text())
            if response.status in (403, 404):
                # 403/404 for expired request/device key no longer present in S3
                _LOGGER.exception(await response.text())
                return None
            response.raise_for_status()
        except ClientError as ex:
            if retry:
                await self.async_login()

                return await self.async_send_request(
                    url, headers=headers, method=method, retry=False, **kwargs
                )
            raise SkybellException from ex
        if response.content_type == "application/json":
            return await response.json()
        return await response.read()

    def cache(self, key: str) -> str | Collection[str]:
        """Get a cached value."""
        return self._cache.get(key, "")

    async def async_update_cache(
        self, data: dict[str, str] | dict[str, DeviceTypeDict]
    ) -> None:
        """Update a cached value."""
        UTILS.update(self._cache, data)
        await self._async_save_cache()

    async def _async_load_cache(self) -> None:
        """Load existing cache and merge for updating if required."""
        if not self._disable_cache:
            if os.path.exists(self._cache_path):
                _LOGGER.debug("Cache found at: %s", self._cache_path)
                if os.path.getsize(self._cache_path) > 0:
                    loaded_cache = await UTILS.async_load_cache(self._cache_path)
                    UTILS.update(self._cache, loaded_cache)
                else:
                    _LOGGER.debug("Cache file is empty.  Removing it.")
                    os.remove(self._cache_path)

        await self._async_save_cache()

    async def _async_save_cache(self) -> None:
        """Trigger a cache save."""
        if not self._disable_cache:
            await UTILS.async_save_cache(self._cache, self._cache_path)

    async def async_test_ports(self, host: str, ports: list[int] | None = None) -> bool:
        """Test if ports are open. Only use this for discovery."""
        result = False
        for port in ports or [6881, 6969]:
            try:
                await self._session.get(
                    f"http://{host}:{port}",
                    timeout=ClientTimeout(10),
                )
            except ClientConnectorError as ex:
                if ex.errno == 61:
                    result = True
            except Timeout:
                return False
        return result