# File: task_registry_test.py
#
# Package info:
#   python-xknx 3.6.0-1
#   • links: PTS, VCS
#   • area: main
#   • in suites: forky, sid, trixie
#   • size: 4,012 kB
#   • sloc: python: 39,710; javascript: 8,556; makefile: 27; sh: 12
# File content: 155 lines | stat: -rw-r--r-- 4,512 bytes
"""Unit test for task registry."""

import asyncio
import gc
import sys
import weakref

from xknx import XKNX
from xknx.core import XknxConnectionState

from ..conftest import EventLoopClockAdvancer


class TestTaskRegistry:
    """Test class for task registry."""

    #
    # TEST REGISTER/UNREGISTER
    #
    async def test_register(self) -> None:
        """Test that a registered task is tracked, can be started and awaited."""
        xknx = XKNX()

        async def callback() -> None:
            """Replace the registry's task list with an empty list."""
            xknx.task_registry.tasks = []

        task = xknx.task_registry.register(
            name="test",
            async_func=callback,
        )
        assert len(xknx.task_registry.tasks) == 1

        task.start()
        # the coroutine has only been scheduled, it has not run yet
        assert not task.done()

        await xknx.task_registry.block_till_done()
        assert task.done()
        # the callback itself emptied the task list while it ran
        assert len(xknx.task_registry.tasks) == 0

    async def test_unregister(self) -> None:
        """Test unregister after register."""
        xknx = XKNX()

        async def callback() -> None:
            """Do nothing."""

        task = xknx.task_registry.register(
            name="test",
            async_func=callback,
        )
        assert len(xknx.task_registry.tasks) == 1

        task.start()
        xknx.task_registry.unregister(task.name)
        # unregistering drops the task from the registry and leaves it done
        assert len(xknx.task_registry.tasks) == 0
        assert task.done()

    #
    # TEST START/STOP
    #
    async def test_stop(self) -> None:
        """Test that stopping the registry drops a still pending task."""
        xknx = XKNX()

        async def callback() -> None:
            """Sleep long enough to still be pending when the registry stops."""
            await asyncio.sleep(100)

        task = xknx.task_registry.register(
            name="test",
            async_func=callback,
        )
        assert len(xknx.task_registry.tasks) == 1

        task.start()
        xknx.task_registry.stop()
        assert len(xknx.task_registry.tasks) == 0

    #
    # TEST CONNECTION HANDLING
    #
    async def test_reconnect_handling(
        self, time_travel: EventLoopClockAdvancer
    ) -> None:
        """Test that a restartable task is cancelled on disconnect and restarted."""
        xknx = XKNX()
        xknx.task_registry.start()
        assert len(xknx.connection_manager._connection_state_changed_cbs) == 1
        xknx.connection_manager.connection_state_changed(XknxConnectionState.CONNECTED)

        # +1 for every completed sleep interval, -1 for every cancellation
        counter = 0

        async def callback() -> None:
            """Count completed 100 s intervals; count down once when cancelled."""
            nonlocal counter
            try:
                while True:
                    await asyncio.sleep(100)
                    counter += 1
            except asyncio.CancelledError:
                counter -= 1

        task = xknx.task_registry.register(
            name="test", async_func=callback, restart_after_reconnect=True
        )
        assert len(xknx.task_registry.tasks) == 1
        task.start()
        assert task._task is not None
        await time_travel(100)
        assert counter == 1

        # losing the connection cancels the running task
        xknx.connection_manager.connection_state_changed(
            XknxConnectionState.DISCONNECTED
        )
        await asyncio.sleep(0)  # iterate loop to cancel task
        assert task._task is None
        assert counter == 0

        # reconnecting starts a fresh run; nothing has been counted yet
        xknx.connection_manager.connection_state_changed(XknxConnectionState.CONNECTED)
        assert task._task is not None
        assert counter == 0

        await time_travel(100)
        assert counter == 1
        assert len(xknx.task_registry.tasks) == 1

        # stopping the registry cancels the task and removes the state callback
        xknx.task_registry.stop()
        assert len(xknx.task_registry.tasks) == 0
        assert task._task is None
        await asyncio.sleep(0)  # iterate loop to cancel task
        assert counter == 0
        assert len(xknx.connection_manager._connection_state_changed_cbs) == 0

    async def test_background(self, time_travel: EventLoopClockAdvancer) -> None:
        """Test that a background task is tracked while running and released after."""
        test_time = 10

        async def callback() -> None:
            """Sleep for the duration of the test window."""
            await asyncio.sleep(test_time)

        xknx = XKNX()
        xknx.task_registry.background(callback())
        assert len(xknx.task_registry._background_task) == 1
        task = next(iter(xknx.task_registry._background_task))
        assert not task.done()

        # after task is finished it should remove itself from the background registry
        await time_travel(test_time)
        assert len(xknx.task_registry._background_task) == 0
        assert task.done()

        # Exact sys.getrefcount() values are interpreter implementation details
        # and differ between Python versions. Instead verify that nothing else
        # keeps the finished task alive once the local reference is dropped.
        task_ref = weakref.ref(task)
        del task
        gc.collect()
        assert task_ref() is None