1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
|
# This example demonstrates a simple temperature sensor peripheral.
#
# The sensor's local value updates every second, and it will notify
# any connected central every 10 seconds.
#
# Work-in-progress demo of implementing bonding and passkey auth.
#
# This example demonstrates the low-level bluetooth module. For most
# applications, we recommend using the higher-level aioble library, which
# includes an implementation of the secret store. See
# https://github.com/micropython/micropython-lib/tree/master/micropython/bluetooth/aioble
import bluetooth
import random
import struct
import time
import json
import binascii
from ble_advertising import advertising_payload
from micropython import const
# IRQ event codes. BLETemperature._irq() compares its `event` argument
# against these to decide how to unpack `data`.
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_INDICATE_DONE = const(20)
_IRQ_ENCRYPTION_UPDATE = const(28)
_IRQ_PASSKEY_ACTION = const(31)
_IRQ_GET_SECRET = const(29)
_IRQ_SET_SECRET = const(30)
# Characteristic flag bits; OR-ed together to form the flags of _TEMP_CHAR.
_FLAG_READ = const(0x0002)
_FLAG_NOTIFY = const(0x0010)
_FLAG_INDICATE = const(0x0020)
_FLAG_READ_ENCRYPTED = const(0x0200)
# org.bluetooth.service.environmental_sensing
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
# org.bluetooth.characteristic.temperature
# (UUID, flags) pair describing the single temperature characteristic.
_TEMP_CHAR = (
bluetooth.UUID(0x2A6E),
_FLAG_READ | _FLAG_NOTIFY | _FLAG_INDICATE | _FLAG_READ_ENCRYPTED,
)
# (service UUID, tuple of characteristics); this is the service definition
# handed to gatts_register_services() in BLETemperature.__init__().
_ENV_SENSE_SERVICE = (
_ENV_SENSE_UUID,
(_TEMP_CHAR,),
)
# org.bluetooth.characteristic.gap.appearance.xml
_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)
# Values for the `io` BLE config option; BLETemperature.__init__() selects
# _IO_CAPABILITY_DISPLAY_YESNO, the others are listed for reference.
_IO_CAPABILITY_DISPLAY_ONLY = const(0)
_IO_CAPABILITY_DISPLAY_YESNO = const(1)
_IO_CAPABILITY_KEYBOARD_ONLY = const(2)
_IO_CAPABILITY_NO_INPUT_OUTPUT = const(3)
_IO_CAPABILITY_KEYBOARD_DISPLAY = const(4)
# Action codes delivered with an _IRQ_PASSKEY_ACTION event; each one is
# answered via gap_passkey() in BLETemperature._irq().
_PASSKEY_ACTION_INPUT = const(2)
_PASSKEY_ACTION_DISP = const(3)
_PASSKEY_ACTION_NUMCMP = const(4)
class BLETemperature:
    """BLE temperature-sensor peripheral with bonding and passkey pairing.

    Registers the environmental-sensing service with one temperature
    characteristic, advertises it, tracks connected centrals so that they
    can be notified/indicated, and keeps the bonding secrets handed over by
    the BLE stack in ``secrets.json`` so that bonds survive a restart.
    """

    def __init__(self, ble, name="mpy-temp"):
        """Configure ``ble``, register the service and start advertising.

        Args:
            ble: BLE interface object (``bluetooth.BLE()`` in the demo).
            name: Device name placed in the advertising payload.
        """
        self._ble = ble
        # Secrets must be loaded before the IRQ handler is installed, since
        # the handler serves get/set-secret requests from self._secrets.
        self._load_secrets()
        self._ble.irq(self._irq)
        self._ble.config(bond=True)
        self._ble.config(le_secure=True)
        self._ble.config(mitm=True)
        self._ble.config(io=_IO_CAPABILITY_DISPLAY_YESNO)
        self._ble.active(True)
        # NOTE(review): addr_mode=2 is presumably the resolvable-private-address
        # mode of the MicroPython bluetooth module -- confirm against its docs.
        self._ble.config(addr_mode=2)
        ((self._handle,),) = self._ble.gatts_register_services((_ENV_SENSE_SERVICE,))
        self._connections = set()
        self._payload = advertising_payload(
            name=name, services=[_ENV_SENSE_UUID], appearance=_ADV_APPEARANCE_GENERIC_THERMOMETER
        )
        self._advertise()

    def _irq(self, event, data):
        """Handle a BLE stack event.

        Args:
            event: One of the ``_IRQ_*`` event codes.
            data: Event-specific tuple; its layout depends on ``event``.

        Returns:
            ``True``/``False`` for ``_IRQ_SET_SECRET`` (stored/deleted vs.
            nothing to delete), the stored value or ``None`` for
            ``_IRQ_GET_SECRET``, and ``None`` for every other event.
        """
        # Track connections so we can send notifications.
        if event == _IRQ_CENTRAL_CONNECT:
            conn_handle, _, _ = data
            self._connections.add(conn_handle)
        elif event == _IRQ_CENTRAL_DISCONNECT:
            conn_handle, _, _ = data
            # discard() rather than remove(): a disconnect for a handle we
            # never tracked must not raise inside the IRQ handler.
            self._connections.discard(conn_handle)
            self._save_secrets()
            # Start advertising again to allow a new connection.
            self._advertise()
        elif event == _IRQ_ENCRYPTION_UPDATE:
            conn_handle, encrypted, authenticated, bonded, key_size = data
            print("encryption update", conn_handle, encrypted, authenticated, bonded, key_size)
        elif event == _IRQ_PASSKEY_ACTION:
            conn_handle, action, passkey = data
            print("passkey action", conn_handle, action, passkey)
            if action == _PASSKEY_ACTION_NUMCMP:
                accept = int(input("accept? "))
                self._ble.gap_passkey(conn_handle, action, accept)
            elif action == _PASSKEY_ACTION_DISP:
                # NOTE: fixed demo passkey; a real device should not use a
                # hard-coded value here.
                print("displaying 123456")
                self._ble.gap_passkey(conn_handle, action, 123456)
            elif action == _PASSKEY_ACTION_INPUT:
                print("prompting for passkey")
                passkey = int(input("passkey? "))
                self._ble.gap_passkey(conn_handle, action, passkey)
            else:
                print("unknown action")
        elif event == _IRQ_GATTS_INDICATE_DONE:
            # data is (conn_handle, value_handle, status); nothing to do.
            pass
        elif event == _IRQ_SET_SECRET:
            sec_type, key, value = data
            # Copy into bytes so the entries can be used as dict keys/values.
            key = sec_type, bytes(key)
            value = bytes(value) if value else None
            print("set secret:", key, value)
            if value is None:
                # An empty value is a request to delete the entry.
                if key in self._secrets:
                    del self._secrets[key]
                    return True
                else:
                    return False
            else:
                self._secrets[key] = value
                return True
        elif event == _IRQ_GET_SECRET:
            sec_type, index, key = data
            print("get secret:", sec_type, index, bytes(key) if key else None)
            if key is None:
                # No key given: return the index-th stored secret of this type.
                i = 0
                for (t, _key), value in self._secrets.items():
                    if t == sec_type:
                        if i == index:
                            return value
                        i += 1
                return None
            else:
                key = sec_type, bytes(key)
                return self._secrets.get(key, None)

    def set_temperature(self, temp_deg_c, notify=False, indicate=False):
        """Publish a new temperature value.

        Args:
            temp_deg_c: Temperature in degrees Celsius.
            notify: If true, send a notification to every tracked connection.
            indicate: If true, send an indication to every tracked connection.
        """
        # Data is sint16 in degrees Celsius with a resolution of 0.01 degrees Celsius.
        # Round rather than truncate: e.g. 0.29 * 100 is 28.999... in floating
        # point and would otherwise be encoded as 28.
        hundredths = int(round(temp_deg_c * 100))
        # Write the local value, ready for a central to read.
        self._ble.gatts_write(self._handle, struct.pack("<h", hundredths))
        if notify or indicate:
            for conn_handle in self._connections:
                if notify:
                    # Notify connected centrals.
                    self._ble.gatts_notify(conn_handle, self._handle)
                if indicate:
                    # Indicate connected centrals.
                    self._ble.gatts_indicate(conn_handle, self._handle)

    def _advertise(self, interval_us=500000):
        """Start advertising the stored payload every ``interval_us`` microseconds."""
        self._ble.config(addr_mode=2)
        self._ble.gap_advertise(interval_us, adv_data=self._payload)

    def _load_secrets(self):
        """Populate ``self._secrets`` from ``secrets.json`` (best effort).

        The file holds a list of ``[sec_type, key, value]`` entries with key
        and value base64-encoded. A missing or unreadable file is not an
        error: a message is printed and whatever was loaded so far is kept.
        """
        self._secrets = {}
        try:
            with open("secrets.json", "r") as f:
                entries = json.load(f)
            for sec_type, key, value in entries:
                self._secrets[sec_type, binascii.a2b_base64(key)] = binascii.a2b_base64(value)
        except Exception:
            # Not a bare except: Ctrl-C / SystemExit must still propagate.
            print("no secrets available")

    @staticmethod
    def _b64_text(data):
        """Return ``data`` base64-encoded as text without the trailing newline."""
        return binascii.b2a_base64(data).decode().rstrip("\n")

    def _save_secrets(self):
        """Write ``self._secrets`` to ``secrets.json`` (best effort).

        Keys and values are stored as base64 *text* so the JSON encoder is
        never handed ``bytes`` objects. Failures are reported with a message
        and otherwise ignored.
        """
        try:
            # Encode before opening the file so that an encoding failure
            # cannot leave a truncated secrets.json behind.
            json_secrets = [
                (sec_type, self._b64_text(key), self._b64_text(value))
                for (sec_type, key), value in self._secrets.items()
            ]
            with open("secrets.json", "w") as f:
                json.dump(json_secrets, f)
        except Exception:
            print("failed to save secrets")
def demo():
    """Run the temperature peripheral demo; never returns.

    Creates the BLE peripheral, then once per second writes a
    random-walking temperature to it, asking for a notification on every
    tenth update. Indications are never requested.
    """
    radio = bluetooth.BLE()
    sensor = BLETemperature(radio)
    temperature = 25
    tick = 0
    while True:
        # Write every second, notify every 10 seconds.
        tick = (tick + 1) % 10
        notify_now = tick == 0
        sensor.set_temperature(temperature, notify=notify_now, indicate=False)
        # Random walk the temperature.
        temperature += random.uniform(-0.5, 0.5)
        time.sleep_ms(1000)


if __name__ == "__main__":
    demo()
|