1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
|
"""Define an object to interact with the REST API."""
from __future__ import annotations
import asyncio
import logging
from typing import Any
from aiohttp import ClientSession, ClientTimeout
from aiohttp.client_exceptions import ClientError
from .const import LOGGER
from .errors import RequestError
# Total timeout handed to ClientTimeout when this module has to create its own
# ClientSession (presumably seconds, per aiohttp's ClientTimeout -- confirm).
DEFAULT_TIMEOUT = 10
# A request returns either a list of dicts or a single dict.
RequestResponseT = list[dict[str, Any]] | dict[str, Any]
class ApiRequestHandler:  # pylint: disable=too-few-public-methods
    """Handle API requests.

    Base class for both the API and OpenAPI classes. Handles all requests to
    Ambient services.
    """

    def __init__(  # pylint: disable=too-many-arguments
        self,
        base_url: str,
        *,
        logger: logging.Logger = LOGGER,
        session: ClientSession | None = None,
    ) -> None:
        """Initialize.

        Args:
            base_url: Base URL for each request; it is joined to the endpoint
                with a single "/".
            logger: The logger to use.
            session: An optional aiohttp ClientSession. When omitted (or once
                it has been closed), a temporary session is created for each
                request instead.
        """
        self._logger = logger
        self._session: ClientSession | None = session
        self._base_url = base_url

    async def _request(
        self, method: str, endpoint: str, **kwargs: Any
    ) -> RequestResponseT:
        """Make a request against the API.

        In order to deal with Ambient's fairly aggressive rate limiting, we
        pause for a second before continuing:
        https://ambientweather.docs.apiary.io/#introduction/rate-limiting

        Args:
            method: An HTTP method.
            endpoint: A relative API endpoint.
            **kwargs: Additional kwargs to send with the request; they are
                forwarded unchanged to the session's ``request`` call.

        Returns:
            An API response payload.

        Raises:
            RequestError: Raised upon an underlying HTTP error.
        """
        # Throttle every call up front; see the rate-limiting note above.
        await asyncio.sleep(1)

        url = f"{self._base_url}/{endpoint}"

        # Reuse the injected session while it is still open. Otherwise create a
        # throwaway session (the only place DEFAULT_TIMEOUT applies) and
        # remember that this call owns it and must close it.
        if self._session is not None and not self._session.closed:
            session = self._session
            owns_session = False
        else:
            session = ClientSession(timeout=ClientTimeout(total=DEFAULT_TIMEOUT))
            owns_session = True

        # NOTE(review): only ClientError is translated into RequestError. A
        # timeout presumably surfaces as asyncio.TimeoutError and would
        # propagate unwrapped -- confirm whether callers rely on that.
        try:
            async with session.request(method, url, **kwargs) as resp:
                resp.raise_for_status()
                data: RequestResponseT = await resp.json()
        except ClientError as err:
            raise RequestError(f"Error requesting data from {url}: {err}") from err
        finally:
            # An injected session belongs to the caller and is never closed here.
            if owns_session:
                await session.close()

        self._logger.debug("Received data for %s: %s", endpoint, data)

        # Returns either a list of dicts or a dict itself.
        return data
|