1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202
|
"""Support for Airthings sensor."""
from __future__ import annotations
import asyncio
from dataclasses import dataclass
import json
import logging
from aiohttp import ClientError
import async_timeout
_LOGGER = logging.getLogger(__name__)
API_URL = "https://ext-api.airthings.com/v1/"
TIMEOUT = 10
@dataclass
class AirthingsLocation:
    """Airthings location, described by its id and its name."""

    location_id: str
    name: str

    @classmethod
    def init_from_response(cls, response):
        """Create a location from one entry of an API response.

        Reads the "id" and "name" keys of ``response``; a key that is
        absent yields ``None`` for the corresponding field.
        """
        identifier = response.get("id")
        label = response.get("name")
        return cls(location_id=identifier, name=label)
@dataclass
class AirthingsDevice:
    """Airthings device together with its sensor readings."""

    device_id: str
    name: str
    sensors: dict[str, float | None]
    is_active: bool
    location_name: str
    device_type: str | None
    product_name: str | None

    @classmethod
    def init_from_response(cls, response, location_name, device):
        """Create a device from a sample entry and its device metadata.

        Args:
            response: Mapping read for the "id", "segment" and "data" keys;
                "segment" is in turn read for "name" and "isActive".
            location_name: Name of the location the device belongs to.
            device: Mapping read for "deviceType" and "productName", or
                ``None`` when no metadata is known for this device.

        Returns:
            The populated ``AirthingsDevice``. Values missing from the
            inputs become ``None``; missing sensor data becomes ``{}``.
        """
        # Callers look the metadata up with dict.get(), so it can be None;
        # likewise a sample without a "segment" must not raise.
        metadata = device or {}
        segment = response.get("segment") or {}
        return cls(
            device_id=response.get("id"),
            name=segment.get("name"),
            # Keep `sensors` a dict so `sensor_types` always works.
            sensors=response.get("data") or {},
            is_active=segment.get("isActive"),
            location_name=location_name,
            device_type=metadata.get("deviceType"),
            product_name=metadata.get("productName"),
        )

    @property
    def sensor_types(self):
        """Return a view of the sensor names present in ``sensors``."""
        return self.sensors.keys()
class AirthingsError(Exception):
    """Base exception for failures while talking to Airthings."""
class AirthingsConnectionError(AirthingsError):
    """Raised when a connection to Airthings fails or times out."""
class AirthingsAuthError(AirthingsError):
    """Raised when Airthings rejects the request for an access token."""
class Airthings:
    """Airthings data handler.

    Holds the credentials and web session, lazily obtains an access token,
    and keeps the fetched locations and device metadata so that later
    updates only need to request the latest samples per location.
    """

    def __init__(self, client_id, secret, websession):
        """Init Airthings data handler.

        Args:
            client_id: Client id passed to ``get_token``.
            secret: Client secret passed to ``get_token``.
            websession: Session object used for ``get``/``post`` requests.
        """
        self._client_id = client_id
        self._secret = secret
        self._websession = websession
        self._access_token = None
        self._locations = []
        self._devices = {}

    async def update_devices(self):
        """Fetch the latest samples of all devices in all known locations.

        Locations and device metadata are requested only while the
        corresponding cache is still empty.

        Returns:
            Dict mapping device id to ``AirthingsDevice``.

        Raises:
            AirthingsError: Propagated from ``_request`` on request failure.
        """
        if not self._locations:
            response = await self._request(API_URL + "locations")
            # _request returns None when no access token could be obtained.
            if response is not None:
                json_data = await response.json() or {}
                self._locations = [
                    AirthingsLocation.init_from_response(location)
                    for location in json_data.get("locations") or []
                ]
        if not self._devices:
            response = await self._request(API_URL + "devices")
            if response is not None:
                json_data = await response.json() or {}
                self._devices = {
                    device["id"]: device
                    for device in json_data.get("devices") or []
                }
        res = {}
        for location in self._locations:
            if not location.location_id:
                continue
            # API_URL already ends with "/", so the path must not start
            # with one (matches the "locations" and "devices" requests).
            response = await self._request(
                API_URL + f"locations/{location.location_id}/latest-samples"
            )
            if response is None:
                continue
            json_data = await response.json()
            if json_data is None:
                continue
            if devices := json_data.get("devices"):
                for device in devices:
                    device_id = device.get("id")
                    res[device_id] = AirthingsDevice.init_from_response(
                        device,
                        location.name,
                        # Metadata may be missing for a sampled device.
                        self._devices.get(device_id) or {},
                    )
            else:
                _LOGGER.debug("No devices in location '%s'", location.name)
        return res

    async def _request(self, url, json_data=None, retry=3):
        """Send an authorized request and return the response.

        Uses POST when ``json_data`` is truthy, GET otherwise.

        Args:
            url: Full URL to request.
            json_data: Optional JSON body.
            retry: Remaining number of retries for timeouts and for
                non-200 responses other than 429.

        Returns:
            The response object for a 200 status, or ``None`` when no
            access token could be obtained.

        Raises:
            AirthingsError: On a client error, on a timeout with no retries
                left, or on a non-200 response that is not retried.
        """
        _LOGGER.debug("Request %s %s, %s", url, retry, json_data)
        if self._access_token is None:
            self._access_token = await get_token(
                self._websession, self._client_id, self._secret
            )
            if self._access_token is None:
                return None
        headers = {"Authorization": self._access_token}
        try:
            async with async_timeout.timeout(TIMEOUT):
                if json_data:
                    response = await self._websession.post(
                        url, json=json_data, headers=headers
                    )
                else:
                    response = await self._websession.get(url, headers=headers)
        except ClientError as err:
            self._access_token = None
            _LOGGER.error("Error connecting to Airthings: %s ", err, exc_info=True)
            raise AirthingsError from err
        except asyncio.TimeoutError as err:
            self._access_token = None
            if retry > 0:
                return await self._request(url, json_data, retry=retry - 1)
            _LOGGER.error("Timed out when connecting to Airthings")
            raise AirthingsError from err

        # The status is handled outside the timeout scope so that a retry
        # gets its own full timeout instead of sharing the first attempt's.
        if response.status == 200:
            return response
        if retry > 0 and response.status != 429:
            # Drop the token so the retry fetches a fresh one.
            self._access_token = None
            return await self._request(url, json_data, retry=retry - 1)
        _LOGGER.error(
            "Error connecting to Airthings, response: %s %s",
            response.status,
            response.reason,
        )
        raise AirthingsError(
            f"Error connecting to Airthings, response: {response.reason}"
        )
async def get_token(websession, client_id, secret, retry=3, timeout=10):
    """Request an access token for Airthings.

    Posts the client credentials to the token endpoint. A client error or
    a timeout is retried up to ``retry`` more times, each attempt under its
    own ``timeout``.

    Returns:
        The "access_token" value of the JSON reply (``None`` if absent).

    Raises:
        AirthingsConnectionError: When the retries are exhausted.
        AirthingsAuthError: When the reply status is not 200.
    """
    token_url = "https://accounts-api.airthings.com/v1/token"
    request_headers = {
        "Content-type": "application/x-www-form-urlencoded",
        "Accept": "application/json",
    }
    form_fields = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": secret,
    }

    retries_left = retry
    while True:
        try:
            async with async_timeout.timeout(timeout):
                response = await websession.post(
                    token_url, headers=request_headers, data=form_fields
                )
            break
        except ClientError as err:
            if retries_left <= 0:
                _LOGGER.error("Error getting token Airthings: %s ", err, exc_info=True)
                raise AirthingsConnectionError from err
        except asyncio.TimeoutError as err:
            if retries_left <= 0:
                _LOGGER.error("Timed out when connecting to Airthings for token")
                raise AirthingsConnectionError from err
        # Reached only after a failed attempt that may still be retried.
        retries_left -= 1

    if response.status != 200:
        _LOGGER.error(
            "Airthings: Failed to login to retrieve token: %s %s",
            response.status,
            response.reason,
        )
        raise AirthingsAuthError(f"Failed to login to retrieve token {response.reason}")

    payload = json.loads(await response.text())
    return payload.get("access_token")
|