"""Unit test for StateUpdater."""
from typing import Any
from unittest.mock import Mock, patch
import pytest
from xknx import XKNX
from xknx.core import XknxConnectionState
from xknx.core.state_updater import StateTrackerType, _StateTracker
from xknx.remote_value import RemoteValue
from xknx.telegram import GroupAddress
@patch.multiple(RemoteValue, __abstractmethods__=set())
class TestStateUpdater:
    """Tests for registration, option parsing and lifecycle of the StateUpdater."""

    @staticmethod
    def _sole_tracker(xknx: XKNX) -> _StateTracker:
        """Return the one registered tracker; fail if there is not exactly one."""
        # There is no ordering to rely on in _workers, so the lookup is only
        # meaningful while a single tracker is registered.
        trackers = xknx.state_updater._workers
        assert len(trackers) == 1
        return next(iter(trackers.values()))

    def test_register_unregister(self) -> None:
        """Test adding remote values to and removing them from the workers."""
        xknx = XKNX()
        updater = xknx.state_updater

        def worker_count() -> int:
            return len(updater._workers)

        assert worker_count() == 0

        # sync_state=True with a state address: creating the remote value does
        # not register it, calling register_state_updater() does
        synced: RemoteValue[Any] = RemoteValue(
            xknx, sync_state=True, group_address_state=GroupAddress("1/1/1")
        )
        assert worker_count() == 0
        synced.register_state_updater()
        assert worker_count() == 1

        # sync_state=False: register_state_updater() adds nothing
        unsynced: RemoteValue[Any] = RemoteValue(
            xknx, sync_state=False, group_address_state=GroupAddress("1/1/1")
        )
        assert worker_count() == 1
        unsynced.register_state_updater()
        assert worker_count() == 1

        # registering directly at the StateUpdater adds it regardless
        updater.register_remote_value(unsynced)
        assert worker_count() == 2

        # unregistering the first value directly leaves only the second one,
        # stored under its id()
        updater.unregister_remote_value(synced)
        assert worker_count() == 1
        assert next(iter(updater._workers)) == id(unsynced)

        # unregistering through the remote value empties the workers again
        unsynced.unregister_state_updater()
        assert worker_count() == 0

    def test_tracker_parser(self) -> None:
        """Test parsing of valid tracker options."""
        xknx = XKNX()

        # (sync_state, expected tracker_type, expected update_interval);
        # an interval of None means the interval is not checked for that option.
        # Numeric options and the numbers in "expire N" / "every N" are
        # expected to come back multiplied by 60.
        cases: list[tuple[Any, StateTrackerType, float | None]] = [
            ("init", StateTrackerType.INIT, None),
            (2, StateTrackerType.EXPIRE, 2 * 60),
            (6.9, StateTrackerType.EXPIRE, 6.9 * 60),
            ("expire", StateTrackerType.EXPIRE, 60 * 60),
            ("expire 30", StateTrackerType.EXPIRE, 30 * 60),
            ("every", StateTrackerType.PERIODICALLY, 60 * 60),
            ("every 10", StateTrackerType.PERIODICALLY, 10 * 60),
        ]

        for sync_state, expected_type, expected_interval in cases:
            remote_value: RemoteValue[Any] = RemoteValue(
                xknx, sync_state=sync_state, group_address_state=GroupAddress("1/1/1")
            )
            remote_value.register_state_updater()

            tracker = self._sole_tracker(xknx)
            assert tracker.tracker_type == expected_type
            if expected_interval is not None:
                assert tracker.update_interval == expected_interval

            # unregister so the next option starts with an empty worker set
            remote_value.unregister_state_updater()

    @patch("logging.Logger.warning")
    def test_tracker_parser_invalid_options(self, logging_warning_mock: Mock) -> None:
        """Test that invalid tracker options are reported with a warning."""
        xknx = XKNX()

        # a string that is not a known option: warning plus default tracker
        unparsable: RemoteValue[Any] = RemoteValue(
            xknx, sync_state="invalid", group_address_state=GroupAddress("1/1/1")
        )
        unparsable.register_state_updater()
        logging_warning_mock.assert_called_once_with(
            'Could not parse StateUpdater tracker_options "%s" for %s. Using default %s %s minutes.',
            "invalid",
            str(unparsable),
            StateTrackerType.EXPIRE,
            60,
        )
        fallback_tracker = self._sole_tracker(xknx)
        assert fallback_tracker.tracker_type == StateTrackerType.EXPIRE
        assert fallback_tracker.update_interval == 60 * 60
        unparsable.unregister_state_updater()

        # forget the first warning so the next call can be checked on its own
        logging_warning_mock.reset_mock()

        # 1441 is above the 1440 named in the warning as the maximum
        oversized: RemoteValue[Any] = RemoteValue(
            xknx, sync_state=1441, group_address_state=GroupAddress("1/1/1")
        )
        oversized.register_state_updater()
        logging_warning_mock.assert_called_once_with(
            "StateUpdater interval of %s to long for %s. Using maximum of %s minutes (1 day)",
            1441,
            str(oversized),
            1440,
        )
        oversized.unregister_state_updater()

    def test_state_updater_start_update_stop(self) -> None:
        """Test that start, update_received and stop reach the workers."""
        xknx = XKNX()
        updater = xknx.state_updater
        remote_value_1: RemoteValue[Any] = RemoteValue(
            xknx, sync_state=True, group_address_state=GroupAddress("1/1/1")
        )
        remote_value_2: RemoteValue[Any] = RemoteValue(
            xknx, sync_state=True, group_address_state=GroupAddress("1/1/2")
        )

        # put mocks in place of real trackers so the forwarded calls can be
        # inspected
        key_1, key_2 = id(remote_value_1), id(remote_value_2)
        updater._workers[key_1] = Mock()
        updater._workers[key_2] = Mock()

        assert not updater.started
        xknx.connection_manager._state = XknxConnectionState.CONNECTED
        updater.start()
        assert updater.started

        # start is forwarded to every worker
        for key in (key_1, key_2):
            updater._workers[key].start.assert_called_once_with()

        # an update reaches only the worker of the given remote value
        updater.update_received(remote_value_2)
        updater._workers[key_1].update_received.assert_not_called()
        updater._workers[key_2].update_received.assert_called_once_with()

        # stop is forwarded to every worker
        updater.stop()
        assert not updater.started
        for key in (key_1, key_2):
            updater._workers[key].stop.assert_called_once_with()

        # updates are not forwarded once the updater is stopped
        updater.update_received(remote_value_1)
        updater._workers[key_1].update_received.assert_not_called()

    async def test_stop_start_state_updater_when_reconnecting(self) -> None:
        """Test that the state updater follows the connection state."""
        xknx = XKNX()
        updater = xknx.state_updater
        connection_manager = xknx.connection_manager

        assert not updater.started
        connection_manager._state = XknxConnectionState.CONNECTED
        updater.start()
        assert updater.started

        # losing the connection stops the updater
        connection_manager.connection_state_changed(XknxConnectionState.DISCONNECTED)
        assert not updater.started

        # reconnecting starts it again
        connection_manager.connection_state_changed(XknxConnectionState.CONNECTED)
        assert updater.started

    @pytest.mark.parametrize(
        ("default", "sync_state_value", "expected_interval", "expected_tracker_type"),
        [
            (90, True, 90, StateTrackerType.EXPIRE),
            (False, True, 60, StateTrackerType.EXPIRE),
            (True, None, 60, StateTrackerType.EXPIRE),
            (40, None, 40, StateTrackerType.EXPIRE),
            ("every 70", None, 70, StateTrackerType.PERIODICALLY),
            ("init", True, 60, StateTrackerType.INIT),
            ("every 80", "expire 20", 20, StateTrackerType.EXPIRE),
        ],
    )
    async def test_stat_updater_default(
        self,
        default: int | str | bool,
        sync_state_value: int | str | bool | None,
        expected_interval: int,
        expected_tracker_type: StateTrackerType,
    ) -> None:
        """Test how the XKNX-wide state_updater default combines with sync_state."""
        xknx = XKNX(state_updater=default)
        remote_value: RemoteValue[Any] = RemoteValue(
            xknx, sync_state=sync_state_value, group_address_state=GroupAddress("1/1/1")
        )
        remote_value.register_state_updater()

        # the tracker is stored under the id() of its remote value
        tracker = xknx.state_updater._workers[id(remote_value)]
        assert tracker.update_interval == expected_interval * 60
        assert tracker.tracker_type == expected_tracker_type
|