File: multicast.py

package info (click to toggle)
zwave-js-server-python 0.67.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,820 kB
  • sloc: python: 15,886; sh: 21; javascript: 16; makefile: 2
file content (136 lines) | stat: -rw-r--r-- 3,615 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
"""Support for multicast commands."""

from __future__ import annotations

from typing import Any, cast

from ..client import Client
from ..const import CommandClass
from ..model.node import Node, _get_value_id_dict_from_value_data
from ..model.value import SetValueResult, ValueDataType


async def _async_send_command(
    client: Client,
    command: str,
    nodes: list[Node] | None = None,
    require_schema: int | None = None,
    **kwargs: Any,
) -> dict:
    """Build a multicast or broadcast command message and send it.

    When ``nodes`` is a non-empty list, the command name is prefixed with
    ``multicast_group.`` and the message carries the node IDs of the given
    nodes under ``nodeIDs``. When ``nodes`` is ``None`` or empty, the command
    name is prefixed with ``broadcast_node.`` and no node IDs are included.

    Any extra keyword arguments are merged into the message after the fixed
    keys, so they are sent as additional top-level message fields.

    Returns whatever ``client.async_send_command`` returns for the message.
    """
    if not nodes:
        # Both None and an empty list take the broadcast path.
        message: dict[str, Any] = {"command": f"broadcast_node.{command}"}
    else:
        message = {
            "command": f"multicast_group.{command}",
            "nodeIDs": [target.node_id for target in nodes],
        }

    # Caller-supplied fields go last, mirroring a trailing ``**kwargs`` unpack.
    message.update(kwargs)

    return await client.async_send_command(message, require_schema)


async def async_multicast_set_value(
    client: Client,
    new_value: Any,
    value_data: ValueDataType,
    nodes: list[Node] | None = None,
    options: dict | None = None,
) -> SetValueResult:
    """Send a set_value command to a group of nodes (or as a broadcast).

    The value ID is derived from ``value_data`` and sent together with
    ``new_value`` and ``options`` (``options`` is included even when it is
    ``None``). The ``result`` field of the response is wrapped in a
    ``SetValueResult``.
    """
    fields = {
        "valueId": _get_value_id_dict_from_value_data(value_data),
        "value": new_value,
        "options": options,
    }
    # NOTE(review): 29 is presumably the minimum server schema version for
    # this command - confirm against the client implementation.
    response = await _async_send_command(
        client, "set_value", nodes, require_schema=29, **fields
    )
    return SetValueResult(response["result"])


async def async_multicast_get_endpoint_count(
    client: Client, nodes: list[Node] | None = None
) -> int:
    """Send a get_endpoint_count command and return the reported count.

    The command is sent with ``require_schema=5``; the ``count`` field of the
    response is returned.
    """
    response = await _async_send_command(
        client, "get_endpoint_count", nodes=nodes, require_schema=5
    )
    endpoint_count = cast(int, response["count"])
    return endpoint_count


async def async_multicast_endpoint_supports_cc(
    client: Client,
    endpoint: int,
    command_class: CommandClass,
    nodes: list[Node] | None = None,
) -> bool:
    """Send a supports_cc command for an endpoint and return the answer.

    ``endpoint`` is sent as ``index`` and ``command_class`` as
    ``commandClass``. The ``supported`` field of the response is returned.
    """
    fields = {"index": endpoint, "commandClass": command_class}
    response = await _async_send_command(
        client, "supports_cc", nodes, require_schema=5, **fields
    )
    is_supported = cast(bool, response["supported"])
    return is_supported


async def async_multicast_endpoint_get_cc_version(
    client: Client,
    endpoint: int,
    command_class: CommandClass,
    nodes: list[Node] | None = None,
) -> int:
    """Send a get_cc_version command for an endpoint and return the version.

    ``endpoint`` is sent as ``index`` and ``command_class`` as
    ``commandClass``. The ``version`` field of the response is returned.
    """
    fields = {"index": endpoint, "commandClass": command_class}
    response = await _async_send_command(
        client, "get_cc_version", nodes, require_schema=5, **fields
    )
    cc_version = cast(int, response["version"])
    return cc_version


async def async_multicast_endpoint_invoke_cc_api(
    client: Client,
    endpoint: int,
    command_class: CommandClass,
    method_name: str,
    args: list[Any] | None = None,
    nodes: list[Node] | None = None,
) -> Any:
    """Send an invoke_cc_api command for an endpoint and return its response.

    ``endpoint`` is sent as ``index``, ``command_class`` as ``commandClass``,
    ``method_name`` as ``methodName`` and ``args`` as ``args`` (``args`` is
    included even when it is ``None``). The ``response`` field of the reply
    is returned unchanged.
    """
    fields = {
        "index": endpoint,
        "commandClass": command_class,
        "methodName": method_name,
        "args": args,
    }
    reply = await _async_send_command(
        client, "invoke_cc_api", nodes, require_schema=5, **fields
    )
    return reply["response"]


async def async_multicast_endpoint_supports_cc_api(
    client: Client,
    endpoint: int,
    command_class: CommandClass,
    nodes: list[Node] | None = None,
) -> bool:
    """Send a supports_cc_api command for an endpoint and return the answer.

    ``endpoint`` is sent as ``index`` and ``command_class`` as
    ``commandClass``. The ``supported`` field of the response is returned.
    """
    fields = {"index": endpoint, "commandClass": command_class}
    response = await _async_send_command(
        client, "supports_cc_api", nodes, require_schema=5, **fields
    )
    is_supported = cast(bool, response["supported"])
    return is_supported