1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
|
"""Asynchronous Python client for IOmeter."""
import asyncio
from dataclasses import dataclass
from typing import Optional, Self
from aiohttp import ClientResponseError, ClientSession
from yarl import URL
from .exceptions import (
IOmeterConnectionError,
IOmeterNoReadingsError,
IOmeterNoStatusError,
IOmeterTimeoutError,
)
from .reading import Reading
from .status import Status
@dataclass
class IOmeterClient:
"""Main IOmeter client class for handling HTTP connections with the IOmeter bridge.
Attributes:
host: The hostname or IP address of the IOmeter bridge
request_timeout: Number of seconds to wait for bridge response
session: Optional aiohttp ClientSession for making requests
Example:
async with IOmeterClient("192.168.1.100") as client:
reading = await client.get_current_reading()
status = await client.get_current_status()
"""
host: str
request_timeout: int = 60
session: Optional[ClientSession] = None
async def _request(self, uri: str) -> str:
"""Make a request to the IOmeter bridge.
Args:
uri: The URI endpoint to request
Returns:
The response text from the bridge
Raises:
IOmeterConnectionError: If any communication error occurs
"""
if not self.session:
raise RuntimeError("Client session not initialized")
url = URL.build(scheme="http", host=self.host).joinpath(uri)
headers = {
"User-Agent": "PythonIOmeter/0.1",
"Accept": "application/json",
}
try:
async with asyncio.timeout(self.request_timeout):
response = await self.session.get(url, headers=headers)
response.raise_for_status()
return await response.text()
except asyncio.TimeoutError as error:
raise IOmeterTimeoutError(
"Timeout while communicating with IOmeter bridge"
) from error
except ClientResponseError as error:
# Map 404 responses to more specific exceptions depending on the
# requested endpoint so callers can handle "no data" cases.
if error.status == 404:
if uri.endswith("v1/reading") or "/reading" in uri:
raise IOmeterNoReadingsError(
"No readings available from IOmeter bridge"
) from error
if uri.endswith("v1/status") or "/status" in uri:
raise IOmeterNoStatusError(
"No status available from IOmeter bridge"
) from error
# For other HTTP errors, raise a generic connection error.
raise IOmeterConnectionError(
f"Bridge returned error {error.status}: {error.message}"
) from error
except Exception as error:
raise IOmeterConnectionError(
f"Error communicating with IOmeter bridge: {str(error)}"
) from error
async def get_current_reading(self) -> Reading:
"""Get current reading from IOmeter bridge.
Returns:
Reading object containing the current meter values
Raises:
IOmeterConnectionError: If communication with bridge fails
"""
response = await self._request("v1/reading")
return Reading.from_json(response)
async def get_current_status(self) -> Status:
"""Get device status from IOmeter bridge.
Returns:
Status object containing the current bridge status
Raises:
IOmeterConnectionError: If communication with bridge fails
"""
response = await self._request("v1/status")
return Status.from_json(response)
async def close(self) -> None:
"""Close the client session."""
if self.session:
await self.session.close()
self.session = None
async def __aenter__(self) -> Self:
"""Set up the client session.
Returns:
The configured client instance
"""
self.session = self.session or ClientSession()
return self
async def __aexit__(self, *_exc_info: object) -> None:
"""Clean up the client session."""
await self.close()
|