1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
|
"""API client for Tilt Pi."""
import asyncio
from typing import Final
import aiohttp
from tiltpi.errors import TiltPiConnectionError, TiltPiConnectionTimeoutError
from tiltpi.model import TiltColor, TiltHydrometerData
# URL path appended to "http://<host>:<port>" by TiltPiClient.get_hydrometers
# to request the data of all hydrometers in one call.
ENDPOINT_GET_ALL: Final = "/macid/all"
class TiltPiClient:
    """Asynchronous HTTP client that reads hydrometer data from a Tilt Pi."""

    def __init__(
        self,
        host: str,
        port: int,
        session: aiohttp.ClientSession,
        timeout: int = 15,
    ) -> None:
        """Remember the connection settings used by later requests.

        Args:
            host: Host part of the Tilt Pi URL.
            port: Port part of the Tilt Pi URL.
            session: aiohttp session used to issue the requests.
            timeout: Value passed as ``total`` to ``aiohttp.ClientTimeout``.
        """
        self._host, self._port = host, port
        self._session = session
        self._timeout = aiohttp.ClientTimeout(total=timeout)

    async def get_hydrometers(self) -> list[TiltHydrometerData]:
        """Fetch the current reading of every hydrometer known to the Tilt Pi.

        Returns:
            One ``TiltHydrometerData`` per entry of the JSON response.

        Raises:
            TiltPiConnectionTimeoutError: The request raised
                ``asyncio.TimeoutError``.
            TiltPiConnectionError: The request raised an
                ``aiohttp.ClientError``.
        """
        url = f"http://{self._host}:{self._port}{ENDPOINT_GET_ALL}"

        try:
            async with self._session.get(url, timeout=self._timeout) as response:
                response.raise_for_status()
                payload = await response.json()
        except asyncio.TimeoutError as err:
            # Checked before ClientError so timeouts get their own error type.
            raise TiltPiConnectionTimeoutError(
                f"Timeout while connecting to Tilt Pi at {self._host}"
            ) from err
        except aiohttp.ClientError as err:
            raise TiltPiConnectionError(
                f"Error connecting to Tilt Pi at {self._host}"
            ) from err

        hydrometers: list[TiltHydrometerData] = []
        for entry in payload:
            # Fields are read in the same order as the model's keyword
            # arguments: mac, Color, Temp, SG.
            mac_id = entry["mac"]
            # The color string is title-cased before the TiltColor lookup.
            color = TiltColor(entry["Color"].title())
            temperature = float(entry["Temp"])
            gravity = float(entry["SG"])
            hydrometers.append(
                TiltHydrometerData(
                    mac_id=mac_id,
                    color=color,
                    temperature=temperature,
                    gravity=gravity,
                )
            )
        return hydrometers
|