File: __init__.py

package info (click to toggle)
python-mystrom 2.5.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 292 kB
  • sloc: python: 901; makefile: 2
file content (107 lines) | stat: -rw-r--r-- 3,076 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
"""Base details for the myStrom Python bindings."""

import asyncio
import socket
from typing import Any, Mapping, Optional

import aiohttp
from yarl import URL

from .exceptions import MyStromConnectionError

# Passed as ``timeout=`` to ``asyncio.wait_for`` around each device request
# (asyncio interprets the value as seconds).
TIMEOUT = 10
# Value sent in the ``User-Agent`` header of every request to a device.
USER_AGENT = "PythonMyStrom/1.0"


async def _request(
    self,
    uri: str,
    method: str = "GET",
    data: Optional[Any] = None,
    json_data: Optional[dict] = None,
    params: Optional[Mapping[str, str]] = None,
) -> Any:
    """Send an HTTP request to the myStrom device and return the decoded body.

    ``self`` is the object that carries the HTTP session. If its ``_session``
    attribute is ``None``, a new ``aiohttp.ClientSession`` is created, stored
    on ``self._session`` and ``self._close_session`` is set to ``True``.

    Args:
        self: Object providing the ``_session`` and ``_close_session``
            attributes.
        uri: Target of the request (callers in this module pass ``yarl.URL``
            objects).
        method: HTTP method to use.
        data: Payload forwarded as the ``data`` argument of the request.
        json_data: Payload forwarded as the ``json`` argument of the request.
        params: Query parameters forwarded as the ``params`` argument.

    Returns:
        The parsed JSON document when the ``Content-Type`` response header
        contains ``application/json``; otherwise the response body as a
        string.

    Raises:
        MyStromConnectionError: If sending the request times out after
            ``TIMEOUT`` or fails with an ``aiohttp.ClientError`` or
            ``socket.gaierror``.
    """
    headers = {
        "User-Agent": USER_AGENT,
        "Accept": "application/json, text/plain, */*",
    }

    if self._session is None:
        self._session = aiohttp.ClientSession()
        self._close_session = True

    try:
        response = await asyncio.wait_for(
            self._session.request(
                method,
                uri,
                data=data,
                json=json_data,
                params=params,
                headers=headers,
            ),
            timeout=TIMEOUT,
        )
    except asyncio.TimeoutError as exception:
        raise MyStromConnectionError(
            "Timeout occurred while connecting to myStrom device."
        ) from exception
    except (aiohttp.ClientError, socket.gaierror) as exception:
        raise MyStromConnectionError(
            "Error occurred while communicating with myStrom device."
        ) from exception

    is_error_status = (response.status // 100) in (4, 5)
    content_type = response.headers.get("Content-Type", "")

    try:
        if "application/json" in content_type:
            return await response.json()

        # ``text`` is a coroutine method: it has to be called and awaited to
        # obtain the body. Reading the body also lets aiohttp release the
        # connection. Undecodable bytes are replaced rather than raising.
        return await response.text(errors="replace")
    finally:
        # Error responses are closed only after the body has been consumed;
        # closing first would make the body unreadable.
        if is_error_status:
            response.close()


class MyStromDevice:
    """A class for a myStrom device.

    Holds the base URI of the device and the HTTP session used to talk to
    it. Instances can be used as async context managers; leaving the context
    calls :meth:`close`.
    """

    def __init__(
        self,
        host: str,
        session: "Optional[aiohttp.client.ClientSession]" = None,
    ):
        """Initialize the device.

        Args:
            host: Host of the device, used to build the ``http://`` base URI.
            session: Client session to use for requests. When omitted,
                ``_request`` creates one on first use and flags it for
                closing via ``_close_session``.
        """
        # Stays False for a caller-supplied session so close() leaves it open.
        self._close_session = False
        self._host = host
        self._session = session
        self.uri = URL.build(scheme="http", host=self._host)

    async def get_device_info(self) -> dict:
        """Get the device info of a myStrom device.

        Queries ``api/v1/info`` first and, if the result is not a dict,
        queries ``info.json`` instead and returns that result.
        """
        url = URL(self.uri).join(URL("api/v1/info"))
        response = await _request(self, uri=url)
        if not isinstance(response, dict):
            # Fall back to the old API version if the device runs with old firmware
            url = URL(self.uri).join(URL("info.json"))
            response = await _request(self, uri=url)
        return response

    async def close(self) -> None:
        """Close the client session if it is flagged for closing.

        A session passed in by the caller (``_close_session`` is False) is
        left untouched. After an owned session has been closed the reference
        is dropped, so calling ``close`` again is a no-op.
        """
        if self._session and self._close_session:
            session = self._session
            # Reset before awaiting: ``_request`` only creates a new session
            # when ``_session`` is None, so keeping the closed one around
            # would break any later request made through this object.
            self._session = None
            self._close_session = False
            await session.close()

    async def __aenter__(self) -> "MyStromDevice":
        """Async enter."""
        return self

    async def __aexit__(self, *exc_info) -> None:
        """Async exit."""
        await self.close()

async def get_device_info(host: str) -> dict:
    """Return the device info of the myStrom device reachable at *host*.

    A temporary ``MyStromDevice`` is created for the query and closed again
    when the lookup has finished.
    """
    device = MyStromDevice(host)
    async with device:
        info = await device.get_device_info()
    return info