1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
|
# Test MTU exchange (initiated by central) and the effect on notify and write
# size.
#
# See ble_mtu_peripheral.py which tests peripheral-initiated MTU exchange (not supported on btstack).
#
# Four connections are made:
#
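# (C) = MTU requested by the central, (P) = MTU preferred by the peripheral.
# The resulting MTU is the smaller of the two.
#
# Test | Requested | Preferred | Result | Notes
#  0   | 300 (C)   | 256 (P)   | 256    |
#  1   | 300 (C)   | 200 (P)   | 200    |
#  2   | 300 (C)   | 400 (P)   | 300    |
#  3   | 300 (C)   | 50 (P)    | 50     | MTU is shorter than the 64-byte notification payload, so the notification is truncated.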
#
# For each connection a notification is sent by the server (peripheral) and a characteristic
# is written by the client (central) to ensure that the expected size is transmitted.
from micropython import const
import time, machine, bluetooth
# Timeout (ms) used for every wait_for_event() call and for gap_connect().
TIMEOUT_MS = 5000

# BLE IRQ event codes handled by irq().
# Peripheral-side (server) events.
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
# Central-side (client) events.
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_WRITE_DONE = const(17)
_IRQ_GATTC_NOTIFY = const(18)
# Raised on both sides once the MTU exchange has completed.
_IRQ_MTU_EXCHANGED = const(21)

# The single service registered by the peripheral, with one characteristic
# that can be read, written and notified.
SERVICE_UUID = bluetooth.UUID("A5A5A5A5-FFFF-9999-1111-5A5A5A5A5A5A")
CHAR_UUID = bluetooth.UUID("00000000-1111-2222-3333-444444444444")
CHAR = (CHAR_UUID, bluetooth.FLAG_READ | bluetooth.FLAG_WRITE | bluetooth.FLAG_NOTIFY)
SERVICE = (SERVICE_UUID, (CHAR,))
SERVICES = (SERVICE,)

# Maps an IRQ event code to the value recorded for it by irq() (None when the
# event carries nothing of interest). Entries are consumed by wait_for_event().
waiting_events = {}
# BLE event handler: prints a line for each known event and records the event
# in waiting_events so that wait_for_event() can pick it up.
#
# Recorded values:
#   - connect events:          data[0] (used by the callers as the connection handle)
#   - characteristic result:   data[2] (used by the central as the value handle)
#   - MTU exchanged:           data[-1] (the negotiated MTU)
#   - every other event:       None
#
# Characteristic results for a UUID other than CHAR_UUID are ignored entirely
# (nothing is printed and nothing is recorded).
def irq(event, data):
    if event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
        uuid = data[-1]
        if not (uuid == CHAR_UUID):
            # Not the characteristic under test: skip it silently.
            return
        print("_IRQ_GATTC_CHARACTERISTIC_RESULT", uuid)
        waiting_events[event] = data[2]
    elif event == _IRQ_MTU_EXCHANGED:
        negotiated = data[-1]
        print("_IRQ_MTU_EXCHANGED", negotiated)
        waiting_events[event] = negotiated
    elif event == _IRQ_GATTC_NOTIFY:
        payload = data[-1]
        # Only the length and first character are printed, not the whole payload.
        print("_IRQ_GATTC_NOTIFY", len(payload), chr(payload[0]))
    elif event == _IRQ_CENTRAL_CONNECT:
        print("_IRQ_CENTRAL_CONNECT")
        waiting_events[event] = data[0]
    elif event == _IRQ_PERIPHERAL_CONNECT:
        print("_IRQ_PERIPHERAL_CONNECT")
        waiting_events[event] = data[0]
    elif event == _IRQ_CENTRAL_DISCONNECT:
        print("_IRQ_CENTRAL_DISCONNECT")
    elif event == _IRQ_PERIPHERAL_DISCONNECT:
        print("_IRQ_PERIPHERAL_DISCONNECT")
    elif event == _IRQ_GATTS_WRITE:
        print("_IRQ_GATTS_WRITE")
    elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
        print("_IRQ_GATTC_CHARACTERISTIC_DONE")
    elif event == _IRQ_GATTC_WRITE_DONE:
        print("_IRQ_GATTC_WRITE_DONE")

    # Mark the event as seen without overwriting a value stored above (or an
    # earlier, not-yet-consumed entry for the same event).
    waiting_events.setdefault(event, None)
# Block until irq() has recorded `event`, then remove it from waiting_events
# and return the value stored for it.
#
# Raises ValueError if the event has not been seen within `timeout_ms`
# milliseconds.
def wait_for_event(event, timeout_ms):
    start = time.ticks_ms()
    while True:
        if time.ticks_diff(time.ticks_ms(), start) >= timeout_ms:
            raise ValueError("Timeout waiting for {}".format(event))
        if event in waiting_events:
            return waiting_events.pop(event)
        # Nothing yet: idle until the next poll.
        machine.idle()
# Acting in peripheral role.
def instance0():
    # Peripheral side of the test. Publishes its address to the other
    # instance, registers the service, and then for each preferred MTU:
    # advertises, waits for the central to connect and exchange MTU, sends a
    # 64-character notification, and prints the length of what the central
    # wrote back. The BLE interface is always deactivated on exit.
    multitest.globals(BDADDR=ble.config("mac"))
    ((char_handle,),) = ble.gatts_register_services(SERVICES)
    # NOTE(review): presumably enlarges the characteristic's buffer to 500
    # bytes (append disabled) so long writes fit; confirm against
    # gatts_set_buffer docs.
    ble.gatts_set_buffer(char_handle, 500, False)
    multitest.next()

    try:
        for i, preferred_mtu in enumerate((256, 200, 400, 50)):
            ble.config(mtu=preferred_mtu)

            print("gap_advertise")
            ble.gap_advertise(20_000, b"\x02\x01\x06\x04\xffMPY")
            multitest.broadcast(f"peripheral:adv:{i}")

            # Wait for central to connect to us and to complete its MTU
            # exchange (the negotiated value is not needed on this side).
            central = wait_for_event(_IRQ_CENTRAL_CONNECT, TIMEOUT_MS)
            wait_for_event(_IRQ_MTU_EXCHANGED, TIMEOUT_MS)

            # Only notify once the central has finished discovery.
            multitest.wait(f"client:discovery:{i}")
            print("gatts_notify")
            ble.gatts_notify(central, char_handle, str(i) * 64)

            # The central answers with a write; report what was stored.
            wait_for_event(_IRQ_GATTS_WRITE, TIMEOUT_MS)
            print("gatts_read")
            written = ble.gatts_read(char_handle)
            print("characteristic len:", len(written), chr(written[0]))

            # Wait for the central to disconnect.
            wait_for_event(_IRQ_CENTRAL_DISCONNECT, TIMEOUT_MS)
    finally:
        ble.active(0)
# Acting in central role.
def instance1():
    # Central side of the test. For each of the four rounds: requests an MTU
    # of 300, connects to the peripheral, initiates the MTU exchange,
    # discovers the characteristic, waits for the peripheral's notification,
    # writes (mtu - 3) characters back and disconnects. The BLE interface is
    # always deactivated on exit.
    multitest.next()

    try:
        for i in range(4):
            ble.config(mtu=300)

            multitest.wait(f"peripheral:adv:{i}")
            print("gap_connect")
            ble.gap_connect(BDADDR[0], BDADDR[1], TIMEOUT_MS)
            peripheral = wait_for_event(_IRQ_PERIPHERAL_CONNECT, TIMEOUT_MS)

            # Central-initiated mtu exchange.
            print("gattc_exchange_mtu")
            ble.gattc_exchange_mtu(peripheral)
            mtu = wait_for_event(_IRQ_MTU_EXCHANGED, TIMEOUT_MS)

            # Find the value handle of the characteristic under test.
            print("gattc_discover_characteristics")
            ble.gattc_discover_characteristics(peripheral, 1, 65535)
            value_handle = wait_for_event(_IRQ_GATTC_CHARACTERISTIC_RESULT, TIMEOUT_MS)
            wait_for_event(_IRQ_GATTC_CHARACTERISTIC_DONE, TIMEOUT_MS)

            # Tell the peripheral it may now send its notification.
            multitest.broadcast(f"client:discovery:{i}")
            wait_for_event(_IRQ_GATTC_NOTIFY, TIMEOUT_MS)

            # Write a payload sized from the negotiated MTU.
            # NOTE(review): the "- 3" presumably leaves room for the ATT
            # header, and mode 1 presumably requests a write response (hence
            # the wait for WRITE_DONE); confirm against the bluetooth docs.
            print("gattc_write")
            payload = chr(ord("a") + i) * (mtu - 3)
            ble.gattc_write(peripheral, value_handle, payload, 1)
            wait_for_event(_IRQ_GATTC_WRITE_DONE, TIMEOUT_MS)

            # NOTE(review): presumably gives the peripheral time to read the
            # written value before the link goes down; confirm.
            time.sleep_ms(300)

            # Disconnect from peripheral.
            disconnect_result = ble.gap_disconnect(peripheral)
            print("gap_disconnect:", disconnect_result)
            wait_for_event(_IRQ_PERIPHERAL_DISCONNECT, TIMEOUT_MS)
    finally:
        ble.active(0)
# Module-level setup shared by instance0() and instance1(): create the BLE
# interface, activate it, and register irq() as its event handler.
ble = bluetooth.BLE()
ble.active(1)
ble.irq(irq)
|