File: async_callback_with_queue.py

package info (click to toggle)
bleak 2.0.0-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,828 kB
  • sloc: python: 10,660; makefile: 165; java: 105
file content (147 lines) | stat: -rw-r--r-- 4,237 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
"""
Async callbacks with a queue and external consumer
--------------------------------------------------

An example showing how async notification callbacks can be used to
send data received through notifications to some external consumer of
that data.

Created on 2021-02-25 by hbldh <henrik.blidh@nedomkull.com>

"""

import argparse
import asyncio
import logging
import time
from typing import Optional

from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic

logger = logging.getLogger(__name__)


class DeviceNotFoundError(Exception):
    """Raised when scanning does not turn up the requested BLE device."""


class Args(argparse.Namespace):
    """Typed view of the command-line arguments parsed by this script.

    The class only declares attribute types; the values are filled in by
    ``parser.parse_args(namespace=Args())``.
    """

    # --- device selection (the parser requires exactly one of these) ---
    # Name of the bluetooth device to connect to.
    name: Optional[str]
    # Address of the bluetooth device to connect to.
    address: Optional[str]
    # When true, use the Bluetooth address instead of a UUID on macOS.
    macos_use_bdaddr: bool

    # --- what to subscribe to ---
    # UUID of a characteristic that supports notifications.
    characteristic: str
    # NOTE(review): no command-line option visible in this file populates
    # ``services`` -- confirm whether it is still needed.
    services: list[str]

    # --- diagnostics ---
    # Switches the logging level to DEBUG.
    debug: bool


async def run_ble_client(
    args: Args,
    queue: asyncio.Queue[tuple[float, Optional[bytearray]]],
    *,
    notify_duration: float = 10.0,
) -> None:
    """Find a device, subscribe to notifications and forward them to *queue*.

    The device is looked up by ``args.address`` if given, otherwise by
    ``args.name``.  Every notification received on ``args.characteristic``
    is put on *queue* as a ``(timestamp, data)`` tuple, where ``timestamp``
    is ``time.time()`` at the moment the callback ran.

    When this coroutine exits -- normally, through an exception, or through
    cancellation -- a final ``(timestamp, None)`` item is put on *queue* so
    that a consumer waiting on the queue is always told to stop.

    Args:
        args: Parsed command-line arguments.
        queue: Queue shared with the consumer.
        notify_duration: Seconds to keep the notification subscription
            active before stopping it.

    Raises:
        DeviceNotFoundError: The scanner returned no device for the given
            address or name.
        ValueError: Neither ``args.address`` nor ``args.name`` is set.
    """
    try:
        logger.info("starting scan...")

        if args.address:
            device = await BleakScanner.find_device_by_address(
                args.address, cb={"use_bdaddr": args.macos_use_bdaddr}
            )
            if device is None:
                logger.error("could not find device with address '%s'", args.address)
                raise DeviceNotFoundError
        elif args.name:
            device = await BleakScanner.find_device_by_name(
                args.name, cb={"use_bdaddr": args.macos_use_bdaddr}
            )
            if device is None:
                logger.error("could not find device with name '%s'", args.name)
                raise DeviceNotFoundError
        else:
            raise ValueError("Either --name or --address must be provided")

        logger.info("connecting to device...")

        async def callback_handler(_: BleakGATTCharacteristic, data: bytearray) -> None:
            # Timestamp the payload on arrival and hand it to the consumer.
            await queue.put((time.time(), data))

        async with BleakClient(device) as client:
            logger.info("connected")
            await client.start_notify(args.characteristic, callback_handler)
            await asyncio.sleep(notify_duration)
            await client.stop_notify(args.characteristic)

        logger.info("disconnected")
    finally:
        # Send an "exit command" to the consumer.  Doing this in ``finally``
        # guarantees the consumer is released even when scanning, connecting
        # or (un)subscribing fails, instead of leaving it blocked forever on
        # ``queue.get()``.
        await queue.put((time.time(), None))


async def run_queue_consumer(queue: asyncio.Queue[tuple[float, Optional[bytearray]]]):
    """Log every item taken from *queue* until the exit sentinel arrives.

    Items are ``(timestamp, payload)`` tuples.  A ``None`` payload is the
    producer's signal that no more data will follow; on receiving it the
    coroutine logs a message and returns.
    """
    logger.info("Starting queue consumer")

    while True:
        # Wrap the get() in asyncio.wait_for(queue.get(), timeout=1.0) if a
        # timeout for receiving data is wanted.
        timestamp, payload = await queue.get()

        if payload is not None:
            logger.info(
                "Received callback data via async queue at %s: %r", timestamp, payload
            )
            continue

        # Sentinel received: the producer is finished.
        logger.info(
            "Got message from client about disconnection. Exiting consumer loop..."
        )
        return


async def main(args: Args) -> None:
    """Run the BLE client and the queue consumer side by side.

    Both coroutines share one queue: the client puts notification data on
    it and the consumer takes the data off.  A ``DeviceNotFoundError`` from
    the client is swallowed here because the client has already logged it;
    any other exception propagates to the caller.

    Args:
        args: Parsed command-line arguments, passed on to the client.
    """
    queue: asyncio.Queue[tuple[float, Optional[bytearray]]] = asyncio.Queue()
    client_task = asyncio.create_task(run_ble_client(args, queue))
    consumer_task = asyncio.create_task(run_queue_consumer(queue))

    try:
        await asyncio.gather(client_task, consumer_task)
    except DeviceNotFoundError:
        # Already reported by the client; treat as a normal, quiet exit.
        pass
    finally:
        # gather() does not cancel the sibling when one awaitable fails, so
        # the other task could be left pending (e.g. the consumer blocked on
        # queue.get()).  Cancel whatever is still running and wait for it to
        # unwind so nothing outlives this function.
        for task in (client_task, consumer_task):
            if not task.done():
                task.cancel()
        await asyncio.gather(client_task, consumer_task, return_exceptions=True)

    logger.info("Main method done.")


if __name__ == "__main__":
    # Script entry point: parse the command line, configure logging, then
    # run the client/consumer pair until it finishes.
    parser = argparse.ArgumentParser()

    # The target device is selected by name or by address -- exactly one of
    # the two must be given.
    target_group = parser.add_mutually_exclusive_group(required=True)
    target_group.add_argument(
        "--name",
        help="the name of the bluetooth device to connect to",
        metavar="<name>",
    )
    target_group.add_argument(
        "--address",
        help="the address of the bluetooth device to connect to",
        metavar="<address>",
    )

    parser.add_argument(
        "--macos-use-bdaddr",
        help="when true use Bluetooth address instead of UUID on macOS",
        action="store_true",
    )

    # Positional: the characteristic to subscribe to.
    parser.add_argument(
        "characteristic",
        help="UUID of a characteristic that supports notifications",
        metavar="<notify uuid>",
    )

    parser.add_argument(
        "-d",
        "--debug",
        help="sets the logging level to debug",
        action="store_true",
    )

    args = parser.parse_args(namespace=Args())

    logging.basicConfig(
        format="%(asctime)-15s %(name)-8s %(levelname)s: %(message)s",
        level=logging.DEBUG if args.debug else logging.INFO,
    )

    asyncio.run(main(args))