"""Example of provisioning WiFi credentials via Bluetooth.

This script demonstrates how to:
1. Scan for and connect to a Shelly device via BLE
2. Scan for available WiFi networks
3. Configure WiFi credentials

Usage:
    python ble_provision_wifi.py [MAC_ADDRESS] [SSID] [PASSWORD] [-d|--debug]

If no MAC_ADDRESS is provided, the script will scan for all Shelly devices
and prompt you to select one.

If no SSID/PASSWORD is provided, you will be prompted after scanning networks.

Use -d or --debug to enable debug logging.
"""
from __future__ import annotations
import argparse
import asyncio
import getpass
import logging
import platform
import sys
from contextlib import suppress
from bleak import BleakScanner
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
from aioshelly.ble.provisioning import async_provision_wifi, async_scan_wifi_networks
# True when platform.system() reports "Darwin" (macOS). The scanner classes in
# this file check it to decide whether to pass the macOS-only "cb" options
# to BleakScanner.
IS_MACOS = platform.system() == "Darwin"
class DeviceScanner:
    """Scanner to find a specific BLE device by MAC address.

    The requested address is upper-cased once at construction time and each
    detected device's address is upper-cased before comparison, so matching
    is case-insensitive.
    """

    def __init__(self, mac_address: str) -> None:
        """Initialize scanner.

        Args:
            mac_address: Address of the device to look for, in any letter case.
        """
        self.mac_address = mac_address.upper()
        # Set by detection_callback as soon as the target address is seen.
        self.found_event = asyncio.Event()
        # Remains None until a matching device has been detected.
        self.found_device: BLEDevice | None = None

    def detection_callback(
        self,
        device: BLEDevice,
        advertisement_data: AdvertisementData,  # noqa: ARG002
    ) -> None:
        """Handle device detection.

        Records ``device`` and sets ``found_event`` when its address matches
        the one this scanner was created for; other devices are ignored.
        """
        if device.address.upper() == self.mac_address:
            self.found_device = device
            self.found_event.set()

    async def find_device(self, timeout: float = 10.0) -> BLEDevice | None:
        """Scan for device and return it if found.

        Args:
            timeout: Maximum number of seconds to wait for the device.

        Returns:
            The matching device, or None if it was not seen within ``timeout``.
        """
        # On macOS, use_bdaddr=True to get real MAC addresses in callback
        scanner_kwargs: dict = {"detection_callback": self.detection_callback}
        if IS_MACOS:
            scanner_kwargs["cb"] = {"use_bdaddr": True}

        scanner = BleakScanner(**scanner_kwargs)
        await scanner.start()
        try:
            # A timeout is not an error here: it just means "not found".
            with suppress(TimeoutError):
                await asyncio.wait_for(self.found_event.wait(), timeout=timeout)
        finally:
            # Always stop the scan, even if the wait is cancelled or fails,
            # so the scanner is never left running.
            await scanner.stop()
        return self.found_device
class ShellyScannerAll:
    """Scanner to find all Shelly BLE devices.

    Devices are collected in ``found_devices`` in the order they are first
    detected, at most once per address.
    """

    def __init__(self) -> None:
        """Initialize scanner with an empty result list."""
        self.found_devices: list[BLEDevice] = []

    def detection_callback(
        self,
        device: BLEDevice,
        advertisement_data: AdvertisementData,  # noqa: ARG002
    ) -> None:
        """Handle device detection.

        Appends ``device`` to ``found_devices`` when it has a name starting
        with "Shelly" and its address has not been recorded yet.
        """
        # Only include devices with names starting with "Shelly"
        if not device.name or not device.name.startswith("Shelly"):
            return
        # The same device is detected repeatedly; keep one entry per address.
        if any(known.address == device.address for known in self.found_devices):
            return
        self.found_devices.append(device)

    async def scan_for_devices(self, timeout: float = 10.0) -> list[BLEDevice]:
        """Scan for all Shelly devices and return list.

        Args:
            timeout: Number of seconds to keep the scan running.

        Returns:
            The scanner's ``found_devices`` list (the same list object that
            the detection callback appends to).
        """
        # On macOS, use_bdaddr=True to get real MAC addresses in callback
        scanner_kwargs: dict = {"detection_callback": self.detection_callback}
        if IS_MACOS:
            scanner_kwargs["cb"] = {"use_bdaddr": True}

        scanner = BleakScanner(**scanner_kwargs)
        await scanner.start()
        try:
            await asyncio.sleep(timeout)
        finally:
            # Always stop the scan, even if the sleep is cancelled, so the
            # scanner is never left running.
            await scanner.stop()
        return self.found_devices
async def _find_device_by_mac(mac_address: str) -> BLEDevice:
    """Scan for one device by address; exit with status 1 if it is not found."""
    mac_address = mac_address.upper()
    print(f"Scanning for device {mac_address}...")
    ble_device = await DeviceScanner(mac_address).find_device(timeout=10.0)
    if not ble_device:
        print(f"Device {mac_address} not found or out of range")
        sys.exit(1)
    print(f"Found device: {ble_device.name or 'Unknown'} ({ble_device.address})")
    return ble_device


async def _choose_shelly_device() -> BLEDevice:
    """Scan for Shelly devices and let the user pick one.

    Exits with status 1 when no device is found or the prompt is cancelled
    (non-numeric input, end of input, or Ctrl-C).
    """
    print("Scanning for Shelly devices...")
    devices = await ShellyScannerAll().scan_for_devices(timeout=10.0)
    if not devices:
        print("No Shelly devices found")
        sys.exit(1)

    print(f"\nFound {len(devices)} Shelly device(s):")
    for i, dev in enumerate(devices, 1):
        print(f" {i}. {dev.name or 'Unknown'} ({dev.address})")

    # Prompt user to select a device; out-of-range numbers re-prompt.
    while True:
        try:
            choice = input(f"\nSelect device (1-{len(devices)}): ").strip()
            device_idx = int(choice) - 1
        except (ValueError, EOFError, KeyboardInterrupt):
            # EOFError covers a closed/piped stdin, which would otherwise
            # surface as an unhandled traceback.
            print("\nCancelled")
            sys.exit(1)
        if 0 <= device_idx < len(devices):
            ble_device = devices[device_idx]
            name = ble_device.name or "Unknown"
            print(f"Selected: {name} ({ble_device.address})")
            return ble_device
        print(f"Please enter a number between 1 and {len(devices)}")


def _prompt_for_ssid(networks: list[dict]) -> str | None:
    """Ask for a network number or a custom SSID.

    Returns the chosen SSID, or None when the user entered nothing or an
    out-of-range number (the caller then skips WiFi configuration).
    """
    print()
    ssid_input = input(
        f"Enter network number (1-{len(networks)}) or custom SSID: "
    ).strip()
    if not ssid_input:
        print("No SSID provided, skipping WiFi configuration.")
        return None

    # Check if user entered a number to select from list
    try:
        network_idx = int(ssid_input) - 1
    except ValueError:
        # Not a number, treat as custom SSID
        return ssid_input

    if 0 <= network_idx < len(networks):
        ssid = networks[network_idx].get("ssid", "")
        print(f"Selected network: {ssid}")
        return ssid

    print(
        f"Invalid selection. Please enter 1-{len(networks)} "
        "or a custom SSID"
    )
    return None


async def main() -> None:
    """Run the WiFi provisioning example.

    Steps: parse the command line, locate the BLE device (by address or by
    interactive selection), list the WiFi networks the device reports, obtain
    the SSID and password, and send them to the device.

    Values given on the command line are used as-is; only the missing ones
    are prompted for. In particular, an SSID given without a password is kept
    and only the password is asked for.
    """
    # Parse command-line arguments
    parser = argparse.ArgumentParser(
        description="Provision WiFi credentials to Shelly device via BLE"
    )
    parser.add_argument("mac_address", nargs="?", help="BLE MAC address of device")
    parser.add_argument("ssid", nargs="?", help="WiFi SSID")
    parser.add_argument("password", nargs="?", help="WiFi password")
    parser.add_argument(
        "-d", "--debug", action="store_true", help="Enable debug logging"
    )
    args = parser.parse_args()

    # With a MAC address scan for that device, otherwise list all Shelly
    # devices and let the user choose.
    if args.mac_address:
        ble_device = await _find_device_by_mac(args.mac_address)
    else:
        ble_device = await _choose_shelly_device()

    # Enable debug logging if requested
    # NOTE(review): this runs after device discovery, so the discovery scan
    # itself is not logged -- presumably intentional; confirm.
    if args.debug:
        logging.basicConfig(
            level=logging.DEBUG,
            format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
            force=True,
        )

    # Scan for available networks using the provisioning helper
    print("\nScanning for WiFi networks...")
    networks = await async_scan_wifi_networks(ble_device)
    print(f"\nFound {len(networks)} networks:")
    for i, network in enumerate(networks, 1):
        net_ssid = network.get("ssid", "Unknown")
        rssi = network.get("rssi", 0)
        auth = network.get("auth", 0)
        auth_str = "Open" if auth == 0 else "Secured"
        print(f" {i}. {net_ssid} (Signal: {rssi} dBm, {auth_str})")

    # Get SSID from command line or prompt
    if args.ssid:
        ssid = args.ssid
    else:
        selected = _prompt_for_ssid(networks)
        if selected is None:
            return
        ssid = selected

    # Get password from command line or prompt (input is not echoed)
    if args.password is not None:
        password = args.password
    else:
        password = getpass.getpass("Enter WiFi password: ")

    # Provision WiFi credentials using the provisioning helper
    print(f"\nConfiguring WiFi: {ssid}")
    await async_provision_wifi(ble_device, ssid, password)
    print("WiFi configuration complete!")
    print("\nDevice should now connect to WiFi.")
    print("You can now connect to it via IP address.")
# Run the provisioning flow only when executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())