1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
|
"""Class containing the async http methods."""
import asyncio
import logging
import aiohttp
# Module-level logger, named after this module; used for the debug output below.
_LOGGER = logging.getLogger(__name__)
class PvApiError(Exception):
    """Base class for the PowerView API errors defined in this module."""
class PvApiResponseStatusError(PvApiError):
    """The hub answered with an unexpected HTTP response status."""
class PvApiMaintenance(PvApiError):
    """The PowerView Hub is currently undergoing maintenance."""
class PvApiConnectionError(PvApiError):
    """Communication with the PowerView Hub failed or timed out."""
class PvApiEmptyData(PvApiError):
    """The PowerView Hub returned empty data."""
class AioRequest:
"""Request class managing Powerview Hub connection."""
def __init__(
self,
hub_ip,
loop=None,
websession=None,
timeout: int = 15,
api_version: int | None = None,
) -> None:
"""Initialize request class."""
self.hub_ip = hub_ip
self._timeout = timeout
if loop:
self.loop = loop
else:
self.loop = asyncio.get_event_loop()
if websession:
self.websession = websession
else:
self.websession = aiohttp.ClientSession()
self.api_version: int | None = api_version
self._last_request_status: int = 0
_LOGGER.debug("Powerview api version: %s", self.api_version)
@property
def api_path(self) -> str:
"""Return the initial api call path."""
if self.api_version and self.api_version >= 3:
return "home"
return "api"
async def check_response(self, response, valid_response_codes):
"""Check the response for correctness."""
_val = None
if response.status == 403 and self._last_request_status == 423:
# if last status was hub undergoing maint then it is common
# on reboot for a 403 response. Generally this should raise
# PvApiResponseStatusError but as this is unavoidable we
# class this situation as still undergoing maintenance
_val = False
elif response.status in [204, 423]:
# 423 hub under maintenance, returns data, but not shade
_val = True
elif response.status in valid_response_codes:
_val = await response.json()
# store the status for next check
self._last_request_status = response.status
# raise a maintenance error
if isinstance(_val, bool):
raise PvApiMaintenance("Powerview Hub is undergoing maintenance")
# if none of the above checks passed, raise a response error
if _val is None:
raise PvApiResponseStatusError(response.status)
# finally, return the result
return _val
async def get(self, url: str, params: str = None, suppress_timeout: bool = False, **kwargs) -> dict:
"""Get a resource.
:param url: The URL to fetch.
:param params: Dictionary or bytes to be sent in the query string of the new request
(optional).
:param suppress_timeout: Stermine if timeouts will return an error
:param kwargs: Keyword arguments to be passed to aiohttp ClientSession get method.
For example, timeout can be passed as kwargs.
:return: A dictionary representing the JSON response.
"""
response = None
try:
timeout = kwargs.pop("timeout", None) or self._timeout
_LOGGER.debug("Sending GET request to: %s params: %s timeout: %s kwargs: %s", url, params, timeout, kwargs)
response = await self.websession.get(url, params=params, timeout=timeout, **kwargs)
return await self.check_response(response, [200, 204])
except TimeoutError as error:
if suppress_timeout:
_LOGGER.debug("Timeout occurred but was suppressed: %s", error)
return None
raise PvApiConnectionError("Timeout in communicating with PowerView Hub") from error
except aiohttp.ClientError as error:
raise PvApiConnectionError("Failed to communicate with PowerView Hub") from error
finally:
if response is not None:
await response.release()
async def post(self, url: str, data: dict = None, suppress_timeout: bool = False, **kwargs):
"""Post a resource update.
:param url: The URL to fetch.
:param data: Dictionary later converted to json. Sent in the request
:param suppress_timeout: Stermine if timeouts will return an error
:param kwargs: Keyword arguments to be passed to aiohttp ClientSession get method.
For example, timeout can be passed as kwargs.
:return: A dictionary representing the JSON response.
"""
response = None
try:
timeout = kwargs.pop("timeout", None) or self._timeout
_LOGGER.debug("Sending POST request to: %s data: %s timeout: %s kwargs: %s", url, data, timeout, kwargs)
response = await self.websession.post(url,json=data,timeout=timeout,**kwargs)
return await self.check_response(response, [200, 201])
except TimeoutError as error:
if suppress_timeout:
_LOGGER.debug("Timeout occurred but was suppressed: %s", error)
return None
raise PvApiConnectionError("Timeout in communicating with PowerView Hub") from error
except aiohttp.ClientError as error:
raise PvApiConnectionError("Failed to communicate with PowerView Hub") from error
finally:
if response is not None:
await response.release()
async def put(self, url: str, data: dict = None, params=None, suppress_timeout: bool = False, **kwargs):
"""Do a put request.
:param url: The URL to fetch.
:param data: Dictionary later converted to json. Sent in the request
:param params: Dictionary or bytes to be sent in the query string of the new request
(optional).
:param suppress_timeout: Stermine if timeouts will return an error
:param kwargs: Keyword arguments to be passed to aiohttp ClientSession get method.
For example, timeout can be passed as kwargs.
:return: A dictionary representing the JSON response.
"""
response = None
try:
timeout = kwargs.pop("timeout", None) or self._timeout
_LOGGER.debug("Sending PUT request to: %s params: %s data: %s timeout: %s kwargs: %s", url, params, data, timeout, kwargs)
response = await self.websession.put(url, json=data, params=params, timeout=timeout, **kwargs)
return await self.check_response(response, [200, 204])
except TimeoutError as error:
if suppress_timeout:
_LOGGER.debug("Timeout occurred but was suppressed: %s", error)
return None
raise PvApiConnectionError("Timeout in communicating with PowerView Hub") from error
except aiohttp.ClientError as error:
raise PvApiConnectionError("Failed to communicate with PowerView Hub") from error
finally:
if response is not None:
await response.release()
async def delete(self, url: str, params: dict = None, suppress_timeout: bool = False, **kwargs):
"""Delete a resource.
:param url: Endpoint
:param params: parameters
:return: Response body
:raises PvApiError when something is wrong.
"""
response = None
try:
timeout = kwargs.pop("timeout", None) or self._timeout
_LOGGER.debug("Sending DELETE request to: %s params: %s timeout: %s kwargs: %s",url,params,timeout, kwargs)
response = await self.websession.delete(url, params=params, timeout=timeout, **kwargs)
return await self.check_response(response, [200, 204])
except TimeoutError as error:
if suppress_timeout:
_LOGGER.debug("Timeout occurred but was suppressed: %s", error)
return None
raise PvApiConnectionError("Timeout in communicating with PowerView Hub") from error
except aiohttp.ClientError as error:
raise PvApiConnectionError("Failed to communicate with PowerView Hub") from error
finally:
if response is not None:
await response.release()
|