1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
|
# Ping-pong GATT notifications between two devices.
from micropython import const
import time, machine, bluetooth
TIMEOUT_MS = 2000
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_NOTIFY = const(18)
# How long to run the test for.
_NUM_NOTIFICATIONS = const(50)
SERVICE_UUID = bluetooth.UUID("A5A5A5A5-FFFF-9999-1111-5A5A5A5A5A5A")
CHAR_UUID = bluetooth.UUID("00000000-1111-2222-3333-444444444444")
CHAR = (
CHAR_UUID,
bluetooth.FLAG_NOTIFY,
)
SERVICE = (
SERVICE_UUID,
(CHAR,),
)
SERVICES = (SERVICE,)
is_central = False
waiting_events = {}
def irq(event, data):
if event == _IRQ_CENTRAL_CONNECT:
waiting_events[event] = data[0]
elif event == _IRQ_PERIPHERAL_CONNECT:
waiting_events[event] = data[0]
elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
# conn_handle, def_handle, value_handle, properties, uuid = data
if data[-1] == CHAR_UUID:
waiting_events[event] = data[2]
else:
return
elif event == _IRQ_GATTC_NOTIFY:
if is_central:
conn_handle, value_handle, notify_data = data
ble.gatts_notify(conn_handle, value_handle, b"central" + notify_data)
if event not in waiting_events:
waiting_events[event] = None
def wait_for_event(event, timeout_ms):
t0 = time.ticks_ms()
while time.ticks_diff(time.ticks_ms(), t0) < timeout_ms:
if event in waiting_events:
return waiting_events.pop(event)
machine.idle()
raise ValueError("Timeout waiting for {}".format(event))
# Acting in peripheral role.
def instance0():
multitest.globals(BDADDR=ble.config("mac"))
((char_handle,),) = ble.gatts_register_services(SERVICES)
print("gap_advertise")
ble.gap_advertise(20_000, b"\x02\x01\x06\x04\xffMPY")
multitest.next()
try:
# Wait for central to connect to us.
conn_handle = wait_for_event(_IRQ_CENTRAL_CONNECT, TIMEOUT_MS)
# Discover characteristics.
ble.gattc_discover_characteristics(conn_handle, 1, 65535)
value_handle = wait_for_event(_IRQ_GATTC_CHARACTERISTIC_RESULT, TIMEOUT_MS)
wait_for_event(_IRQ_GATTC_CHARACTERISTIC_DONE, TIMEOUT_MS)
# Give the central enough time to discover chars.
time.sleep_ms(500)
ticks_start = time.ticks_ms()
for i in range(_NUM_NOTIFICATIONS):
# Send a notification and wait for a response.
ble.gatts_notify(conn_handle, value_handle, "peripheral" + str(i))
wait_for_event(_IRQ_GATTC_NOTIFY, TIMEOUT_MS)
ticks_end = time.ticks_ms()
ticks_total = time.ticks_diff(ticks_end, ticks_start)
multitest.output_metric(
"Acknowledged {} notifications in {} ms. {} ms/notification.".format(
_NUM_NOTIFICATIONS, ticks_total, ticks_total // _NUM_NOTIFICATIONS
)
)
# Disconnect the central.
print("gap_disconnect:", ble.gap_disconnect(conn_handle))
wait_for_event(_IRQ_CENTRAL_DISCONNECT, TIMEOUT_MS)
finally:
ble.active(0)
# Acting in central role.
def instance1():
global is_central
is_central = True
((char_handle,),) = ble.gatts_register_services(SERVICES)
multitest.next()
try:
# Connect to peripheral, with a short connection interval to reduce notify latency.
print("gap_connect")
ble.gap_connect(BDADDR[0], BDADDR[1], 2000, 12500, 12500)
conn_handle = wait_for_event(_IRQ_PERIPHERAL_CONNECT, TIMEOUT_MS)
# Discover characteristics.
ble.gattc_discover_characteristics(conn_handle, 1, 65535)
value_handle = wait_for_event(_IRQ_GATTC_CHARACTERISTIC_RESULT, TIMEOUT_MS)
wait_for_event(_IRQ_GATTC_CHARACTERISTIC_DONE, TIMEOUT_MS)
# The IRQ handler will respond to each notification.
# Wait for the peripheral to disconnect us.
wait_for_event(_IRQ_PERIPHERAL_DISCONNECT, 20000)
finally:
ble.active(0)
ble = bluetooth.BLE()
ble.active(1)
ble.irq(irq)
|