"""Unit test for task registry."""
import asyncio
import sys
from xknx import XKNX
from xknx.core import XknxConnectionState
from ..conftest import EventLoopClockAdvancer
class TestTaskRegistry:
    """Tests covering task registration, stopping, reconnects and background tasks."""

    #
    # TEST REGISTER/UNREGISTER
    #
    async def test_register(self) -> None:
        """Register a task, start it and wait until it has finished."""
        xknx = XKNX()
        registry = xknx.task_registry

        async def clear_tasks() -> None:
            """Swap the registry's task list for an empty one."""
            xknx.task_registry.tasks = []

        task = registry.register(name="test", async_func=clear_tasks)
        assert len(registry.tasks) == 1

        task.start()
        assert not task.done()

        await registry.block_till_done()
        assert task.done()
        # The coroutine above replaced the task list, so it is expected
        # to be empty once the task has run.
        assert len(registry.tasks) == 0

    async def test_unregister(self) -> None:
        """Unregister a started task by name and expect it to be done."""
        xknx = XKNX()
        registry = xknx.task_registry

        async def noop() -> None:
            """Do nothing."""

        task = registry.register(name="test", async_func=noop)
        assert len(registry.tasks) == 1

        task.start()
        registry.unregister(task.name)
        assert len(registry.tasks) == 0
        assert task.done()

    #
    # TEST START/STOP
    #
    async def test_stop(self) -> None:
        """Stop the registry while a long-sleeping task is still registered."""
        xknx = XKNX()
        registry = xknx.task_registry

        async def sleep_long() -> None:
            """Sleep for 100 seconds of loop time."""
            await asyncio.sleep(100)

        task = registry.register(name="test", async_func=sleep_long)
        assert len(registry.tasks) == 1

        task.start()
        registry.stop()
        assert len(registry.tasks) == 0

    #
    # TEST CONNECTION HANDLING
    #
    async def test_reconnect_handling(
        self, time_travel: EventLoopClockAdvancer
    ) -> None:
        """Check a restart_after_reconnect task across disconnect and reconnect.

        ``self.test`` is used as a counter: the task increments it after every
        completed 100 second sleep and decrements it once when it is cancelled.
        """
        xknx = XKNX()
        registry = xknx.task_registry
        connection = xknx.connection_manager

        registry.start()
        assert len(connection._connection_state_changed_cbs) == 1
        connection.connection_state_changed(XknxConnectionState.CONNECTED)

        # pylint: disable=attribute-defined-outside-init
        self.test = 0

        async def count_intervals() -> None:
            """Count finished sleep intervals; take one back on cancellation."""
            try:
                while True:
                    await asyncio.sleep(100)
                    self.test += 1
            except asyncio.CancelledError:
                self.test -= 1

        task = registry.register(
            name="test",
            async_func=count_intervals,
            restart_after_reconnect=True,
        )
        assert len(registry.tasks) == 1

        task.start()
        assert task._task is not None
        await time_travel(100)
        assert self.test == 1

        # Losing the connection is expected to drop the inner task.
        connection.connection_state_changed(XknxConnectionState.DISCONNECTED)
        await asyncio.sleep(0)  # iterate loop to cancel task
        assert task._task is None
        assert self.test == 0

        # Reconnecting is expected to create a fresh inner task.
        connection.connection_state_changed(XknxConnectionState.CONNECTED)
        assert task._task is not None
        assert self.test == 0
        await time_travel(100)
        assert self.test == 1
        assert len(registry.tasks) == 1

        registry.stop()
        assert len(registry.tasks) == 0
        assert task._task is None
        await asyncio.sleep(0)  # iterate loop to cancel task
        assert self.test == 0
        assert len(connection._connection_state_changed_cbs) == 0

    async def test_background(self, time_travel: EventLoopClockAdvancer) -> None:
        """Run a background task and check it is tracked only while running."""
        test_time = 10

        async def sleep_briefly() -> None:
            """Sleep for ``test_time`` seconds of loop time."""
            await asyncio.sleep(test_time)

        xknx = XKNX()
        registry = xknx.task_registry
        registry.background(sleep_briefly())
        assert len(registry._background_task) == 1

        # NOTE: the reference counts below are asserted exactly, so do not bind
        # the task to any additional name in this test.
        task = next(iter(registry._background_task))
        refs = sys.getrefcount(task)
        assert refs == 4
        assert not task.done()

        # after task is finished it should remove itself from the background registry
        await time_travel(test_time)
        assert len(registry._background_task) == 0
        assert task.done()
        refs = sys.getrefcount(task)
        assert refs == 2
|