File: parser.py

package info (click to toggle)
airthings-ble 1.2.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 348 kB
  • sloc: python: 1,577; makefile: 3
file content (578 lines) | stat: -rw-r--r-- 21,490 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
"""Parser for Airthings BLE devices"""

from __future__ import annotations

import asyncio
import dataclasses
import re
from collections import namedtuple
from functools import partial
from logging import Logger

from async_interrupt import interrupt
from bleak import BleakClient, BleakError
from bleak.backends.device import BLEDevice
from bleak.backends.service import BleakGATTService
from bleak_retry_connector import BleakClientWithServiceCache, establish_connection

from airthings_ble.airthings_firmware import AirthingsFirmwareVersion
from airthings_ble.atom.request_path import AtomRequestPath
from airthings_ble.command_decode import COMMAND_DECODERS, AtomCommandDecode
from airthings_ble.radon_level import get_radon_level
from airthings_ble.sensor_decoders import SENSOR_DECODERS

from .const import (
    ATOM_BAT,
    ATOM_CO2,
    ATOM_HUMIDITY,
    ATOM_LUX,
    ATOM_NOISE,
    ATOM_PRESSURE,
    ATOM_RADON_1DAY_AVG,
    ATOM_RADON_MONTH_AVG,
    ATOM_RADON_WEEK_AVG,
    ATOM_RADON_YEAR_AVG,
    ATOM_TEMPERATURE,
    ATOM_VOC,
    BATTERY,
    BQ_TO_PCI_MULTIPLIER,
    CHAR_UUID_DATETIME,
    CHAR_UUID_DEVICE_NAME,
    CHAR_UUID_FIRMWARE_REV,
    CHAR_UUID_HARDWARE_REV,
    CHAR_UUID_HUMIDITY,
    CHAR_UUID_ILLUMINANCE_ACCELEROMETER,
    CHAR_UUID_MANUFACTURER_NAME,
    CHAR_UUID_MODEL_NUMBER_STRING,
    CHAR_UUID_RADON_1DAYAVG,
    CHAR_UUID_RADON_LONG_TERM_AVG,
    CHAR_UUID_SERIAL_NUMBER_STRING,
    CHAR_UUID_TEMPERATURE,
    CHAR_UUID_WAVE_2_DATA,
    CHAR_UUID_WAVE_PLUS_DATA,
    CHAR_UUID_WAVEMINI_DATA,
    CO2,
    COMMAND_UUID_ATOM,
    COMMAND_UUID_ATOM_NOTIFY,
    COMMAND_UUID_WAVE_2,
    COMMAND_UUID_WAVE_MINI,
    COMMAND_UUID_WAVE_PLUS,
    DEFAULT_MAX_UPDATE_ATTEMPTS,
    HUMIDITY,
    ILLUMINANCE,
    LUX,
    NOISE,
    PRESSURE,
    RADON_1DAY_AVG,
    RADON_1DAY_LEVEL,
    RADON_LONGTERM_AVG,
    RADON_LONGTERM_LEVEL,
    RADON_MONTH_AVG,
    RADON_MONTH_LEVEL,
    RADON_WEEK_AVG,
    RADON_WEEK_LEVEL,
    RADON_YEAR_AVG,
    RADON_YEAR_LEVEL,
    TEMPERATURE,
    UPDATE_TIMEOUT,
    VOC,
)
from .device_type import AirthingsDeviceType

Characteristic = namedtuple("Characteristic", ["uuid", "name", "format"])

wave_gen_1_device_info_characteristics = [
    Characteristic(CHAR_UUID_MANUFACTURER_NAME, "manufacturer", "utf-8"),
    Characteristic(CHAR_UUID_SERIAL_NUMBER_STRING, "serial_nr", "utf-8"),
    Characteristic(CHAR_UUID_DEVICE_NAME, "device_name", "utf-8"),
    Characteristic(CHAR_UUID_FIRMWARE_REV, "firmware_rev", "utf-8"),
]
device_info_characteristics = wave_gen_1_device_info_characteristics + [
    Characteristic(CHAR_UUID_HARDWARE_REV, "hardware_rev", "utf-8"),
]
_CHARS_BY_MODELS = {
    "2900": wave_gen_1_device_info_characteristics,
}

sensors_characteristics_uuid = [
    CHAR_UUID_DATETIME,
    CHAR_UUID_TEMPERATURE,
    CHAR_UUID_HUMIDITY,
    CHAR_UUID_RADON_1DAYAVG,
    CHAR_UUID_RADON_LONG_TERM_AVG,
    CHAR_UUID_ILLUMINANCE_ACCELEROMETER,
    CHAR_UUID_WAVE_PLUS_DATA,
    CHAR_UUID_WAVE_2_DATA,
    CHAR_UUID_WAVEMINI_DATA,
    COMMAND_UUID_WAVE_2,
    COMMAND_UUID_WAVE_PLUS,
    COMMAND_UUID_WAVE_MINI,
]
sensors_characteristics_uuid_str = [str(x) for x in sensors_characteristics_uuid]


class DisconnectedError(Exception):
    """Disconnected from device."""


class UnsupportedDeviceError(Exception):
    """Unsupported device."""


def short_address(address: str) -> str:
    """Convert a Bluetooth address to a short address."""
    return address.replace("-", "").replace(":", "")[-6:].upper()


# pylint: disable=too-many-instance-attributes


@dataclasses.dataclass
class AirthingsDeviceInfo:
    """Response data with information about the Airthings device without sensors."""

    manufacturer: str = ""
    hw_version: str = ""
    sw_version: str = ""
    model: AirthingsDeviceType = AirthingsDeviceType.UNKNOWN
    name: str = ""
    identifier: str = ""
    address: str = ""
    did_first_sync: bool = False

    def friendly_name(self) -> str:
        """Generate a name for the device."""

        return f"Airthings {self.model.product_name}"


@dataclasses.dataclass
class AirthingsDevice(AirthingsDeviceInfo):
    """Response data with information about the Airthings device"""

    firmware = AirthingsFirmwareVersion()

    sensors: dict[str, str | float | None] = dataclasses.field(
        default_factory=lambda: {}
    )

    def friendly_name(self) -> str:
        """Generate a name for the device."""

        return f"Airthings {self.model.product_name}"


# pylint: disable=too-many-locals
# pylint: disable=too-many-branches
# pylint: disable=too-few-public-methods
class AirthingsBluetoothDeviceData:
    """Data for Airthings BLE sensors."""

    def __init__(
        self,
        logger: Logger,
        is_metric: bool = True,
        max_attempts: int = DEFAULT_MAX_UPDATE_ATTEMPTS,
    ) -> None:
        """Initialize the Airthings BLE sensor data object."""
        self.logger = logger
        self.is_metric = is_metric
        self.device_info = AirthingsDeviceInfo()
        self.max_attempts = max_attempts

    def set_max_attempts(self, max_attempts: int) -> None:
        """Set the number of attempts."""
        self.max_attempts = max_attempts

    async def _get_device_characteristics(
        self, client: BleakClient, device: AirthingsDevice
    ) -> None:
        device_info = self.device_info
        device_info.address = client.address
        did_first_sync = device_info.did_first_sync

        device.firmware.update_current_version(device_info.sw_version)

        # We need to fetch model to determ what to fetch.
        if not did_first_sync:
            try:
                data = await client.read_gatt_char(CHAR_UUID_MODEL_NUMBER_STRING)
            except BleakError as err:
                self.logger.debug("Get device characteristics exception: %s", err)
                return

            device_info.model = AirthingsDeviceType.from_raw_value(data.decode("utf-8"))
            if device_info.model == AirthingsDeviceType.UNKNOWN:
                raise UnsupportedDeviceError(
                    f"Model {data.decode('utf-8')} is not supported"
                )

        characteristics = _CHARS_BY_MODELS.get(
            device_info.model.raw_value, device_info_characteristics
        )

        self.logger.debug("Fetching device info characteristics: %s", characteristics)

        for characteristic in characteristics:
            if did_first_sync and characteristic.name != "firmware_rev":
                # Only the sw_version can change once set, so we can skip the rest.
                continue

            try:
                data = await client.read_gatt_char(characteristic.uuid)
            except BleakError as err:
                self.logger.debug("Get device characteristics exception: %s", err)
                continue
            if characteristic.name == "manufacturer":
                device_info.manufacturer = data.decode(characteristic.format)
            elif characteristic.name == "hardware_rev":
                device_info.hw_version = data.decode(characteristic.format)
            elif characteristic.name == "firmware_rev":
                device_info.sw_version = data.decode(characteristic.format)
            elif characteristic.name == "device_name":
                device_info.name = data.decode(characteristic.format)
            elif characteristic.name == "serial_nr":
                identifier = data.decode(characteristic.format)
                # Some devices return `Serial Number` on Mac instead of
                # the actual serial number.
                if identifier != "Serial Number":
                    device_info.identifier = identifier
            else:
                self.logger.debug(
                    "Characteristics not handled: %s", characteristic.uuid
                )

        if (
            device_info.model == AirthingsDeviceType.WAVE_GEN_1
            and device_info.name
            and not device_info.identifier
        ):
            # For the Wave gen. 1 we need to fetch the identifier in the device name.
            # Example: From `AT#123456-2900Radon` we need to fetch `123456`.
            wave1_identifier = re.search(r"(?<=\#)[0-9]{1,6}", device_info.name)
            if wave1_identifier is not None and len(wave1_identifier.group()) == 6:
                device_info.identifier = wave1_identifier.group()

        # In some cases the device name will be empty, for example when using a Mac.
        if not device_info.name:
            device_info.name = device_info.friendly_name()

        if device_info.model:
            device_info.did_first_sync = True

        # Copy the cached device_info to device
        for field in dataclasses.fields(device_info):
            name = field.name
            setattr(device, name, getattr(device_info, name))

    async def _get_service_characteristics(
        self, client: BleakClient, device: AirthingsDevice
    ) -> None:
        svcs = client.services
        sensors = device.sensors
        for service in svcs:
            if (
                (
                    str(COMMAND_UUID_ATOM)
                    in (str(x.uuid) for x in service.characteristics)
                )
                and (
                    str(COMMAND_UUID_ATOM_NOTIFY)
                    in (str(x.uuid) for x in service.characteristics)
                )
                and device.model in AirthingsDeviceType.atom_devices()
            ):
                await self._atom_sensor_data(client, device, sensors, service)
            else:
                await self._wave_sensor_data(client, device, sensors, service)

    async def _wave_sensor_data(
        self,
        client: BleakClient,
        device: AirthingsDevice,
        sensors: dict[str, str | float | None],
        service: BleakGATTService,
    ) -> None:
        for characteristic in service.characteristics:
            uuid = characteristic.uuid
            uuid_str = str(uuid)
            if uuid in sensors_characteristics_uuid_str and uuid_str in SENSOR_DECODERS:
                try:
                    data = await client.read_gatt_char(characteristic)
                except BleakError as err:
                    self.logger.debug("Get service characteristics exception: %s", err)
                    continue

                sensor_data = SENSOR_DECODERS[uuid_str](data)

                # Skipping for now
                if "date_time" in sensor_data:
                    sensor_data.pop("date_time")

                sensors.update(sensor_data)

                # Manage radon values
                if (d := sensor_data.get(RADON_1DAY_AVG)) is not None:
                    sensors[RADON_1DAY_LEVEL] = get_radon_level(float(d))
                    if not self.is_metric:
                        sensors[RADON_1DAY_AVG] = float(d) * BQ_TO_PCI_MULTIPLIER
                if (d := sensor_data.get(RADON_LONGTERM_AVG)) is not None:
                    sensors[RADON_LONGTERM_LEVEL] = get_radon_level(float(d))
                    if not self.is_metric:
                        sensors[RADON_LONGTERM_AVG] = float(d) * BQ_TO_PCI_MULTIPLIER

            if uuid_str in COMMAND_DECODERS:
                decoder = COMMAND_DECODERS[uuid_str]
                command_data_receiver = decoder.make_data_receiver()

                # Set up the notification handlers
                await client.start_notify(characteristic, command_data_receiver)
                # send command to this 'indicate' characteristic
                await client.write_gatt_char(characteristic, bytearray(decoder.cmd))
                # Wait for up to one second to see if a callback comes in.
                try:
                    await command_data_receiver.wait_for_message(5)
                except asyncio.TimeoutError:
                    self.logger.warning("Timeout getting command data.")

                command_sensor_data = decoder.decode_data(
                    logger=self.logger, raw_data=command_data_receiver.message
                )
                if command_sensor_data is not None:
                    new_values: dict[str, float | str | None] = {}

                    if (bat_data := command_sensor_data.get(BATTERY)) is not None:
                        new_values[BATTERY] = device.model.battery_percentage(
                            float(bat_data)
                        )

                    if illuminance := command_sensor_data.get(ILLUMINANCE):
                        new_values[ILLUMINANCE] = illuminance

                    sensors.update(new_values)

                # Stop notification handler
                await client.stop_notify(characteristic)

    async def _atom_sensor_data(
        self,
        client: BleakClient,
        device: AirthingsDevice,
        sensors: dict[str, str | float | None],
        service: BleakGATTService,
    ) -> None:
        """Get sensor data from the device."""
        device.firmware = device.model.need_firmware_upgrade(
            self.device_info.sw_version
        )

        if device.firmware.need_firmware_upgrade:
            self.logger.warning(
                "The firmware for this device (%s) is not up to date, "
                "please update to %s or newer using the Airthings app.",
                self.device_info.address,
                device.firmware.required_version or "N/A",
            )

        connectivity_data = await self._create_decoder_and_fetch(
            client=client,
            service=service,
            url=AtomRequestPath.CONNECTIVITY_MODE,
        )
        if connectivity_data is not None:
            sensors.update(connectivity_data)

        sensor_data = await self._create_decoder_and_fetch(
            client=client,
            service=service,
            url=AtomRequestPath.LATEST_VALUES,
        )
        if sensor_data is not None:
            self._parse_sensor_data(
                device=device,
                sensors=sensors,
                sensor_data=sensor_data,
            )

    async def _create_decoder_and_fetch(
        self,
        client: BleakClient,
        service: BleakGATTService,
        url: AtomRequestPath,
    ) -> dict[str, float | str | None] | None:
        """Create decoder and fetch data."""
        decoder = AtomCommandDecode(url=url)
        command_data_receiver = decoder.make_data_receiver()

        atom_write = service.get_characteristic(COMMAND_UUID_ATOM)
        atom_notify = service.get_characteristic(COMMAND_UUID_ATOM_NOTIFY)

        if atom_write is None or atom_notify is None:
            raise ValueError("Missing characteristics for device")

        # Set up the notification handlers
        await client.start_notify(
            char_specifier=atom_notify, callback=command_data_receiver
        )

        # send command to this 'indicate' characteristic
        await client.write_gatt_char(atom_write, bytearray(decoder.cmd))
        # Wait for up to five seconds to see if a callback comes in.
        try:
            await command_data_receiver.wait_for_message(5)
        except asyncio.TimeoutError:
            self.logger.warning("Timeout getting command data.")

        data = decoder.decode_data(
            logger=self.logger,
            raw_data=command_data_receiver.message,
        )

        await client.stop_notify(atom_notify)

        return data

    def _parse_sensor_data(
        self,
        device: AirthingsDevice,
        sensors: dict[str, str | float | None],
        sensor_data: dict[str, float | str | None],
    ) -> None:
        """Parse sensor data from the device."""
        if sensor_data is not None:
            new_values: dict[str, float | str | None] = {}

            if (bat_data := sensor_data.get(ATOM_BAT)) is not None:
                new_values[BATTERY] = device.model.battery_percentage(
                    float(bat_data) / 1000.0
                )

            if (lux := sensor_data.get(ATOM_LUX)) is not None:
                new_values[LUX] = lux

            if (co2 := sensor_data.get(ATOM_CO2)) is not None:
                new_values[CO2] = co2

            if (voc := sensor_data.get(ATOM_VOC)) is not None:
                new_values[VOC] = voc

            if (hum := sensor_data.get(ATOM_HUMIDITY)) is not None:
                new_values[HUMIDITY] = float(hum) / 100.0

            if (temperature := sensor_data.get(ATOM_TEMPERATURE)) is not None:
                # Temperature reported as kelvin
                new_values[TEMPERATURE] = round(float(temperature) / 100.0 - 273.15, 2)

            if (noise := sensor_data.get(ATOM_NOISE)) is not None:
                new_values[NOISE] = noise

            if (pressure := sensor_data.get(ATOM_PRESSURE)) is not None:
                new_values[PRESSURE] = float(pressure) / (64 * 100)

            if (radon_1day_avg := sensor_data.get(ATOM_RADON_1DAY_AVG)) is not None:
                new_values[RADON_1DAY_AVG] = (
                    float(radon_1day_avg)
                    if self.is_metric
                    else float(radon_1day_avg) * BQ_TO_PCI_MULTIPLIER
                )
                new_values[RADON_1DAY_LEVEL] = get_radon_level(float(radon_1day_avg))

            if (radon_week_avg := sensor_data.get(ATOM_RADON_WEEK_AVG)) is not None:
                new_values[RADON_WEEK_AVG] = (
                    float(radon_week_avg)
                    if self.is_metric
                    else float(radon_week_avg) * BQ_TO_PCI_MULTIPLIER
                )
                new_values[RADON_WEEK_LEVEL] = get_radon_level(float(radon_week_avg))

            if (radon_month_avg := sensor_data.get(ATOM_RADON_MONTH_AVG)) is not None:
                new_values[RADON_MONTH_AVG] = (
                    float(radon_month_avg)
                    if self.is_metric
                    else float(radon_month_avg) * BQ_TO_PCI_MULTIPLIER
                )
                new_values[RADON_MONTH_LEVEL] = get_radon_level(float(radon_month_avg))

            if (radon_year_avg := sensor_data.get(ATOM_RADON_YEAR_AVG)) is not None:
                new_values[RADON_YEAR_AVG] = (
                    float(radon_year_avg)
                    if self.is_metric
                    else float(radon_year_avg) * BQ_TO_PCI_MULTIPLIER
                )
                new_values[RADON_YEAR_LEVEL] = get_radon_level(float(radon_year_avg))

            self.logger.debug("Sensor values: %s", new_values)

            sensors.update(new_values)

    def _handle_disconnect(
        self, disconnect_future: asyncio.Future[bool], client: BleakClient
    ) -> None:
        """Handle disconnect from device."""
        self.logger.debug("Disconnected from %s", client.address)
        if not disconnect_future.done():
            disconnect_future.set_result(True)

    async def update_device(self, ble_device: BLEDevice) -> AirthingsDevice:
        """Connects to the device through BLE and retrieves relevant data"""
        # Try to abort early if the device name indicates it is not supported.
        # In some cases we only get the mac address, so we need to connect to
        # the device to get the name.
        if name := ble_device.name:
            if "Renew" in name or "View" in name:
                raise UnsupportedDeviceError(f"Model {name} is not supported")
        for attempt in range(self.max_attempts):
            is_final_attempt = attempt == self.max_attempts - 1
            try:
                return await self._update_device(ble_device)
            except DisconnectedError:
                if is_final_attempt:
                    raise
                self.logger.debug(
                    "Unexpectedly disconnected from %s", ble_device.address
                )
            except BleakError as err:
                if is_final_attempt:
                    raise
                self.logger.debug("Bleak error: %s", err)
        raise RuntimeError("Should not reach this point")

    async def _update_device(self, ble_device: BLEDevice) -> AirthingsDevice:
        """Connects to the device through BLE and retrieves relevant data"""
        device = AirthingsDevice()
        loop = asyncio.get_running_loop()
        disconnect_future = loop.create_future()
        client: BleakClientWithServiceCache = (
            await establish_connection(  # pylint: disable=line-too-long
                BleakClientWithServiceCache,
                ble_device,
                ble_device.address,
                disconnected_callback=partial(
                    self._handle_disconnect, disconnect_future
                ),
            )
        )
        try:
            async with (
                interrupt(
                    disconnect_future,
                    DisconnectedError,
                    f"Disconnected from {client.address}",
                ),
                asyncio.timeout(UPDATE_TIMEOUT),
            ):
                await self._get_device_characteristics(client, device)
                await self._get_service_characteristics(client, device)
        except BleakError as err:
            if "not found" in str(err):  # In future bleak this is a named exception
                # Clear the char cache since a char is likely
                # missing from the cache
                await client.clear_cache()
            raise
        except UnsupportedDeviceError:
            await client.disconnect()
            raise
        finally:
            await client.disconnect()

        return device