File: aiorequest.py

package info (click to toggle)
python-aiopvapi 3.1.1-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 684 kB
  • sloc: python: 3,123; xml: 850; makefile: 5
file content (201 lines) | stat: -rw-r--r-- 8,430 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
"""Class containing the async http methods."""

import asyncio
import logging

import aiohttp

# Module-level logger, named after this module's import path.
_LOGGER = logging.getLogger(__name__)


class PvApiError(Exception):
    """Base class for the PowerView API errors defined in this module.

    Catching this type also catches every more specific ``PvApi*`` error
    declared below it.
    """


class PvApiResponseStatusError(PvApiError):
    """The hub answered with an HTTP status that was not accepted.

    ``AioRequest.check_response`` raises this with the response status code
    as its only argument.
    """


class PvApiMaintenance(PvApiError):
    """Hub is undergoing maintenance.

    Raised by ``AioRequest.check_response`` for the status codes it treats
    as "hub under maintenance" (see that method for the exact rules).
    """


class PvApiConnectionError(PvApiError):
    """Problem connecting to PowerView Hub.

    Raised by the ``AioRequest`` request methods when a request times out
    (and the timeout is not suppressed) or when aiohttp reports a client
    error; the underlying exception is chained as ``__cause__``.
    """


class PvApiEmptyData(PvApiError):
    """PowerView Hub returned empty data.

    NOTE(review): nothing in this module raises this error; presumably it is
    raised by callers that receive an empty payload - verify against them.
    """


class AioRequest:
    """Request class managing Powerview Hub connection."""

    def __init__(
        self,
        hub_ip,
        loop=None,
        websession=None,
        timeout: int = 15,
        api_version: int | None = None,
    ) -> None:
        """Initialize request class."""
        self.hub_ip = hub_ip
        self._timeout = timeout
        if loop:
            self.loop = loop
        else:
            self.loop = asyncio.get_event_loop()
        if websession:
            self.websession = websession
        else:
            self.websession = aiohttp.ClientSession()
        self.api_version: int | None = api_version
        self._last_request_status: int = 0
        _LOGGER.debug("Powerview api version: %s", self.api_version)

    @property
    def api_path(self) -> str:
        """Return the initial api call path."""
        if self.api_version and self.api_version >= 3:
            return "home"
        return "api"

    async def check_response(self, response, valid_response_codes):
        """Check the response for correctness."""
        _val = None
        if response.status == 403 and self._last_request_status == 423:
            # if last status was hub undergoing maint then it is common
            # on reboot for a 403 response. Generally this should raise
            # PvApiResponseStatusError but as this is unavoidable we
            # class this situation as still undergoing maintenance
            _val = False
        elif response.status in [204, 423]:
            # 423 hub under maintenance, returns data, but not shade
            _val = True
        elif response.status in valid_response_codes:
            _val = await response.json()

        # store the status for next check
        self._last_request_status = response.status

        # raise a maintenance error
        if isinstance(_val, bool):
            raise PvApiMaintenance("Powerview Hub is undergoing maintenance")

        # if none of the above checks passed, raise a response error
        if _val is None:
            raise PvApiResponseStatusError(response.status)

        # finally, return the result
        return _val

    async def get(self, url: str, params: str = None, suppress_timeout: bool = False, **kwargs) -> dict:
        """Get a resource.

        :param url: The URL to fetch.
        :param params: Dictionary or bytes to be sent in the query string of the new request
                    (optional).
        :param suppress_timeout: Stermine if timeouts will return an error
        :param kwargs: Keyword arguments to be passed to aiohttp ClientSession get method.
                    For example, timeout can be passed as kwargs.
        :return: A dictionary representing the JSON response.
        """
        response = None
        try:
            timeout = kwargs.pop("timeout", None) or self._timeout
            _LOGGER.debug("Sending GET request to: %s params: %s timeout: %s kwargs: %s", url, params, timeout, kwargs)
            response = await self.websession.get(url, params=params, timeout=timeout, **kwargs)
            return await self.check_response(response, [200, 204])
        except TimeoutError as error:
            if suppress_timeout:
                _LOGGER.debug("Timeout occurred but was suppressed: %s", error)
                return None
            raise PvApiConnectionError("Timeout in communicating with PowerView Hub") from error
        except aiohttp.ClientError as error:
            raise PvApiConnectionError("Failed to communicate with PowerView Hub") from error
        finally:
            if response is not None:
                await response.release()

    async def post(self, url: str, data: dict = None, suppress_timeout: bool = False, **kwargs):
        """Post a resource update.

        :param url: The URL to fetch.
        :param data: Dictionary later converted to json. Sent in the request
        :param suppress_timeout: Stermine if timeouts will return an error
        :param kwargs: Keyword arguments to be passed to aiohttp ClientSession get method.
                    For example, timeout can be passed as kwargs.
        :return: A dictionary representing the JSON response.
        """
        response = None
        try:
            timeout = kwargs.pop("timeout", None) or self._timeout
            _LOGGER.debug("Sending POST request to: %s data: %s timeout: %s kwargs: %s", url, data, timeout, kwargs)
            response = await self.websession.post(url,json=data,timeout=timeout,**kwargs)
            return await self.check_response(response, [200, 201])
        except TimeoutError as error:
            if suppress_timeout:
                _LOGGER.debug("Timeout occurred but was suppressed: %s", error)
                return None
            raise PvApiConnectionError("Timeout in communicating with PowerView Hub") from error
        except aiohttp.ClientError as error:
            raise PvApiConnectionError("Failed to communicate with PowerView Hub") from error
        finally:
            if response is not None:
                await response.release()

    async def put(self, url: str, data: dict = None, params=None, suppress_timeout: bool = False, **kwargs):
        """Do a put request.

        :param url: The URL to fetch.
        :param data: Dictionary later converted to json. Sent in the request
        :param params: Dictionary or bytes to be sent in the query string of the new request
                    (optional).
        :param suppress_timeout: Stermine if timeouts will return an error
        :param kwargs: Keyword arguments to be passed to aiohttp ClientSession get method.
                    For example, timeout can be passed as kwargs.
        :return: A dictionary representing the JSON response.
        """
        response = None
        try:
            timeout = kwargs.pop("timeout", None) or self._timeout
            _LOGGER.debug("Sending PUT request to: %s params: %s data: %s timeout: %s kwargs: %s", url, params, data, timeout, kwargs)
            response = await self.websession.put(url, json=data, params=params, timeout=timeout, **kwargs)
            return await self.check_response(response, [200, 204])
        except TimeoutError as error:
            if suppress_timeout:
                _LOGGER.debug("Timeout occurred but was suppressed: %s", error)
                return None
            raise PvApiConnectionError("Timeout in communicating with PowerView Hub") from error
        except aiohttp.ClientError as error:
            raise PvApiConnectionError("Failed to communicate with PowerView Hub") from error
        finally:
            if response is not None:
                await response.release()

    async def delete(self, url: str, params: dict = None, suppress_timeout: bool = False, **kwargs):
        """Delete a resource.

        :param url: Endpoint
        :param params: parameters
        :return: Response body

        :raises PvApiError when something is wrong.
        """
        response = None
        try:
            timeout = kwargs.pop("timeout", None) or self._timeout
            _LOGGER.debug("Sending DELETE request to: %s params: %s timeout: %s kwargs: %s",url,params,timeout, kwargs)
            response = await self.websession.delete(url, params=params, timeout=timeout, **kwargs)
            return await self.check_response(response, [200, 204])
        except TimeoutError as error:
            if suppress_timeout:
                _LOGGER.debug("Timeout occurred but was suppressed: %s", error)
                return None
            raise PvApiConnectionError("Timeout in communicating with PowerView Hub") from error
        except aiohttp.ClientError as error:
            raise PvApiConnectionError("Failed to communicate with PowerView Hub") from error
        finally:
            if response is not None:
                await response.release()