File: api_request_handler.py

package info (click to toggle)
python-aioambient 2024.1.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 472 kB
  • sloc: python: 755; sh: 41; makefile: 5
file content (85 lines) | stat: -rw-r--r-- 2,665 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
"""Define an object to interact with the REST API."""
from __future__ import annotations

import asyncio
import logging
from typing import Any

from aiohttp import ClientSession, ClientTimeout
from aiohttp.client_exceptions import ClientError

from .const import LOGGER
from .errors import RequestError

DEFAULT_TIMEOUT = 10


# Request returns either a list of dicts or a dict itself.
RequestResponseT = list[dict[str, Any]] | dict[str, Any]


class ApiRequestHandler:  # pylint: disable=too-few-public-methods
    """Handle API requests. Base class for both the API and OpenAPI classes.
    Handles all requests to Ambient services."""

    def __init__(  # pylint: disable=too-many-arguments
        self,
        base_url: str,
        *,
        logger: logging.Logger = LOGGER,
        session: ClientSession | None = None,
    ) -> None:
        """Initialize.

        Args:
            base_url: Base URL for each request
            logger: The logger to use.
            session: An optional aiohttp ClientSession.
        """
        self._logger = logger
        self._session: ClientSession | None = session
        self._base_url = base_url

    async def _request(
        self, method: str, endpoint: str, **kwargs: dict[str, Any]
    ) -> RequestResponseT:
        """Make a request against the API.

        In order to deal with Ambient's fairly aggressive rate limiting, we
        pause for a second before continuing:
        https://ambientweather.docs.apiary.io/#introduction/rate-limiting

        Args:
            method: An HTTP method.
            endpoint: A relative API endpoint.
            **kwargs: Additional kwargs to send with the request.

        Returns:
            An API response payload.

        Raises:
            RequestError: Raised upon an underlying HTTP error.
        """
        await asyncio.sleep(1)

        url = f"{self._base_url}/{endpoint}"

        if use_running_session := self._session and not self._session.closed:
            session = self._session
        else:
            session = ClientSession(timeout=ClientTimeout(total=DEFAULT_TIMEOUT))

        try:
            async with session.request(method, url, **kwargs) as resp:
                resp.raise_for_status()
                data: RequestResponseT = await resp.json()
        except ClientError as err:
            raise RequestError(f"Error requesting data from {url}: {err}") from err
        finally:
            if not use_running_session:
                await session.close()

        self._logger.debug("Received data for %s: %s", endpoint, data)

        # Returns either a list of dicts or a dict itself.
        return data