1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
|
"""YoLink client."""
from typing import Any, Dict
from aiohttp import ClientError, ClientResponse
from tenacity import retry, stop_after_attempt, retry_if_exception_type
from .auth_mgr import YoLinkAuthMgr
from .exception import YoLinkClientError, YoLinkDeviceConnectionFailed
from .model import BRDP
class YoLinkClient:
    """HTTP client that sends requests through the auth manager's session."""

    def __init__(self, auth_mgr: YoLinkAuthMgr) -> None:
        """Keep the auth manager that supplies tokens and the HTTP session."""
        self._auth_mgr = auth_mgr

    async def request(
        self,
        method: str,
        url: str,
        auth_required: bool = True,
        **kwargs: Any,
    ) -> ClientResponse:
        """Send an HTTP request, optionally adding the Authorization header.

        Args:
            method: HTTP method name, e.g. ``"GET"`` or ``"POST"``.
            url: Target URL.
            auth_required: When true, the token is checked/refreshed via the
                auth manager and an ``Authorization`` header is set.
            **kwargs: Forwarded to the session's ``request``. The keys
                ``headers``, ``params``, ``data`` and ``timeout`` (default 8)
                are picked out here; ``extra_headers``, ``extra_params`` and
                ``extra_data`` are merged on top of their base counterparts.

        Returns:
            The response returned by the session's ``request`` call.
        """
        # Work on a copy so the Authorization header and any extras never
        # leak back into a dict the caller may reuse for other requests.
        # An explicit ``headers=None`` is treated like "no headers".
        headers = dict(kwargs.pop("headers", None) or {})
        params = kwargs.pop("params", None)
        data = kwargs.pop("data", None)
        # NOTE(review): a bare number is forwarded as aiohttp's ``timeout``;
        # confirm the targeted aiohttp version still accepts that form.
        timeout = kwargs.pop("timeout", 8)

        # Extra, user supplied values
        extra_headers = kwargs.pop("extra_headers", None)
        extra_params = kwargs.pop("extra_params", None)
        extra_data = kwargs.pop("extra_data", None)

        if auth_required:
            # Ensure the token is valid before building the header from it.
            await self._auth_mgr.check_and_refresh_token()
            headers["Authorization"] = self._auth_mgr.http_auth_header()

        # Extras are applied after the auth header, so they may override it.
        if extra_headers:
            headers.update(extra_headers)
        if extra_params:
            # Query parameters: merge into a fresh dict, not the caller's.
            merged_params = dict(params or {})
            merged_params.update(extra_params)
            params = merged_params
        if extra_data:
            # Form encoded post data: merge into a fresh dict as well.
            merged_data = dict(data or {})
            merged_data.update(extra_data)
            data = merged_data

        session = self._auth_mgr.client_session()
        return await session.request(
            method,
            url,
            headers=headers,
            params=params,
            data=data,
            timeout=timeout,
            **kwargs,
        )

    async def get(self, url: str, **kwargs: Any) -> ClientResponse:
        """Send an authenticated GET request; see :meth:`request`."""
        return await self.request("GET", url, True, **kwargs)

    async def post(self, url: str, **kwargs: Any) -> ClientResponse:
        """Send an authenticated POST request; see :meth:`request`."""
        return await self.request("POST", url, True, **kwargs)

    # The call is attempted at most twice, and only a
    # YoLinkDeviceConnectionFailed triggers the second attempt.
    # NOTE(review): without ``reraise=True`` tenacity raises its own
    # RetryError once the attempts are exhausted instead of the last
    # YoLinkDeviceConnectionFailed — confirm that is what callers expect.
    @retry(
        retry=retry_if_exception_type(YoLinkDeviceConnectionFailed),
        stop=stop_after_attempt(2),
    )
    async def execute(self, url: str, bsdp: Dict, **kwargs: Any) -> BRDP:
        """POST ``bsdp`` as JSON to ``url`` and return the parsed ``BRDP``.

        Args:
            url: API endpoint URL.
            bsdp: Request payload, sent as the JSON body.
            **kwargs: Forwarded to :meth:`post`.

        Returns:
            The ``BRDP`` model validated from the response body, after its
            ``check_response()`` has been called.

        Raises:
            YoLinkClientError: With code ``"-1003"`` when aiohttp raises a
                ``ClientError`` (including from ``raise_for_status()``).
                Errors raised by ``check_response()`` propagate unchanged
                (presumably YoLink error types — verify in the model).
        """
        try:
            response = await self.post(url, json=bsdp, **kwargs)
            response.raise_for_status()
            body = await response.text()
            brdp = BRDP.model_validate_json(body)
            brdp.check_response()
        except ClientError as client_err:
            raise YoLinkClientError(
                "-1003", "yolink client request failed!"
            ) from client_err
        return brdp
|