1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
|
import aiohttp
import asyncio
import logging
import datetime
import typing
from aiohttp import ClientSession
from urllib.parse import urljoin
LOGGER = logging.getLogger(__name__)
class FlussApiClientError(Exception):
"""Exception to indicate a general API error."""
class FlussDeviceError(FlussApiClientError):
"""Exception to indicate an error occurred when retrieving devices."""
class FlussApiClientCommunicationError(FlussApiClientError):
"""Exception to indicate a communication error."""
class FlussApiClientAuthenticationError(FlussApiClientError):
"""Exception to indicate an authentication error."""
class FlussApiClient:
"""Fluss+ API Client."""
def __init__(
self,
api_key: str,
session: typing.Optional[ClientSession] = None,
timeout: int = 10,
) -> None:
"""Initialize the Fluss+ API Client."""
self._api_key = api_key
self._base_url = "https://zgekzokxrl.execute-api.eu-west-1.amazonaws.com/v1/api/"
self._timeout = timeout
self._session = session or ClientSession()
async def async_get_devices(self) -> typing.Any:
"""Get data from the API."""
try:
return await self._api_wrapper(
method="GET",
endpoint="device/list",
headers={"Authorization": self._api_key},
)
except FlussApiClientError as error:
LOGGER.error("Failed to get devices: %s", error)
raise FlussDeviceError("Failed to retrieve devices") from error
async def async_trigger_device(self, device_id: str) -> typing.Any:
"""Trigger the device."""
timestamp = int(datetime.datetime.now().timestamp() * 1000)
return await self._api_wrapper(
method="POST",
endpoint=f"device/{device_id}/trigger",
headers={"Authorization": self._api_key},
data={"timeStamp": timestamp, "metaData": {}},
)
async def _api_wrapper(
self,
method: str,
endpoint: str,
data: dict | None = None,
headers: dict | None = None,
) -> typing.Any:
"""Make a request to the Fluss API."""
url = urljoin(self._base_url, endpoint)
try:
async with asyncio.timeout(self._timeout):
async with self._session.request(
method=method,
url=url,
headers=headers,
json=data,
) as response:
if response.status == 401:
raise FlussApiClientAuthenticationError("Invalid credentials")
elif response.status == 403:
raise FlussApiClientAuthenticationError("Access forbidden")
response.raise_for_status()
return await response.json()
except asyncio.TimeoutError as e:
LOGGER.error("Timeout error fetching information from %s", url)
raise FlussApiClientCommunicationError("Timeout error fetching information") from e
except aiohttp.ClientError as ex:
LOGGER.error("Client error fetching information from %s: %s", url, ex)
raise FlussApiClientCommunicationError("Error fetching information") from ex
except FlussApiClientAuthenticationError as auth_ex:
LOGGER.error("Authentication error: %s", auth_ex)
raise
except Exception as exception:
LOGGER.error("Unexpected error occurred: %s", exception)
raise FlussApiClientError("An unexpected error occurred") from exception
async def close(self):
"""Close the aiohttp session if it was created by this client."""
if self._session and not self._session.closed:
await self._session.close()
|