File: message_resolver.py

package info (click to toggle)
python-yolink-api 0.6.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 184 kB
  • sloc: python: 1,147; makefile: 2
file content (230 lines) | stat: -rw-r--r-- 9,378 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
"""YoLink cloud message resolver."""

from __future__ import annotations

from typing import TYPE_CHECKING, Any
from math import log2
from decimal import Decimal, ROUND_DOWN

from .unit_helper import UnitOfVolume, VolumeConverter
from .const import (
    ATTR_DEVICE_SMART_REMOTER,
    ATTR_DEVICE_WATER_DEPTH_SENSOR,
    ATTR_DEVICE_WATER_METER_CONTROLLER,
    ATTR_DEVICE_MULTI_WATER_METER_CONTROLLER,
    ATTR_DEVICE_SOIL_TH_SENSOR,
    ATTR_DEVICE_SPRINKLER,
    ATTR_DEVICE_SPRINKLER_V2,
)

if TYPE_CHECKING:
    from .device import YoLinkDevice


def smart_remoter_message_resolve(msg_data: dict[str, Any], event_type: str) -> None:
    """SmartRemoter message resolve."""
    if msg_data is not None:
        btn_press_event = msg_data.get("event")
        if btn_press_event is not None:
            if event_type == "Report":
                msg_data["event"] = None
            else:
                key_mask = btn_press_event["keyMask"]
                button_sequence = 0 if key_mask == 0 else (int(log2(key_mask)) + 1)
                # replace with button sequence
                msg_data["event"]["keyMask"] = button_sequence


def water_depth_sensor_message_resolve(
    msg_data: dict[str, Any], dev_attrs: dict[str, Any]
) -> None:
    """WaterDepthSensor message resolve."""
    if msg_data is not None:
        depth_value = msg_data.get("waterDepth")
        if depth_value is not None:
            # default range settings if range and desity was not set.
            dev_range = 5
            dev_density = 1
            if (
                dev_attrs is not None
                and (range_attrs := dev_attrs.get("range")) is not None
            ):
                dev_range = range_attrs["range"]
                dev_density = range_attrs["density"]
            msg_data["waterDepth"] = round(
                (dev_range * (depth_value / 1000)) / dev_density, 3
            )


def water_meter_controller_message_resolve(
    msg_data: dict[str, Any], device_model: str | None
) -> None:
    """WaterMeterController message resolve."""
    if msg_data is not None and ((meter_state := msg_data.get("state")) is not None):
        meter_step_factor: int = 10
        # for some reason meter value can't be read
        meter_value = meter_state.get("meter")
        if meter_value is not None:
            meter_unit = UnitOfVolume.GALLONS
            if (meter_attrs := msg_data.get("attributes")) is not None:
                if device_model is not None and device_model.startswith("YS5009"):
                    meter_step_factor = (
                        1 / (_meter_step_factor / (1000 * 100))
                        if (_meter_step_factor := meter_attrs.get("meterStepFactor"))
                        is not None
                        else 10
                    )
                else:
                    meter_step_factor = (
                        _meter_step_factor
                        if (_meter_step_factor := meter_attrs.get("meterStepFactor"))
                        is not None
                        else 10
                    )
                meter_unit = (
                    UnitOfVolume(_meter_unit)
                    if (_meter_unit := meter_attrs.get("meterUnit")) is not None
                    else UnitOfVolume.GALLONS
                )
            _meter_reading = None
            if meter_step_factor < 0:
                _meter_reading = meter_value * abs(meter_step_factor)
            else:
                _meter_reading = meter_value / meter_step_factor
            meter_value = VolumeConverter.convert(
                _meter_reading, meter_unit, UnitOfVolume.CUBIC_METERS
            )
            msg_data["meter_reading"] = float(
                Decimal(meter_value).quantize(Decimal(".00000"), rounding=ROUND_DOWN)
            )
        msg_data["valve_state"] = meter_state["valve"]


def multi_water_meter_controller_message_resolve(
    msg_data: dict[str, Any],
    device_model: str | None,
) -> None:
    """MultiWaterMeterController message resolve."""
    if msg_data is not None and ((meter_state := msg_data.get("state")) is not None):
        meter_step_factor: int = 10
        meter_reading_values: dict = meter_state.get("meters")
        if meter_reading_values is not None:
            meter_unit = UnitOfVolume.GALLONS
            if (meter_attrs := msg_data.get("attributes")) is not None:
                if device_model is not None and device_model.startswith("YS5029"):
                    meter_step_factor = (
                        1 / (_meter_step_factor / (1000 * 100))
                        if (_meter_step_factor := meter_attrs.get("meterStepFactor"))
                        is not None
                        else 10
                    )
                else:
                    meter_step_factor = (
                        _meter_step_factor
                        if (_meter_step_factor := meter_attrs.get("meterStepFactor"))
                        is not None
                        else 10
                    )
                meter_unit = (
                    UnitOfVolume(_meter_unit)
                    if (_meter_unit := meter_attrs.get("meterUnit")) is not None
                    else UnitOfVolume.GALLONS
                )
            _meter_1_reading = None
            if meter_step_factor < 0:
                _meter_1_reading = meter_reading_values["0"] * abs(meter_step_factor)
            else:
                _meter_1_reading = meter_reading_values["0"] / meter_step_factor
            meter_reading_values["0"] = VolumeConverter.convert(
                _meter_1_reading,
                meter_unit,
                UnitOfVolume.CUBIC_METERS,
            )
            _meter_2_reading = None
            if meter_step_factor < 0:
                _meter_2_reading = meter_reading_values["1"] * abs(meter_step_factor)
            else:
                _meter_2_reading = meter_reading_values["1"] / meter_step_factor
            meter_reading_values["1"] = VolumeConverter.convert(
                _meter_2_reading,
                meter_unit,
                UnitOfVolume.CUBIC_METERS,
            )
            msg_data["meter_1_reading"] = float(
                Decimal(meter_reading_values["0"]).quantize(
                    Decimal(".00000"), rounding=ROUND_DOWN
                )
            )
            msg_data["meter_2_reading"] = float(
                Decimal(meter_reading_values["1"]).quantize(
                    Decimal(".00000"), rounding=ROUND_DOWN
                )
            )
        # for some reason meter value can't be read
        if (meter_valves := meter_state.get("valves")) is not None:
            msg_data["valve_1_state"] = meter_valves["0"]
            msg_data["valve_2_state"] = meter_valves["1"]


def soil_thc_sensor_message_resolve(
    msg_data: dict[str, Any],
) -> None:
    """SoilThcSensor message resolve."""
    if msg_data is not None and ((state := msg_data.get("state")) is not None):
        msg_data["temperature"] = state.get("temperature")
        msg_data["humidity"] = state.get("humidity")
        msg_data["conductivity"] = state.get("conductivity")


def sprinkler_message_resolve(
    device: YoLinkDevice,
    msg_data: dict[str, Any],
    msg_type: str | None = None,
) -> None:
    """Sprinkler message resolve."""
    if msg_data is not None:
        if (state := msg_data.get("state")) is not None:
            device._state = {"mode": state.get("mode")}
            if (watering_data := state.get("watering")) is not None:
                msg_data["valve"] = watering_data["left"] != watering_data["total"]
        if msg_type == "waterReport":
            if device._state is not None:
                msg_data["state"] = {"mode": device._state.get("mode")}
            if (event := msg_data.get("event")) is not None:
                msg_data["valve"] = event == "start"


def sprinkler_v2_message_resolve(
    msg_data: dict[str, Any],
) -> None:
    """Sprinkler V2 message resolve."""
    if msg_data is not None and ((state := msg_data.get("state")) is not None):
        msg_data["valve"] = state.get("running")


def resolve_message(
    device: YoLinkDevice, msg_data: dict[str, Any], msg_type: str | None
) -> None:
    """Resolve device message."""
    if device.device_type == ATTR_DEVICE_WATER_DEPTH_SENSOR:
        water_depth_sensor_message_resolve(msg_data, device.device_attrs)
    elif device.device_type == ATTR_DEVICE_WATER_METER_CONTROLLER:
        water_meter_controller_message_resolve(msg_data, device.device_model_name)
    elif device.device_type == ATTR_DEVICE_MULTI_WATER_METER_CONTROLLER:
        multi_water_meter_controller_message_resolve(msg_data, device.device_model_name)
    elif device.device_type == ATTR_DEVICE_SOIL_TH_SENSOR:
        soil_thc_sensor_message_resolve(msg_data)
    elif device.device_type == ATTR_DEVICE_SPRINKLER:
        sprinkler_message_resolve(device, msg_data, msg_type)
    elif device.device_type == ATTR_DEVICE_SPRINKLER_V2:
        sprinkler_v2_message_resolve(msg_data)


def resolve_sub_message(
    device: YoLinkDevice, msg_data: dict[str, Any], msg_type: str
) -> None:
    """Resolve device pushing message."""
    if device.device_type == ATTR_DEVICE_SMART_REMOTER:
        smart_remoter_message_resolve(msg_data, msg_type)
    else:
        resolve_message(device, msg_data, msg_type)