File: api.py

package info (click to toggle)
python-tilt-pi 0.2.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 192 kB
  • sloc: python: 153; makefile: 2
file content (56 lines) | stat: -rw-r--r-- 1,703 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
"""API client for Tilt Pi."""

import asyncio
from typing import Final

import aiohttp

from tiltpi.errors import TiltPiConnectionError, TiltPiConnectionTimeoutError
from tiltpi.model import TiltColor, TiltHydrometerData

ENDPOINT_GET_ALL: Final = "/macid/all"


class TiltPiClient:
    """API client for Tilt Pi."""

    def __init__(
        self,
        host: str,
        port: int,
        session: aiohttp.ClientSession,
        timeout: int = 15,
    ) -> None:
        """Initialize the API client."""
        self._host = host
        self._port = port
        self._session = session
        self._timeout = aiohttp.ClientTimeout(total=timeout)

    async def get_hydrometers(self) -> list[TiltHydrometerData]:
        """Get all hydrometer data."""
        try:
            async with self._session.get(
                f"http://{self._host}:{self._port}{ENDPOINT_GET_ALL}",
                timeout=self._timeout,
            ) as resp:
                resp.raise_for_status()
                data = await resp.json()
        except asyncio.TimeoutError as err:
            raise TiltPiConnectionTimeoutError(
                f"Timeout while connecting to Tilt Pi at {self._host}"
            ) from err
        except aiohttp.ClientError as err:
            raise TiltPiConnectionError(
                f"Error connecting to Tilt Pi at {self._host}"
            ) from err

        return [
            TiltHydrometerData(
                mac_id=hydrometer["mac"],
                color=TiltColor(hydrometer["Color"].title()),
                temperature=float(hydrometer["Temp"]),
                gravity=float(hydrometer["SG"]),
            )
            for hydrometer in data
        ]