File: __init__.py

package info (click to toggle)
python-aioshelly 13.23.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 764 kB
  • sloc: python: 7,609; makefile: 7; sh: 3
file content (144 lines) | stat: -rw-r--r-- 4,047 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
"""Shelly Gen2 BLE support."""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING

from habluetooth import BluetoothScanningMode, HaBluetoothConnector

from ..const import MODEL_ID_TO_DEVICE
from ..exceptions import RpcCallError
from ..rpc_device import RpcDevice
from .backend.scanner import ShellyBLEScanner
from .const import (
    BLE_CODE,
    BLE_SCRIPT_NAME,
    VAR_ACTIVE,
    VAR_EVENT_TYPE,
    VAR_VERSION,
)

if TYPE_CHECKING:
    from ..const import ShellyDevice

LOGGER = logging.getLogger(__name__)


async def _async_get_scripts_by_name(device: RpcDevice) -> dict[str, int]:
    """Get scripts by name."""
    scripts = await device.script_list()
    return {script["name"]: script["id"] for script in scripts}


async def async_stop_scanner(device: RpcDevice) -> None:
    """Stop scanner."""
    script_name_to_id = await _async_get_scripts_by_name(device)
    if script_id := script_name_to_id.get(BLE_SCRIPT_NAME):
        await device.script_stop(script_id)


async def async_start_scanner(
    device: RpcDevice, active: bool, event_type: str, data_version: int
) -> None:
    """Start scanner."""
    script_name_to_id = await _async_get_scripts_by_name(device)
    if BLE_SCRIPT_NAME not in script_name_to_id:
        await device.script_create(BLE_SCRIPT_NAME)
        script_name_to_id = await _async_get_scripts_by_name(device)

    ble_script_id = script_name_to_id[BLE_SCRIPT_NAME]

    # Not using format strings here because the script
    # code contains curly braces
    code = (
        BLE_CODE.replace(VAR_ACTIVE, "true" if active else "false")
        .replace(VAR_EVENT_TYPE, event_type)
        .replace(VAR_VERSION, str(data_version))
    )

    needs_putcode = False
    try:
        code_response = await device.script_getcode(ble_script_id)
    except RpcCallError:
        # Script has no code yet
        needs_putcode = True
    else:
        needs_putcode = code_response["data"] != code

    if needs_putcode:
        # Avoid writing the flash unless we actually need to
        # update the script
        await device.script_stop(ble_script_id)
        await device.script_putcode(ble_script_id, code)

    await device.script_start(ble_script_id)


def create_scanner(
    source: str,
    name: str,
    requested_mode: BluetoothScanningMode | None = None,
    current_mode: BluetoothScanningMode | None = None,
) -> ShellyBLEScanner:
    """Create scanner."""
    return ShellyBLEScanner(
        source,
        name,
        HaBluetoothConnector(
            # no active connections to shelly yet
            client=None,  # type: ignore[arg-type]
            source=source,
            can_connect=lambda: False,
        ),
        False,
        requested_mode=requested_mode,
        current_mode=current_mode,
    )


async def async_ensure_ble_enabled(device: RpcDevice) -> bool:
    """Ensure BLE is enabled.

    Returns True if the device was restarted.

    Raises RpcCallError if BLE is not supported or could not
    be enabled.
    """
    ble_config = await device.ble_getconfig()
    if ble_config["enable"]:
        return False
    ble_enable = await device.ble_setconfig(enable=True, enable_rpc=False)
    if not ble_enable["restart_required"]:
        return False
    LOGGER.info("BLE enabled, restarting device %s:%s", device.ip_address, device.port)
    await device.trigger_reboot(3500)
    return True


def get_device_from_model_id(model_id: int) -> ShellyDevice | None:
    """Get the ShellyDevice from a BLE model ID.

    Args:
        model_id: Model ID from BLE manufacturer data

    Returns:
        ShellyDevice object or None if not found

    """
    return MODEL_ID_TO_DEVICE.get(model_id)


def get_name_from_model_id(model_id: int) -> str | None:
    """Get the device name from a BLE model ID.

    Args:
        model_id: Model ID from BLE manufacturer data

    Returns:
        Device name or None if not found

    """
    if device := get_device_from_model_id(model_id):
        return device.name
    return None