File: test_client_connecting.py

package info (click to toggle)
bleak 2.1.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,076 kB
  • sloc: python: 11,282; makefile: 165; java: 105
file content (71 lines) | stat: -rw-r--r-- 2,290 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import asyncio

from bumble.device import Device

from bleak import BleakClient
from bleak._compat import timeout as async_timeout
from tests.integration.conftest import (
    configure_and_power_on_bumble_peripheral,
    find_ble_device,
)


async def test_connect(bumble_peripheral: Device):
    """Verify that a BleakClient can connect to the virtual peripheral.

    The peripheral is configured and powered on, looked up with
    ``find_ble_device`` and then connected to. The name reported by the
    connected client must equal the peripheral's own name.
    """
    await configure_and_power_on_bumble_peripheral(bumble_peripheral)
    discovered = await find_ble_device(bumble_peripheral)

    async with BleakClient(discovered) as connection:
        reported_name = connection.name
        assert reported_name == bumble_peripheral.name


async def test_connect_multiple_times(bumble_peripheral: Device):
    """Verify that the same device can be connected to twice in a row.

    Two connect/disconnect cycles are run against the device returned by
    ``find_ble_device``; advertising is started again on the peripheral
    before the second cycle.
    """
    await configure_and_power_on_bumble_peripheral(bumble_peripheral)
    target = await find_ble_device(bumble_peripheral)

    for attempt in range(2):
        if attempt:
            # Presumably the peripheral is no longer advertising after the
            # first connection, so it is restarted before reconnecting.
            await bumble_peripheral.start_advertising()

        # Nothing to do while connected; the cycle itself is what is tested.
        async with BleakClient(target):
            pass


async def test_is_connected(bumble_peripheral: Device):
    """Check that ``is_connected`` tracks the client's connection state.

    A single client instance is followed through its whole lifecycle: it
    must report ``False`` before connecting, ``True`` while inside the
    ``async with`` block and ``False`` again once the block has exited.
    """
    await configure_and_power_on_bumble_peripheral(bumble_peripheral)

    device = await find_ble_device(bumble_peripheral)

    client = BleakClient(device)

    assert client.is_connected is False
    # Enter the *same* instance that was checked above instead of building a
    # second client, so all three assertions observe one object.
    async with client:
        assert client.is_connected is True
    assert client.is_connected is False


async def test_disconnect_callback(bumble_peripheral: Device):
    """Check that the disconnected callback is invoked with the client.

    The link is dropped from the peripheral side while the client is still
    inside its ``async with`` block. The callback passed to ``BleakClient``
    must then fire before the ``async_timeout(5)`` guard expires and must
    receive that same client object.
    """
    await configure_and_power_on_bumble_peripheral(bumble_peripheral)

    device = await find_ble_device(bumble_peripheral)

    # Create the future through the running loop (as the asyncio docs
    # recommend) so it is explicitly bound to the loop this test runs on.
    disconnected_client_future: asyncio.Future[BleakClient] = (
        asyncio.get_running_loop().create_future()
    )

    def disconnected_callback(client: BleakClient):
        # Hand the client over to the test body awaiting the future.
        disconnected_client_future.set_result(client)

    async with BleakClient(device, disconnected_callback) as client:
        # Disconnect from virtual device side
        virtual_connection = list(bumble_peripheral.connections.values())[0]
        await virtual_connection.disconnect()

        # Wait for disconnected callback to be called
        async with async_timeout(5):
            disconnected_client = await disconnected_client_future
        assert disconnected_client is client