1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
|
import sys
from typing import TYPE_CHECKING
if TYPE_CHECKING:
if sys.platform != "linux":
assert False, "This is only available on Linux"
import asyncio
import contextlib
import logging
from typing import AsyncGenerator
from bumble.controller import Controller
from bumble.link import LocalLink
from bumble.transport import open_transport
from bumble.transport.common import Transport
from dbus_fast import BusType, Message, MessageType, Variant
from dbus_fast.aio.message_bus import MessageBus
from bleak._compat import timeout as async_timeout
from bleak.backends.bluezdbus import defs
from bleak.backends.bluezdbus.signals import MatchRules, add_match
from bleak.backends.bluezdbus.utils import assert_reply, get_dbus_authenticator
# Manufacturer ID assigned to the virtual BlueZ-facing test controller. It is
# also compared against the "Manufacturer" property of newly added adapters to
# recognize the adapter that belongs to that controller.
BLEAK_TEST_MANUFACTURER_ID = 0xB1EA
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
@contextlib.asynccontextmanager
async def open_message_bus() -> AsyncGenerator[MessageBus, None]:
    """
    Provide a connection to the D-Bus system bus for the duration of the context.

    The bus is created with the authenticator returned by
    ``get_dbus_authenticator()`` and connected before it is yielded. When the
    context exits (including when connecting raised), the bus is asked to
    disconnect and ``wait_for_disconnect()`` is awaited.
    """
    system_bus = MessageBus(
        bus_type=BusType.SYSTEM,
        auth=get_dbus_authenticator(),
    )
    try:
        await system_bus.connect()
        yield system_bus
    finally:
        # Request the disconnect first, then await its completion.
        system_bus.disconnect()
        await system_bus.wait_for_disconnect()
async def power_on_controller(
    bus: MessageBus, adapter_path: str, timeout: float = 2.0
) -> None:
    """
    Set the ``Powered`` property of a Bluetooth adapter to ``True`` via D-Bus.

    The adapter may need some time to finish configuring itself, so the
    request is repeated every 0.1 seconds until it succeeds or ``timeout``
    expires.

    Args:
        bus: Connected message bus used to send the request.
        adapter_path: D-Bus object path of the adapter to power on.
        timeout: Overall time budget in seconds for all attempts.
    """

    def _build_request() -> Message:
        # A new message is created for every attempt, exactly one per call.
        return Message(
            destination=defs.BLUEZ_SERVICE,
            path=adapter_path,
            interface=defs.PROPERTIES_INTERFACE,
            member="Set",
            signature="ssv",
            body=[defs.ADAPTER_INTERFACE, "Powered", Variant("b", True)],
        )

    async with async_timeout(timeout):
        while True:
            try:
                response = await bus.call(_build_request())
                assert_reply(response)
            except Exception as e:
                # Any failure is treated as "not ready yet": log it and retry
                # after a short pause. The surrounding timeout ends the loop.
                logger.debug("Failed to power on adapter at %s: %s", adapter_path, e)
                await asyncio.sleep(0.1)
            else:
                return
@contextlib.asynccontextmanager
async def wait_for_new_adapter() -> (
    AsyncGenerator[tuple[MessageBus, asyncio.Future[str]], None]
):
    """
    Connect to D-Bus and watch for the test Bluetooth adapter being added.

    Yields a ``(bus, future)`` tuple. ``bus`` is the message bus opened for
    the subscription and ``future`` is resolved with the object path of the
    first adapter reported by an ``InterfacesAdded`` signal whose
    ``Manufacturer`` property equals ``BLEAK_TEST_MANUFACTURER_ID``.

    The message handler is removed again when the context exits.
    """
    async with open_message_bus() as bus:
        loop = asyncio.get_running_loop()
        adapter_path_future: asyncio.Future[str] = loop.create_future()

        def _on_interfaces_added(message: Message) -> None:
            if message.message_type != MessageType.SIGNAL:
                return
            if message.member != "InterfacesAdded":
                return

            obj_path, ifaces = message.body
            adapter = ifaces.get(defs.ADAPTER_INTERFACE)
            if not adapter:
                return

            # Check for our test manufacturer ID to identify the adapter.
            # An adapter that does not expose "Manufacturer" cannot be ours,
            # so it is skipped instead of raising KeyError inside the handler.
            manufacturer = adapter.get("Manufacturer")
            if manufacturer is None:
                return
            if manufacturer.value != BLEAK_TEST_MANUFACTURER_ID:
                return

            # Only the first matching adapter resolves the future.
            if not adapter_path_future.done():
                adapter_path_future.set_result(obj_path)

        bus.add_message_handler(_on_interfaces_added)
        try:
            # Subscribe to InterfacesAdded signals
            reply = await add_match(
                bus,
                MatchRules(
                    interface=defs.OBJECT_MANAGER_INTERFACE,
                    member="InterfacesAdded",
                    arg0path="/org/bluez/",
                ),
            )
            assert_reply(reply)

            # Yield the future and bus for the caller to use
            yield bus, adapter_path_future
        finally:
            bus.remove_message_handler(_on_interfaces_added)
def _clear_bit(flags: bytes, bit_pos: int) -> bytes:
int_flags = int.from_bytes(flags, byteorder="little")
int_flags &= ~(1 << bit_pos)
return int_flags.to_bytes(len(flags), byteorder="little")
@contextlib.asynccontextmanager
async def open_bluez_bluetooth_controller_link(
    hci_transport_name: str,
) -> AsyncGenerator[LocalLink, None]:
    """
    Open a local link (virtual RF connection) to a bumble Bluetooth
    controller that is connected to BlueZ.

    Args:
        hci_transport_name: Transport name passed to ``open_transport`` for
            the HCI connection to the OS.

    Yields:
        The ``LocalLink`` the BlueZ-facing controller is registered on, once
        the corresponding adapter has been detected and powered on.

    Raises:
        asyncio.TimeoutError: If no matching adapter is reported within
            5 seconds.
    """
    # Bit positions in the controller's LE feature mask that are cleared below.
    ll_privacy_bit = 6
    extended_advertising_bit = 12

    async with wait_for_new_adapter() as (bus, adapter_path_future):
        # Open a HCI transport connected to the OS
        async with await open_transport(hci_transport_name) as hci_transport:
            # Local link to create virtual RF connection between multiple Bluetooth Controllers
            link = LocalLink()

            # Bluetooth controller that BlueZ can connect to.
            # (This will register itself to the link.)
            bluez_controller = Controller(
                "BLEAK-TEST-BLUEZ",
                host_source=hci_transport.source,
                host_sink=hci_transport.sink,
                link=link,
            )
            bluez_controller.manufacturer_name = BLEAK_TEST_MANUFACTURER_ID

            # HACK: Work around Bumble missing feature combined with Linux kernel
            # requirement. https://github.com/google/bumble/issues/841
            #
            # According to the Bluetooth spec:
            #
            # C24: [HCI_LE_Enhanced_Connection_Complete event is] Mandatory if
            # the LE Controller supports Connection State and either LE Feature (LL
            # Privacy) or LE Feature (Extended Advertising) is supported, otherwise optional if
            # the LE Controller supports Connection State, otherwise excluded.
            #
            # And the Linux kernel enforces this in hci_le_create_conn_sync().
            # It will get a timeout if one of these features is enabled and the
            # Enhanced Connection Complete event is not sent.
            #
            # However, Bumble (as of 0.0.220) always sends HCI_LE_Connection_Complete_Event
            # in response to HCI_LE_Create_Connection_Command even when it should
            # be sending HCI_LE_Enhanced_Connection_Complete_Event.
            #
            # For now, we can work around the issue by disabling LL Privacy and
            # Extended Advertising features in the BlueZ controller.
            #
            # Ideally, this should be fixed in Bumble.
            bluez_controller.le_features = _clear_bit(
                bluez_controller.le_features, ll_privacy_bit
            )
            bluez_controller.le_features = _clear_bit(
                bluez_controller.le_features, extended_advertising_bit
            )

            # Wait up to 5 seconds for the new adapter to appear via InterfacesAdded
            adapter_path = await asyncio.wait_for(adapter_path_future, timeout=5.0)
            # Lazy %-style arguments, matching the logging style used in
            # power_on_controller().
            logger.info("New adapter detected at %s", adapter_path)

            # Ensure controller is powered on. This also ensures that BlueZ has fully
            # initialized the adapter and it is ready for use.
            await power_on_controller(bus, adapter_path)

            # We don't need the bus anymore. We have done everything needed with it.
            # Bleak will open its own D-Bus connection.
            bus.disconnect()
            await bus.wait_for_disconnect()

            # Yield the local link for use in tests
            yield link
@contextlib.asynccontextmanager
async def open_transport_with_bluez_vhci() -> AsyncGenerator[Transport, None]:
    """
    Yield a bumble ``Transport`` backed by a peripheral controller that shares
    a local link with a BlueZ-facing controller attached through ``vhci``.

    The BlueZ side is set up by ``open_bluez_bluetooth_controller_link("vhci")``.
    A second controller named ``BLEAK-TEST-PERIPHERAL`` is then created on the
    same link and wrapped in the yielded transport.
    """
    async with open_bluez_bluetooth_controller_link("vhci") as link:
        controller = Controller("BLEAK-TEST-PERIPHERAL", link=link)
        # The same controller object is passed as both Transport arguments
        # (presumably source and sink - confirm against bumble's Transport).
        transport = Transport(controller, controller)
        yield transport
|