File: test_threadworker.py

package info (click to toggle)
superqt 0.7.7-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,320 kB
  • sloc: python: 9,108; makefile: 16; sh: 12
file content (300 lines) | stat: -rw-r--r-- 7,972 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
import inspect
import threading
import time
import warnings
from functools import partial
from operator import eq
from unittest.mock import Mock

import pytest

import superqt.utils._qthreading as qthreading

# Predicates passed as ``check_params_cbs`` entries to ``qtbot.waitSignals`` in
# the tests below: each returns True when the emitted value equals 1 (or 3).
equals_1 = partial(eq, 1)
equals_3 = partial(eq, 3)
# Unconditional skip marker; not applied to any test in the visible part of
# this file.
skip = pytest.mark.skipif(True, reason="testing")


def test_as_generator_function():
    """Check that ``as_generator_function`` wraps a plain function as a generator."""

    def plain():
        return None

    # sanity check: the input really is an ordinary function
    assert inspect.isgeneratorfunction(plain) is False

    wrapped = qthreading.as_generator_function(plain)
    assert inspect.isgeneratorfunction(wrapped) is True

    # exhausting the wrapper is expected to produce exactly one ``None`` item
    produced = [*wrapped()]
    assert produced == [None]


# qtbot is necessary for qthreading here.
# note: pytest-cov cannot check coverage of code run in the other thread.
def test_thread_worker(qtbot):
    """Decorate a plain function with ``thread_worker`` and watch its signals."""

    @qthreading.thread_worker
    def func():
        return 1

    worker = func()
    assert isinstance(worker, qthreading.FunctionWorker)

    # (signal, payload check) pairs, in the order they must be emitted:
    # ``returned`` carries the value 1, ``finished`` carries nothing.
    expected = [
        (worker.returned, equals_1),
        (worker.finished, lambda: True),
    ]
    signals = [signal for signal, _ in expected]
    checks = [check for _, check in expected]

    with qtbot.waitSignals(signals, check_params_cbs=checks, order="strict"):
        worker.start()


def test_thread_generator_worker(qtbot):
    """Decorate a generator with ``thread_worker`` and watch its signals."""

    @qthreading.thread_worker
    def func():
        for _ in range(2):
            yield 1
        return 3

    worker = func()
    assert isinstance(worker, qthreading.GeneratorWorker)

    # (signal, payload check) pairs, in the order they must be emitted:
    # two yields of 1, the return value 3, then ``finished`` with no payload.
    expected = [
        (worker.yielded, equals_1),
        (worker.yielded, equals_1),
        (worker.returned, equals_3),
        (worker.finished, lambda: True),
    ]
    signals = [signal for signal, _ in expected]
    checks = [check for _, check in expected]

    with qtbot.waitSignals(signals, check_params_cbs=checks, order="strict"):
        worker.start()

    # keep the event loop running for another half second before returning
    qtbot.wait(500)


def test_thread_raises2(qtbot):
    """Check that an exception raised in a generator worker reaches ``errored``."""
    handler_called = False

    def handle_raise(e):
        # flag first, so the final assertion can tell that this slot ran
        nonlocal handler_called
        handler_called = True
        assert isinstance(e, ValueError)
        assert str(e) == "whoops"

    @qthreading.thread_worker(connect={"errored": handle_raise}, start_thread=False)
    def func():
        for _ in range(2):
            yield 1
        raise ValueError("whoops")

    worker = func()
    assert isinstance(worker, qthreading.GeneratorWorker)

    # (signal, payload check) pairs; ``None`` means the payload is not checked
    # by ``waitSignals`` (the ``errored`` payload is checked in ``handle_raise``).
    expected = [
        (worker.yielded, equals_1),
        (worker.yielded, equals_1),
        (worker.errored, None),
        (worker.finished, None),
    ]
    signals = [signal for signal, _ in expected]
    checks = [check for _, check in expected]

    with qtbot.waitSignals(signals, check_params_cbs=checks):
        worker.start()
    assert handler_called


def test_thread_warns(qtbot):
    """Test warnings get returned to main thread.

    A generator worker issues the warning "hey!" twice while running. The
    ``warned`` handler records every payload it receives, and the test body
    then verifies that the handler ran and that each payload mentions the
    warning text.
    """
    received = []

    def on_warned(w):
        # The value returned by a connected slot is discarded, so returning a
        # bool here would verify nothing: record the payload and assert on it
        # from the test body instead.
        received.append(w)

    @qthreading.thread_worker(connect={"warned": on_warned}, start_thread=False)
    def func():
        yield 1
        warnings.warn("hey!")  # noqa: B028
        yield 3
        warnings.warn("hey!")  # noqa: B028
        return 1

    wrkr = func()
    assert isinstance(wrkr, qthreading.GeneratorWorker)

    # ``None`` means the payload of that signal is not checked by waitSignals.
    signals = [wrkr.yielded, wrkr.warned, wrkr.yielded, wrkr.returned]
    checks = [equals_1, None, equals_3, equals_1]
    with qtbot.waitSignals(signals, check_params_cbs=checks):
        wrkr.start()

    assert received, "the `warned` handler was never invoked"
    # A substring check keeps this independent of the exact payload shape
    # (bare message vs. a tuple of warning details -- not visible from here).
    assert all("hey!" in str(w) for w in received)


def test_multiple_connections(qtbot):
    """Check ``connect`` accepts a list of callables per signal, and is type-checked."""
    called = set()

    def func():
        return 1

    def test1(v):
        called.add("test1")
        assert v == 1

    def test2(v):
        called.add("test2")
        assert v == 1

    make_worker = qthreading.thread_worker(
        func, connect={"returned": [test1, test2]}, start_thread=False
    )
    worker = make_worker()
    assert isinstance(worker, qthreading.FunctionWorker)
    with qtbot.waitSignal(worker.finished):
        worker.start()

    # both callbacks listed for ``returned`` must have run
    assert "test1" in called
    assert "test2" in called

    # every entry in a callback list must be callable
    bad_entry = {"returned": ["test1", test2]}
    with pytest.raises(TypeError):
        qthreading.thread_worker(func, connect=bad_entry)()

    # ``connect`` itself must be a dict, not a bare callable
    with pytest.raises(TypeError):
        qthreading.thread_worker(func, connect=test1)()


def test_create_worker(qapp):
    """Call ``create_worker`` directly instead of going through the decorator."""

    def add(x, y):
        return x + y

    worker = qthreading.create_worker(add, 1, 2)
    assert isinstance(worker, qthreading.WorkerBase)

    # ``object`` is rejected as a ``_worker_class``
    with pytest.raises(TypeError):
        qthreading.create_worker(add, 1, 2, _worker_class=object)


# note: pytest-cov cannot check coverage of code run in the other thread.
# this is just for the sake of coverage
def test_thread_worker_in_main_thread(qapp):
    """Run a function worker synchronously by calling ``work()`` directly."""

    def identity(x):
        return x

    worker = qthreading.thread_worker(identity)(2)

    # NOTE: calling worker.work() is not the normal way to use a worker: it
    # executes in the calling thread instead of a separate one (which is what
    # worker.start() would do). It is done here only so the code runs in the
    # main thread for the test.
    result = worker.work()
    assert result == 2


# note: pytest-cov cannot check coverage of code run in the other thread.
# this is just for the sake of coverage
def test_thread_generator_worker_in_main_thread(qapp):
    """Test basic threadworker on a generator in the main thread with methods.

    Runs a ``GeneratorWorker`` synchronously through ``worker.work()`` and, from
    slots connected to its signals, calls ``pause``, ``toggle_pause``, ``send``
    and ``quit`` on it. A second, untouched worker checks the plain return value.
    """

    def func():
        # Yield an incrementing counter; any non-None value sent into the
        # generator replaces the counter, so the sequence can jump ahead.
        i = 0
        while i < 10:
            i += 1
            incoming = yield i
            i = incoming if incoming is not None else i
        return 3

    worker = qthreading.thread_worker(func, start_thread=False)()
    counter = 0  # number of times ``test_yield`` has been invoked

    def handle_pause():
        # Slot for ``paused``: the worker must report itself as paused here,
        # after which the pause state is toggled again.
        time.sleep(0.1)
        assert worker.is_paused
        worker.toggle_pause()

    def test_yield(v):
        # Slot for ``yielded``: count the value and react to specific ones.
        nonlocal counter
        counter += 1
        if v == 2:
            assert not worker.is_paused
            worker.pause()
            # the assertion below expects pause() not to take effect immediately
            assert not worker.is_paused
        if v == 3:
            worker.send(7)
        if v == 9:
            worker.quit()

    def handle_abort():
        assert counter == 5  # because we skipped a few by sending in 7

    worker.paused.connect(handle_pause)
    assert isinstance(worker, qthreading.GeneratorWorker)
    worker.yielded.connect(test_yield)
    worker.aborted.connect(handle_abort)
    # NOTE: you shouldn't normally call worker.work()!  If you do, it will NOT
    # be run in a separate thread (as it would for worker.start()).
    # This is for the sake of testing it in the main thread.
    assert worker.work() is None  # because we aborted it
    assert not worker.is_paused
    assert counter == 5

    # With no slots interfering, the generator runs to completion and work()
    # gives back its return value.
    worker2 = qthreading.thread_worker(func, start_thread=False)()
    assert worker2.work() == 3


def test_worker_base_attribute(qapp):
    """Check which attributes a bare ``WorkerBase`` exposes."""
    base = qthreading.WorkerBase()

    # these four signal names must resolve to something on the worker
    for signal_name in ("started", "finished", "returned", "errored"):
        assert getattr(base, signal_name) is not None

    # an unknown attribute name must still raise
    with pytest.raises(AttributeError):
        _ = base.aa


def test_abort_does_not_return(qtbot):
    """Check that a worker which is quit emits ``aborted`` once and never ``returned``."""
    tally = {"loops": 0, "aborts": 0, "returns": 0}

    def long_running_func():
        # five slow iterations, each yielding the number of completed loops
        for _ in range(5):
            yield tally["loops"]
            time.sleep(0.1)
            tally["loops"] += 1

    def count_abort():
        tally["aborts"] += 1

    def returned_handler(value):
        tally["returns"] += 1

    handlers = {
        "returned": returned_handler,
        "aborted": count_abort,
    }
    threaded_function = qthreading.thread_worker(long_running_func, connect=handlers)
    worker = threaded_function()
    worker.quit()

    # give the worker time to notice the quit request and wind down
    qtbot.wait(600)
    assert tally["loops"] < 4
    assert tally["aborts"] == 1
    assert tally["returns"] == 0


def test_nested_threads_start(qtbot):
    """Check that a worker created and started from inside another worker runs."""
    outer_mock, inner_mock = Mock(), Mock()
    inner_done = threading.Event()

    def call_mock(_e=inner_done):
        def nested_func():
            inner_mock()
            # tell the waiting test body that the nested worker has run
            _e.set()

        outer_mock()
        inner_worker = qthreading.create_worker(nested_func)
        inner_worker.start()

    outer_worker = qthreading.create_worker(call_mock)
    outer_worker.start()

    # block for at most two seconds waiting for the nested worker
    inner_done.wait(timeout=2)
    outer_mock.assert_called_once()
    inner_mock.assert_called_once()