1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
|
"""Base details for the myStrom Python bindings."""
import asyncio
import aiohttp
import async_timeout
from yarl import URL
from typing import Any, Mapping, Optional
import socket
from .exceptions import MyStromConnectionError
from importlib.metadata import version
__version__ = version("python-mystrom")
TIMEOUT = 10
USER_AGENT = f"PythonMyStrom/{__version__}"
async def _request(
self,
uri: str,
method: str = "GET",
data: Optional[Any] = None,
json_data: Optional[dict] = None,
params: Optional[Mapping[str, str]] = None,
) -> Any:
"""Handle a request to the myStrom device."""
headers = {
"User-Agent": USER_AGENT,
"Accept": "application/json, text/plain, */*",
}
if self._session is None:
self._session = aiohttp.ClientSession()
self._close_session = True
try:
with async_timeout.timeout(TIMEOUT):
response = await self._session.request(
method,
uri,
data=data,
json=json_data,
params=params,
headers=headers,
)
except asyncio.TimeoutError as exception:
raise MyStromConnectionError(
"Timeout occurred while connecting to myStrom device."
) from exception
except (aiohttp.ClientError, socket.gaierror) as exception:
raise MyStromConnectionError(
"Error occurred while communicating with myStrom device."
) from exception
content_type = response.headers.get("Content-Type", "")
if (response.status // 100) in [4, 5]:
response.close()
if "application/json" in content_type:
response_json = await response.json()
return response_json
return response.text
class MyStromDevice:
"""A class for a myStrom device."""
def __init__(
self,
host,
session: aiohttp.client.ClientSession = None,
):
"""Initialize the device."""
self._close_session = False
self._host = host
self._session = session
self.uri = URL.build(scheme="http", host=self._host)
async def get_device_info(self) -> dict:
"""Get the device info of a myStrom device."""
url = URL(self.uri).join(URL("api/v1/info"))
response = await _request(self, uri=url)
if not isinstance(response, dict):
# Fall back to the old API version if the device runs with old firmware
url = URL(self.uri).join(URL("info.json"))
response = await _request(self, uri=url)
return response
async def close(self) -> None:
"""Close an open client session."""
if self._session and self._close_session:
await self._session.close()
async def __aenter__(self) -> "MyStromDevice":
"""Async enter."""
return self
async def __aexit__(self, *exc_info) -> None:
"""Async exit."""
await self.close()
async def get_device_info(host: str) -> dict:
"""Get the device info of a myStrom device."""
async with MyStromDevice(host) as device:
return await device.get_device_info()
|