1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326
|
"""Utility functions for Z-Wave JS nodes."""
from __future__ import annotations
import logging
from typing import cast
from ..const import CommandClass, CommandStatus, ConfigurationValueType, SetValueStatus
from ..exceptions import (
BulkSetConfigParameterFailed,
InvalidNewValue,
NotFoundError,
SetValueFailed,
ValueTypeError,
)
from ..model.node import Node
from ..model.value import (
ConfigurationValue,
SetConfigParameterResult,
SetValueResult,
get_value_id_str,
)
_LOGGER = logging.getLogger(__name__)
def dump_node_state(node: Node) -> dict:
"""Get state from a node."""
return {
**node.data,
"values": {value_id: value.data for value_id, value in node.values.items()},
"endpoints": {idx: endpoint.data for idx, endpoint in node.endpoints.items()},
}
def partial_param_bit_shift(property_key: int) -> int:
"""Get the number of bits to shift the value for a given property key."""
# We can get the binary representation of the property key, reverse it,
# and find the first 1
return bin(property_key)[::-1].index("1")
async def async_set_config_parameter(
node: Node,
new_value: int | str,
property_or_property_name: int | str,
property_key: int | str | None = None,
endpoint: int = 0,
) -> tuple[ConfigurationValue, SetConfigParameterResult]:
"""Set a value for a config parameter on this node.
new_value and property_ can be provided as labels, so we need to resolve them to
the appropriate key
"""
config_values = node.get_configuration_values()
# If a property name is provided, we have to search for the correct value since
# we can't use value ID
if isinstance(property_or_property_name, str):
try:
zwave_value = next(
config_value
for config_value in config_values.values()
if config_value.property_name == property_or_property_name
and config_value.endpoint == endpoint
)
except StopIteration:
raise NotFoundError(
"Configuration parameter with parameter name "
f"{property_or_property_name} on node {node} endpoint {endpoint} "
"could not be found"
) from None
else:
value_id = get_value_id_str(
node,
CommandClass.CONFIGURATION,
property_or_property_name,
endpoint=endpoint,
property_key=property_key,
)
if value_id not in config_values:
raise NotFoundError(
f"Configuration parameter with value ID {value_id} could not be "
"found"
) from None
zwave_value = config_values[value_id]
new_value = _validate_and_transform_new_value(zwave_value, new_value)
# Finally attempt to set the value and return the Value object if successful
result = await node.async_set_value(zwave_value, new_value)
if result and result.status not in (
SetValueStatus.WORKING,
SetValueStatus.SUCCESS,
SetValueStatus.SUCCESS_UNSUPERVISED,
):
raise SetValueFailed(str(result))
status = (
SetConfigParameterResult(CommandStatus.ACCEPTED, result)
if result is not None
else SetConfigParameterResult(CommandStatus.QUEUED)
)
return zwave_value, status
async def async_bulk_set_partial_config_parameters(
node: Node,
property_: int,
new_value: int | dict[int | str, int | str],
endpoint: int = 0,
) -> SetConfigParameterResult:
"""Bulk set partial configuration values on this node."""
config_values = node.get_configuration_values()
partial_param_values = {
value_id: value
for value_id, value in config_values.items()
if value.property_ == property_
and value.endpoint == endpoint
and value.property_key is not None
}
if not partial_param_values:
# If we find a value with this property_, we know this value isn't split
# into partial params
if (
get_value_id_str(
node, CommandClass.CONFIGURATION, property_, endpoint=endpoint
)
in config_values
):
# If the new value is provided as a dict, we don't have enough information
# to set the parameter.
if isinstance(new_value, dict):
raise ValueTypeError(
f"Configuration parameter {property_} for node {node.node_id} "
f"endpoint {endpoint} does not have partials"
)
# If the new value is provided as an int, we may as well try to set it
# using the standard utility function
_LOGGER.info(
"Falling back to async_set_config_parameter because no partials "
"were found"
)
_, cmd_status = await async_set_config_parameter(
node, new_value, property_, endpoint=endpoint
)
return cmd_status
# Otherwise if we can't find any values with this property, this config
# parameter does not exist
raise NotFoundError(
f"Configuration parameter {property_} for node {node.node_id} endpoint "
f"{endpoint} not found"
)
# If new_value is a dictionary, we need to calculate the full value to send
if isinstance(new_value, dict):
new_value = _get_int_from_partials_dict(
node, partial_param_values, property_, new_value, endpoint=endpoint
)
else:
_validate_raw_int(partial_param_values, new_value)
cmd_response = await node.async_send_command(
"set_value",
valueId={
"commandClass": CommandClass.CONFIGURATION.value,
"endpoint": endpoint,
"property": property_,
},
value=new_value,
require_schema=29,
)
# If we didn't wait for a response, we assume the command has been queued
if cmd_response is None:
return SetConfigParameterResult(CommandStatus.QUEUED)
result = SetValueResult(cmd_response["result"])
if result.status not in (
SetValueStatus.WORKING,
SetValueStatus.SUCCESS,
SetValueStatus.SUCCESS_UNSUPERVISED,
):
raise SetValueFailed(str(result))
return SetConfigParameterResult(CommandStatus.ACCEPTED, result)
def _validate_and_transform_new_value(
zwave_value: ConfigurationValue, new_value: int | str
) -> int:
"""Validate a new value and return the integer value to set."""
# If needed, convert a state label to its key. We know the state exists because
# of the validation above.
if isinstance(new_value, str):
try:
new_value = int(
next(
key
for key, label in zwave_value.metadata.states.items()
if label == new_value
)
)
except StopIteration:
raise InvalidNewValue(
f"State '{new_value}' not found for parameter {zwave_value.value_id}"
) from None
if zwave_value.configuration_value_type == ConfigurationValueType.UNDEFINED:
# We need to use the Configuration CC API to set the value for this type
raise NotImplementedError("Configuration values of undefined type can't be set")
return new_value
def _bulk_set_validate_and_transform_new_value(
zwave_value: ConfigurationValue, property_key: int, new_partial_value: int | str
) -> int:
"""
Validate and transform new value for a bulk set function call.
Returns a bulk set friendly error if validation fails.
"""
try:
return _validate_and_transform_new_value(zwave_value, new_partial_value)
except (InvalidNewValue, NotImplementedError) as err:
raise BulkSetConfigParameterFailed(
f"Config parameter {zwave_value.value_id} failed validation on partial "
f"parameter {property_key}"
) from err
def _get_int_from_partials_dict(
node: Node,
partial_param_values: dict[str, ConfigurationValue],
property_: int,
new_value: dict[int | str, int | str],
endpoint: int = 0,
) -> int:
"""Take an input dict for a set of partial values and compute the raw int value."""
int_value = 0
provided_partial_values = []
# For each property key provided, we bit shift the partial value using the
# property_key
for property_key_or_name, partial_value in new_value.items():
# If the dict key is a property key, we can generate the value ID to find the
# partial value
if isinstance(property_key_or_name, int):
value_id = get_value_id_str(
node,
CommandClass.CONFIGURATION,
property_,
property_key=property_key_or_name,
endpoint=endpoint,
)
if value_id not in partial_param_values:
raise NotFoundError(
f"Bitmask {property_key_or_name} ({hex(property_key_or_name)}) "
f"not found for parameter {property_} on node {node} endpoint "
f"{endpoint}"
)
zwave_value = partial_param_values[value_id]
# If the dict key is a property name, we have to find the value from the list
# of partial param values
else:
try:
zwave_value = next(
value
for value in partial_param_values.values()
if value.property_name == property_key_or_name
and value.endpoint == endpoint
)
except StopIteration:
raise NotFoundError(
f"Partial parameter with label '{property_key_or_name}'"
f"not found for parameter {property_} on node {node} endpoint "
f"{endpoint}"
) from None
provided_partial_values.append(zwave_value)
property_key = cast(int, zwave_value.property_key)
partial_value = _bulk_set_validate_and_transform_new_value(
zwave_value, property_key, partial_value
)
int_value += partial_value << partial_param_bit_shift(property_key)
# To set partial parameters in bulk, we also have to include cached values for
# property keys that haven't been specified
missing_values = set(partial_param_values.values()) - set(provided_partial_values)
int_value += sum(
cast(int, property_value.value)
<< partial_param_bit_shift(cast(int, property_value.property_key))
for property_value in missing_values
)
return int_value
def _validate_raw_int(
partial_param_values: dict[str, ConfigurationValue], new_value: int
) -> None:
"""
Validate raw value against all partial values.
Raises if a partial value in the raw value is invalid.
"""
# Break down the bulk value into partial values and validate them against
# each partial parameter's metadata by looping through the property values
# starting with the highest property key
for zwave_value in sorted(
partial_param_values.values(),
key=lambda val: cast(int, val.property_key),
reverse=True,
):
property_key = cast(int, zwave_value.property_key)
multiplication_factor = 2 ** partial_param_bit_shift(property_key)
partial_value = int(new_value / multiplication_factor)
new_value = new_value % multiplication_factor
_bulk_set_validate_and_transform_new_value(
zwave_value, property_key, partial_value
)
|