File: __init__.py

package info (click to toggle)
python-aioshelly 11.2.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 396 kB
  • sloc: python: 2,583; makefile: 8; sh: 3
file content (191 lines) | stat: -rw-r--r-- 5,753 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# Common tools methods
"""Methods for aioshelly cmdline tools."""

from __future__ import annotations

import asyncio
import signal
import sys
from collections.abc import Callable
from datetime import UTC, datetime
from functools import partial
from typing import TYPE_CHECKING, Any, cast

from aiohttp import ClientSession

from aioshelly.block_device import BLOCK_VALUE_UNIT, COAP, BlockDevice, BlockUpdateType
from aioshelly.common import ConnectionOptions, get_info
from aioshelly.const import (
    BLOCK_GENERATIONS,
    DEFAULT_HTTP_PORT,
    MODEL_NAMES,
    RPC_GENERATIONS,
)
from aioshelly.exceptions import (
    CustomPortNotSupported,
    DeviceConnectionError,
    InvalidAuthError,
    MacAddressMismatchError,
    ShellyError,
    WrongShellyGen,
)
from aioshelly.rpc_device import RpcDevice, RpcUpdateType, WsServer

# Module-wide contexts shared by every device created through create_device();
# both are closed again in close_connections().
coap_context = COAP()
ws_context = WsServer()
# Holds references to the re-initialization tasks started in device_updated();
# each task is removed from the set by its done callback.
init_tasks_ref: set[asyncio.Task[bool]] = set()


async def create_device(
    aiohttp_session: ClientSession,
    options: ConnectionOptions,
    gen: int | None,
) -> Any:
    """Build a block or RPC device object for the given connection options.

    When ``gen`` is ``None`` the generation is looked up with ``get_info``;
    a missing "gen" entry in that answer is treated as generation 1.

    Raises:
        ShellyError: If the lookup yields nothing or the generation is in
            neither BLOCK_GENERATIONS nor RPC_GENERATIONS.
    """
    generation = gen
    if generation is None:
        info = await get_info(aiohttp_session, options.ip_address, port=options.port)
        if not info:
            raise ShellyError("Unknown Gen")
        generation = info.get("gen", 1)

    if generation in BLOCK_GENERATIONS:
        return await BlockDevice.create(aiohttp_session, coap_context, options)
    if generation in RPC_GENERATIONS:
        return await RpcDevice.create(aiohttp_session, ws_context, options)

    raise ShellyError("Unknown Gen")


async def init_device(device: BlockDevice | RpcDevice) -> bool:
    """Run ``device.initialize()`` and report whether it succeeded.

    The known Shelly errors are caught and turned into a printed message;
    in that case ``False`` is returned. Any other exception propagates.
    """
    port = getattr(device, "port", DEFAULT_HTTP_PORT)
    try:
        await device.initialize()
    except InvalidAuthError as err:
        failure = f"Invalid or missing authorization, error: {err!r}"
    except DeviceConnectionError as err:
        failure = f"Error connecting to {device.ip_address}:{port}, error: {err!r}"
    except MacAddressMismatchError as err:
        failure = f"MAC address mismatch, error: {err!r}"
    except WrongShellyGen:
        failure = f"Wrong Shelly generation for device {device.ip_address}:{port}"
    except CustomPortNotSupported:
        failure = "Custom port not supported for Gen1"
    else:
        return True

    # Only reached when one of the handlers above produced a message.
    print(failure)
    return False


async def connect_and_print_device(
    aiohttp_session: ClientSession,
    options: ConnectionOptions,
    init: bool,
    gen: int | None,
) -> bool:
    """Create a device, print it once and keep printing it on updates.

    Returns ``False`` when ``init`` was requested and initialization failed,
    ``True`` otherwise.
    """
    device = await create_device(aiohttp_session, options, gen)

    if init:
        initialized = await init_device(device)
        if not initialized:
            return False

    print_device(device)

    # Every later update is routed through device_updated(), which in turn
    # prints the device again.
    on_update = partial(device_updated, action=print_device)
    device.subscribe_updates(on_update)
    return True


def device_updated(
    cb_device: BlockDevice | RpcDevice,
    update_type: BlockUpdateType | RpcUpdateType,
    action: Callable[[BlockDevice | RpcDevice], None],
) -> None:
    """Handle an update notification from a device.

    Prints a UTC-timestamped notice. For an ONLINE update the device is
    re-initialized in a background task and ``action`` is not called; for
    every other update type ``action`` is invoked with the device.
    """
    print()
    stamp = datetime.now(tz=UTC).strftime('%H:%M:%S')
    print(f"{stamp} Device updated! ({update_type})")

    online_types = (BlockUpdateType.ONLINE, RpcUpdateType.ONLINE)
    if update_type not in online_types:
        action(cb_device)
        return

    task = asyncio.get_running_loop().create_task(init_device(cb_device))
    # Keep a reference until the task finishes so it cannot be garbage
    # collected while still pending.
    init_tasks_ref.add(task)
    task.add_done_callback(init_tasks_ref.remove)


def print_device(device: BlockDevice | RpcDevice) -> None:
    """Print a header for the device followed by its generation-specific data.

    An uninitialized device only gets a short "not initialized" notice.
    """
    port = getattr(device, "port", DEFAULT_HTTP_PORT)

    if not device.initialized:
        print()
        print(f"** Device @ {device.ip_address}:{port} not initialized **")
        print()
        return

    # Fall back to a placeholder when the model has no known display name.
    model_name = MODEL_NAMES.get(device.model)
    if not model_name:
        model_name = f"Unknown ({device.model})"

    print(f"** {device.name} - {model_name}  @ {device.ip_address}:{port} **")
    print()

    if not device.firmware_supported:
        print(f"Device firmware not supported: {device.firmware_version}")

    if device.gen in BLOCK_GENERATIONS:
        block_device = cast(BlockDevice, device)
        print_block_device(block_device)
    elif device.gen in RPC_GENERATIONS:
        rpc_device = cast(RpcDevice, device)
        print_rpc_device(rpc_device)


def print_block_device(device: BlockDevice) -> None:
    """Print every block of a block (GEN1) device with its current values.

    Each value is printed on its own line as a left-aligned attribute name,
    the value ("-" when it is ``None``) and the unit if the attribute info
    provides one.
    """
    if TYPE_CHECKING:
        assert device.blocks

    for block in device.blocks:
        print(block)
        current = block.current_values()
        for name, raw in current.items():
            meta = block.info(name)
            shown = "-" if raw is None else raw
            if BLOCK_VALUE_UNIT in meta:
                suffix = " " + meta[BLOCK_VALUE_UNIT]
            else:
                suffix = ""
            label = name.ljust(16)
            print(f"{label}{shown}{suffix}")
        print()


def print_rpc_device(device: RpcDevice) -> None:
    """Print status, last event and connection state of an RPC (GEN2/3) device."""
    status_line = f"Status: {device.status}"
    print(status_line)
    event_line = f"Event: {device.event}"
    print(event_line)
    connected_line = f"Connected: {device.connected}"
    print(connected_line)


def close_connections(_exit_code: int = 0) -> None:
    """Close the shared COAP and WebSocket contexts, then exit the process.

    ``_exit_code`` is passed on to ``sys.exit``.
    """
    for context in (coap_context, ws_context):
        context.close()
    sys.exit(_exit_code)


async def update_outbound_ws(
    options: ConnectionOptions, init: bool, ws_url: str
) -> None:
    """Update outbound WebSocket URL (Gen2/3).

    Args:
        options: Connection options of the device to configure.
        init: When true, initialize the device first; if that fails the
            URL is left untouched (init_device() has already printed why).
        ws_url: Outbound WebSocket URL to set on the device.
    """
    async with ClientSession() as aiohttp_session:
        # create_device() takes (session, options, gen). Passing an explicit
        # generation skips its generation lookup.
        device: RpcDevice = await create_device(aiohttp_session, options, 2)
        if init and not await init_device(device):
            return
        print(f"Updating outbound websocket URL to {ws_url}")
        print(f"Restart required: {await device.update_outbound_websocket(ws_url)}")


async def wait_for_keyboard_interrupt() -> None:
    """Wait for keyboard interrupt (Ctrl-C).

    Installs a SIGINT handler (replacing whatever handler was set before)
    and suspends the caller until the signal has been received. The handler
    stays installed after this coroutine returns.
    """
    loop = asyncio.get_running_loop()
    sig_event = asyncio.Event()

    def _on_sigint(_signum: int, _frame: object) -> None:
        # Setting the event directly from the signal handler does not wake
        # an event loop that is blocked waiting for I/O, so the wait could
        # stall until unrelated activity arrives. call_soon_threadsafe()
        # schedules the set *and* wakes the loop.
        if not loop.is_closed():
            loop.call_soon_threadsafe(sig_event.set)

    signal.signal(signal.SIGINT, _on_sigint)
    await sig_event.wait()