1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
|
"""util for SwitchBot API."""
import aiohttp
from aiohttp import ClientResponse
from .exceptions import SwitchBotDeviceRequestError
def check_response_status(response: ClientResponse) -> None:
    """Raise SwitchBotDeviceRequestError unless the response status is 200."""
    status_code = response.status
    if status_code == 200:
        # Only an exact 200 counts as success; nothing more to do.
        return
    raise SwitchBotDeviceRequestError(
        f"status code != 200 (actual: {status_code})"
    )
async def get_file_stream_from_cloud(url: str, timeout: float = 5) -> bytes:
    """Download the raw bytes served at ``url``.

    Currently only used to download the <AI Art Frame> picture.

    Args:
        url: Address of the file to fetch.
        timeout: Total time budget for the whole request, passed to
            ``aiohttp.ClientTimeout(total=...)``.

    Returns:
        The complete response body.

    Raises:
        SwitchBotDeviceRequestError: If the response status is not 200, or
            if the request fails for any other reason (the underlying
            exception is chained as the cause).
    """
    try:
        async with (
            aiohttp.ClientSession() as session,
            session.get(url, timeout=aiohttp.ClientTimeout(total=timeout)) as response,
        ):
            check_response_status(response)
            return await response.read()
    except SwitchBotDeviceRequestError:
        # Already the documented error type (bad status code); re-raise it
        # as-is instead of wrapping it in a second instance of itself.
        raise
    except Exception as e:
        # Exceptions raised without arguments (e.g. a bare TimeoutError)
        # stringify to "", which would give callers a blank message; fall
        # back to the repr so the failure is still identifiable.
        msg = f"{e}" or repr(e)
        raise SwitchBotDeviceRequestError(msg) from e
|