File: firmware.py

package info (click to toggle)
zwave-js-server-python 0.67.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,820 kB
  • sloc: python: 15,886; sh: 21; javascript: 16; makefile: 2
file content (77 lines) | stat: -rw-r--r-- 2,328 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
"""Firmware update helper."""

from __future__ import annotations

import asyncio

import aiohttp

from .client import Client
from .model.driver.firmware import DriverFirmwareUpdateData, DriverFirmwareUpdateResult
from .model.node import Node
from .model.node.firmware import NodeFirmwareUpdateData, NodeFirmwareUpdateResult


async def update_firmware(
    url: str,
    node: Node,
    updates: list[NodeFirmwareUpdateData],
    session: aiohttp.ClientSession,
    additional_user_agent_components: dict[str, str] | None = None,
) -> NodeFirmwareUpdateResult:
    """
    Send updateFirmware command to Node.

    A dedicated ``Client`` is created for ``url`` and ``session``, connected and
    initialized, used for a single ``node.update_firmware`` command and then
    disconnected again. The disconnect and the cancellation of the background
    receive task also happen when the command raises or this coroutine is
    cancelled, so a failed update does not leave the connection open.

    ``updates`` are serialized with ``to_dict()`` and sent together with
    ``node.node_id``. ``additional_user_agent_components`` is forwarded to
    ``Client`` unchanged.

    Returns a ``NodeFirmwareUpdateResult`` built from ``node`` and the ``result``
    entry of the command response. Exceptions raised by the client calls are
    propagated to the caller.
    """
    # Build the payload first so a serialization error cannot strand an open
    # connection.
    cmd = {
        "command": "node.update_firmware",
        "nodeId": node.node_id,
        "updates": [update.to_dict() for update in updates],
    }

    client = Client(
        url, session, additional_user_agent_components=additional_user_agent_components
    )
    await client.connect()
    await client.initialize()

    # Run the client's receive loop in the background while the command is pending.
    receive_task = asyncio.get_running_loop().create_task(client.receive_until_closed())

    try:
        data = await client.async_send_command(cmd, require_schema=29)
    finally:
        # Always tear down the temporary connection, including on failure or
        # cancellation of the command.
        await client.disconnect()
        if not receive_task.done():
            receive_task.cancel()

    return NodeFirmwareUpdateResult(node, data["result"])


async def driver_firmware_update_otw(
    url: str,
    firmware_file: DriverFirmwareUpdateData,
    session: aiohttp.ClientSession,
    additional_user_agent_components: dict[str, str] | None = None,
) -> DriverFirmwareUpdateResult:
    """
    Send firmwareUpdateOTW command to Driver.

    Sending the wrong firmware to a driver can brick it and make it unrecoverable.
    Consumers of this library should build mechanisms to ensure that users understand
    the risks.

    A dedicated ``Client`` is created for ``url`` and ``session``, connected and
    initialized, used for a single ``driver.firmware_update_otw`` command and then
    disconnected again. The disconnect and the cancellation of the background
    receive task also happen when the command raises or this coroutine is
    cancelled, so a failed update does not leave the connection open.

    The command payload is ``firmware_file.to_dict()`` merged with the command
    name. ``additional_user_agent_components`` is forwarded to ``Client``
    unchanged.

    Returns a ``DriverFirmwareUpdateResult`` built from the ``result`` entry of the
    command response. Exceptions raised by the client calls are propagated to the
    caller.
    """
    client = Client(
        url, session, additional_user_agent_components=additional_user_agent_components
    )
    await client.connect()
    await client.initialize()

    # Run the client's receive loop in the background while the command is pending.
    receive_task = asyncio.get_running_loop().create_task(client.receive_until_closed())

    try:
        data = await client.async_send_command(
            {
                "command": "driver.firmware_update_otw",
                **firmware_file.to_dict(),
            },
            require_schema=29,
        )
    finally:
        # Always tear down the temporary connection, including on failure or
        # cancellation of the command.
        await client.disconnect()
        if not receive_task.done():
            receive_task.cancel()

    return DriverFirmwareUpdateResult(data["result"])