File: bluez_controller.py

package info (click to toggle)
bleak 2.1.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,076 kB
  • sloc: python: 11,282; makefile: 165; java: 105
file content (211 lines) | stat: -rw-r--r-- 8,024 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
import sys
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    if sys.platform != "linux":
        assert False, "This is only available on Linux"


import asyncio
import contextlib
import logging
from typing import AsyncGenerator

from bumble.controller import Controller
from bumble.link import LocalLink
from bumble.transport import open_transport
from bumble.transport.common import Transport
from dbus_fast import BusType, Message, MessageType, Variant
from dbus_fast.aio.message_bus import MessageBus

from bleak._compat import timeout as async_timeout
from bleak.backends.bluezdbus import defs
from bleak.backends.bluezdbus.signals import MatchRules, add_match
from bleak.backends.bluezdbus.utils import assert_reply, get_dbus_authenticator

BLEAK_TEST_MANUFACTURER_ID = 0xB1EA

logger = logging.getLogger(__name__)


@contextlib.asynccontextmanager
async def open_message_bus() -> AsyncGenerator[MessageBus, None]:
    """
    Open a D-Bus message bus connection.
    """
    bus = MessageBus(bus_type=BusType.SYSTEM, auth=get_dbus_authenticator())
    try:
        await bus.connect()
        yield bus
    finally:
        bus.disconnect()
        await bus.wait_for_disconnect()


async def power_on_controller(
    bus: MessageBus, adapter_path: str, timeout: float = 2.0
) -> None:
    """
    Power on a Bluetooth controller via D-Bus.

    It may take some time for the adapter to fully configure itself after
    powering on. So we make multiple attempts until timeout is reached.
    """
    async with async_timeout(timeout):
        while True:
            try:
                reply = await bus.call(
                    Message(
                        destination=defs.BLUEZ_SERVICE,
                        path=adapter_path,
                        interface=defs.PROPERTIES_INTERFACE,
                        member="Set",
                        signature="ssv",
                        body=[defs.ADAPTER_INTERFACE, "Powered", Variant("b", True)],
                    )
                )
                assert_reply(reply)
                return
            except Exception as e:
                logger.debug("Failed to power on adapter at %s: %s", adapter_path, e)
                await asyncio.sleep(0.1)


@contextlib.asynccontextmanager
async def wait_for_new_adapter() -> (
    AsyncGenerator[tuple[MessageBus, asyncio.Future[str]], None]
):
    """
    Connect to D-Bus and wait for a new Bluetooth adapter to be added.

    Yields a Future that will be resolved with the adapter path when
    a new adapter is detected via InterfacesAdded signal.
    """
    async with open_message_bus() as bus:
        loop = asyncio.get_running_loop()
        adapter_path_future: asyncio.Future[str] = loop.create_future()

        def _on_interfaces_added(message: Message):
            if message.message_type != MessageType.SIGNAL:
                return
            if message.member != "InterfacesAdded":
                return

            obj_path, ifaces = message.body
            adapter = ifaces.get(defs.ADAPTER_INTERFACE)
            if not adapter:
                return

            # Check for our test manufacturer ID to identify the adapter
            if adapter["Manufacturer"].value != BLEAK_TEST_MANUFACTURER_ID:
                return

            if not adapter_path_future.done():
                adapter_path_future.set_result(obj_path)

        bus.add_message_handler(_on_interfaces_added)

        try:
            # Subscribe to InterfacesAdded signals
            reply = await add_match(
                bus,
                MatchRules(
                    interface=defs.OBJECT_MANAGER_INTERFACE,
                    member="InterfacesAdded",
                    arg0path="/org/bluez/",
                ),
            )
            assert_reply(reply)

            # Yield the future and bus for the caller to use
            yield bus, adapter_path_future
        finally:
            bus.remove_message_handler(_on_interfaces_added)


def _clear_bit(flags: bytes, bit_pos: int) -> bytes:
    int_flags = int.from_bytes(flags, byteorder="little")
    int_flags &= ~(1 << bit_pos)
    return int_flags.to_bytes(len(flags), byteorder="little")


@contextlib.asynccontextmanager
async def open_bluez_bluetooth_controller_link(
    hci_transport_name: str,
) -> AsyncGenerator[LocalLink, None]:
    """
    Open a local link (virtual RF connection) to a bumble Bluetooth
    controller that is connected to BlueZ.
    """
    async with wait_for_new_adapter() as (bus, adapter_path_future):
        # Open a HCI transport connected to the OS
        async with await open_transport(hci_transport_name) as hci_transport:
            # Local link to create virtual RF connection between multiple Bluetooth Controllers
            link = LocalLink()

            # Bluetooth controller that BlueZ can connect to.
            # (This will register itself to the link.)
            bluez_controller = Controller(
                "BLEAK-TEST-BLUEZ",
                host_source=hci_transport.source,
                host_sink=hci_transport.sink,
                link=link,
            )
            bluez_controller.manufacturer_name = BLEAK_TEST_MANUFACTURER_ID

            # HACK: Work around Bumble missing feature combined with Linux kernel
            # requirement. https://github.com/google/bumble/issues/841
            #
            # According to the Bluetooth spec:
            #
            #   C24: [HCI_LE_Enhanced_Connection_Complete event is] Mandatory if
            #   the LE Controller supports Connection State and either LE Feature (LL
            #   Privacy) or LE Feature (Extended Advertising) is supported, otherwise optional if
            #   the LE Controller supports Connection State, otherwise excluded.
            #
            # And the Linux kernel enforces this in hci_le_create_conn_sync().
            # It will get a timeout if one of these features is enabled and the
            # Enhanced Connection Complete event is not sent.
            #
            # However, Bumble (as of 0.0.220) always sends HCI_LE_Connection_Complete_Event
            # in response to HCI_LE_Create_Connection_Command even when it should
            # be sending HCI_LE_Enhanced_Connection_Complete_Event.
            #
            # For now, we can work around the issue by disabling LL Privacy and
            # Extended Advertising features in the BlueZ controller.
            #
            # Ideally, this should be fixed in Bumble.

            bluez_controller.le_features = _clear_bit(
                bluez_controller.le_features, 6  # LL Privacy
            )
            bluez_controller.le_features = _clear_bit(
                bluez_controller.le_features, 12  # Extended Advertising
            )

            # Wait up to 5 seconds for the new adapter to appear via InterfacesAdded
            adapter_path = await asyncio.wait_for(adapter_path_future, timeout=5.0)
            logger.info(f"New adapter detected at {adapter_path}")

            # Ensure controller is powered on. This also ensures that BlueZ has fully
            # initialized the adapter and it is ready for use.
            await power_on_controller(bus, adapter_path)

            # We dont need the bus anymore. We have done everything needed with it.
            # Bleak will open its own D-Bus connection.
            bus.disconnect()
            await bus.wait_for_disconnect()

            # Yield the local link for use in tests
            yield link


@contextlib.asynccontextmanager
async def open_transport_with_bluez_vhci() -> AsyncGenerator[Transport, None]:
    """
    Create a bumble HCI Transport connected to BlueZ via the vhci driver and connect
    a Bluetooth controller for a peripheral device to it.
    """
    async with open_bluez_bluetooth_controller_link("vhci") as local_link:
        peripheral_controller = Controller("BLEAK-TEST-PERIPHERAL", link=local_link)
        yield Transport(peripheral_controller, peripheral_controller)