1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
|
"""
Async callbacks with a queue and external consumer
--------------------------------------------------
An example showing how async notification callbacks can be used to
send data received through notifications to some external consumer of
that data.
Created on 2021-02-25 by hbldh <henrik.blidh@nedomkull.com>
"""
import argparse
import asyncio
import logging
import time
from typing import Optional
from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
logger = logging.getLogger(__name__)
class DeviceNotFoundError(Exception):
pass
class Args(argparse.Namespace):
name: Optional[str]
address: Optional[str]
characteristic: str
macos_use_bdaddr: bool
services: list[str]
debug: bool
async def run_ble_client(
args: Args, queue: asyncio.Queue[tuple[float, Optional[bytearray]]]
):
logger.info("starting scan...")
if args.address:
device = await BleakScanner.find_device_by_address(
args.address, cb={"use_bdaddr": args.macos_use_bdaddr}
)
if device is None:
logger.error("could not find device with address '%s'", args.address)
raise DeviceNotFoundError
elif args.name:
device = await BleakScanner.find_device_by_name(
args.name, cb={"use_bdaddr": args.macos_use_bdaddr}
)
if device is None:
logger.error("could not find device with name '%s'", args.name)
raise DeviceNotFoundError
else:
raise ValueError("Either --name or --address must be provided")
logger.info("connecting to device...")
async def callback_handler(_: BleakGATTCharacteristic, data: bytearray) -> None:
await queue.put((time.time(), data))
async with BleakClient(device) as client:
logger.info("connected")
await client.start_notify(args.characteristic, callback_handler)
await asyncio.sleep(10.0)
await client.stop_notify(args.characteristic)
# Send an "exit command to the consumer"
await queue.put((time.time(), None))
logger.info("disconnected")
async def run_queue_consumer(queue: asyncio.Queue[tuple[float, Optional[bytearray]]]):
logger.info("Starting queue consumer")
while True:
# Use await asyncio.wait_for(queue.get(), timeout=1.0) if you want a timeout for getting data.
epoch, data = await queue.get()
if data is None:
logger.info(
"Got message from client about disconnection. Exiting consumer loop..."
)
break
else:
logger.info("Received callback data via async queue at %s: %r", epoch, data)
async def main(args: Args) -> None:
queue: asyncio.Queue[tuple[float, Optional[bytearray]]] = asyncio.Queue()
client_task = run_ble_client(args, queue)
consumer_task = run_queue_consumer(queue)
try:
await asyncio.gather(client_task, consumer_task)
except DeviceNotFoundError:
pass
logger.info("Main method done.")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
device_group = parser.add_mutually_exclusive_group(required=True)
device_group.add_argument(
"--name",
metavar="<name>",
help="the name of the bluetooth device to connect to",
)
device_group.add_argument(
"--address",
metavar="<address>",
help="the address of the bluetooth device to connect to",
)
parser.add_argument(
"--macos-use-bdaddr",
action="store_true",
help="when true use Bluetooth address instead of UUID on macOS",
)
parser.add_argument(
"characteristic",
metavar="<notify uuid>",
help="UUID of a characteristic that supports notifications",
)
parser.add_argument(
"-d",
"--debug",
action="store_true",
help="sets the logging level to debug",
)
args = parser.parse_args(namespace=Args())
log_level = logging.DEBUG if args.debug else logging.INFO
logging.basicConfig(
level=log_level,
format="%(asctime)-15s %(name)-8s %(levelname)s: %(message)s",
)
asyncio.run(main(args))
|