File: __init__.py

package info (click to toggle)
python-mystrom 2.2.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, trixie
  • size: 188 kB
  • sloc: python: 855; makefile: 3
file content (107 lines) | stat: -rw-r--r-- 3,165 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
"""Base details for the myStrom Python bindings."""
import asyncio
import aiohttp
import async_timeout
from yarl import URL
from typing import Any, Mapping, Optional
import socket
from .exceptions import MyStromConnectionError

from importlib.metadata import version

__version__ = version("python-mystrom")

TIMEOUT = 10
USER_AGENT = f"PythonMyStrom/{__version__}"


async def _request(
    self,
    uri: str,
    method: str = "GET",
    data: Optional[Any] = None,
    json_data: Optional[dict] = None,
    params: Optional[Mapping[str, str]] = None,
) -> Any:
    """Handle a request to the myStrom device."""
    headers = {
        "User-Agent": USER_AGENT,
        "Accept": "application/json, text/plain, */*",
    }

    if self._session is None:
        self._session = aiohttp.ClientSession()
        self._close_session = True

    try:
        with async_timeout.timeout(TIMEOUT):
            response = await self._session.request(
                method,
                uri,
                data=data,
                json=json_data,
                params=params,
                headers=headers,
            )
    except asyncio.TimeoutError as exception:
        raise MyStromConnectionError(
            "Timeout occurred while connecting to myStrom device."
        ) from exception
    except (aiohttp.ClientError, socket.gaierror) as exception:
        raise MyStromConnectionError(
            "Error occurred while communicating with myStrom device."
        ) from exception

    content_type = response.headers.get("Content-Type", "")
    if (response.status // 100) in [4, 5]:
        response.close()

    if "application/json" in content_type:
        response_json = await response.json()
        return response_json

    return response.text


class MyStromDevice:
    """A class for a myStrom device."""

    def __init__(
        self,
        host,
        session: aiohttp.client.ClientSession = None,
    ):
        """Initialize the device."""
        self._close_session = False
        self._host = host
        self._session = session
        self.uri = URL.build(scheme="http", host=self._host)

    async def get_device_info(self) -> dict:
        """Get the device info of a myStrom device."""
        url = URL(self.uri).join(URL("api/v1/info"))
        response = await _request(self, uri=url)
        if not isinstance(response, dict):
            # Fall back to the old API version if the device runs with old firmware
            url = URL(self.uri).join(URL("info.json"))
            response = await _request(self, uri=url)
        return response

    async def close(self) -> None:
        """Close an open client session."""
        if self._session and self._close_session:
            await self._session.close()

    async def __aenter__(self) -> "MyStromDevice":
        """Async enter."""
        return self

    async def __aexit__(self, *exc_info) -> None:
        """Async exit."""
        await self.close()


async def get_device_info(host: str) -> dict:
    """Get the device info of a myStrom device."""
    async with MyStromDevice(host) as device:
        return await device.get_device_info()