1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
|
"""Shelly Gen2 BLE support."""
from __future__ import annotations
import logging
from habluetooth import HaBluetoothConnector
from ..exceptions import RpcCallError
from ..rpc_device import RpcDevice
from .backend.scanner import ShellyBLEScanner
from .const import (
BLE_CODE,
BLE_SCRIPT_NAME,
VAR_ACTIVE,
VAR_DURATION_MS,
VAR_EVENT_TYPE,
VAR_INTERVAL_MS,
VAR_VERSION,
VAR_WINDOW_MS,
)
LOGGER = logging.getLogger(__name__)
async def _async_get_scripts_by_name(device: RpcDevice) -> dict[str, int]:
"""Get scripts by name."""
scripts = await device.script_list()
return {script["name"]: script["id"] for script in scripts}
async def async_stop_scanner(device: RpcDevice) -> None:
"""Stop scanner."""
script_name_to_id = await _async_get_scripts_by_name(device)
if script_id := script_name_to_id.get(BLE_SCRIPT_NAME):
await device.script_stop(script_id)
async def async_start_scanner( # noqa: PLR0913
device: RpcDevice,
active: bool,
event_type: str,
data_version: int,
interval_ms: int,
window_ms: int,
duration_ms: int,
) -> None:
"""Start scanner."""
script_name_to_id = await _async_get_scripts_by_name(device)
if BLE_SCRIPT_NAME not in script_name_to_id:
await device.script_create(BLE_SCRIPT_NAME)
script_name_to_id = await _async_get_scripts_by_name(device)
ble_script_id = script_name_to_id[BLE_SCRIPT_NAME]
# Not using format strings here because the script
# code contains curly braces
code = (
BLE_CODE.replace(VAR_ACTIVE, "true" if active else "false")
.replace(VAR_EVENT_TYPE, event_type)
.replace(VAR_VERSION, str(data_version))
.replace(VAR_INTERVAL_MS, str(interval_ms))
.replace(VAR_WINDOW_MS, str(window_ms))
.replace(VAR_DURATION_MS, str(duration_ms))
)
needs_putcode = False
try:
code_response = await device.script_getcode(ble_script_id)
except RpcCallError:
# Script has no code yet
needs_putcode = True
else:
needs_putcode = code_response["data"] != code
if needs_putcode:
# Avoid writing the flash unless we actually need to
# update the script
await device.script_stop(ble_script_id)
await device.script_putcode(ble_script_id, code)
await device.script_start(ble_script_id)
def create_scanner(
source: str,
name: str,
) -> ShellyBLEScanner:
"""Create scanner."""
return ShellyBLEScanner(
source,
name,
HaBluetoothConnector(
# no active connections to shelly yet
client=None, # type: ignore[arg-type]
source=source,
can_connect=lambda: False,
),
False,
)
async def async_ensure_ble_enabled(device: RpcDevice) -> bool:
"""Ensure BLE is enabled.
Returns True if the device was restarted.
Raises RpcCallError if BLE is not supported or could not
be enabled.
"""
ble_config = await device.ble_getconfig()
if ble_config["enable"]:
return False
ble_enable = await device.ble_setconfig(enable=True, enable_rpc=True)
if not ble_enable["restart_required"]:
return False
LOGGER.info("BLE enabled, restarting device %s:%s", device.ip_address, device.port)
await device.trigger_reboot(3500)
return True
|