# Tests for the worker helpers in superqt.utils._qthreading.
import inspect
import threading
import time
import warnings
from functools import partial
from operator import eq
from unittest.mock import Mock
import pytest
import superqt.utils._qthreading as qthreading
# Predicates handed to qtbot.waitSignals(check_params_cbs=...) in the tests
# below: each returns True when the emitted value equals the bound constant.
equals_1 = partial(eq, 1)
equals_3 = partial(eq, 3)
# Unconditional skip marker (the condition is the literal True).
# NOTE(review): not applied to any test in the visible part of this file.
skip = pytest.mark.skipif(True, reason="testing")
def test_as_generator_function():
    """Check that ``as_generator_function`` turns a plain function into a generator function."""

    def plain():
        return

    # Precondition: the input really is an ordinary function.
    assert not inspect.isgeneratorfunction(plain)

    wrapped = qthreading.as_generator_function(plain)
    assert inspect.isgeneratorfunction(wrapped)

    # Exhausting the generator gives a single item, None.
    produced = list(wrapped())
    assert produced == [None]
# The qtbot fixture is required for qthreading in this test.
# NOTE: pytest-cov cannot measure coverage of code that runs in the other thread.
def test_thread_worker(qtbot):
    """Run a plain function through ``thread_worker`` and check its signals."""

    def one():
        return 1

    worker = qthreading.thread_worker(one)()
    assert isinstance(worker, qthreading.FunctionWorker)

    # Each expected signal is paired with the callback that validates its
    # parameters; order="strict" makes the listed sequence mandatory.
    expected = [
        (worker.returned, equals_1),
        (worker.finished, lambda: True),
    ]
    with qtbot.waitSignals(
        [signal for signal, _ in expected],
        check_params_cbs=[check for _, check in expected],
        order="strict",
    ):
        worker.start()
def test_thread_generator_worker(qtbot):
    """Run a generator through ``thread_worker`` and check its signals."""

    def two_ones_then_three():
        yield 1
        yield 1
        return 3

    worker = qthreading.thread_worker(two_ones_then_three)()
    assert isinstance(worker, qthreading.GeneratorWorker)

    # Two yields of 1, then the return value 3, then ``finished`` -- in that
    # exact order because of order="strict".
    expected = [
        (worker.yielded, equals_1),
        (worker.yielded, equals_1),
        (worker.returned, equals_3),
        (worker.finished, lambda: True),
    ]
    with qtbot.waitSignals(
        [signal for signal, _ in expected],
        check_params_cbs=[check for _, check in expected],
        order="strict",
    ):
        worker.start()

    qtbot.wait(500)
def test_thread_raises2(qtbot):
    """An exception raised in the generator is delivered to the ``errored`` handler."""
    handled = [0]

    def on_error(exc):
        # Flag the call first so the final assertion can tell the handler ran.
        handled[0] = 1
        assert isinstance(exc, ValueError)
        assert str(exc) == "whoops"

    def failing():
        yield 1
        yield 1
        raise ValueError("whoops")

    make_worker = qthreading.thread_worker(
        failing, connect={"errored": on_error}, start_thread=False
    )
    worker = make_worker()
    assert isinstance(worker, qthreading.GeneratorWorker)

    # Only the two yields have their parameters checked; ``errored`` and
    # ``finished`` merely have to be emitted.
    expected = [
        (worker.yielded, equals_1),
        (worker.yielded, equals_1),
        (worker.errored, None),
        (worker.finished, None),
    ]
    with qtbot.waitSignals(
        [signal for signal, _ in expected],
        check_params_cbs=[check for _, check in expected],
    ):
        worker.start()

    assert handled[0] == 1
def test_thread_warns(qtbot):
    """Test warnings get returned to main thread.

    The ``warned`` handler records every payload it receives so the message
    can be asserted once the worker is done.  Returning a boolean from the
    handler would not verify anything: the return value of a connected slot
    is discarded.
    """
    received = []

    def record_warning(w):
        received.append(w)

    @qthreading.thread_worker(connect={"warned": record_warning}, start_thread=False)
    def func():
        yield 1
        warnings.warn("hey!")  # noqa: B028
        yield 3
        warnings.warn("hey!")  # noqa: B028
        return 1

    wrkr = func()
    assert isinstance(wrkr, qthreading.GeneratorWorker)

    signals = [wrkr.yielded, wrkr.warned, wrkr.yielded, wrkr.returned]
    checks = [equals_1, None, equals_3, equals_1]
    with qtbot.waitSignals(signals, check_params_cbs=checks):
        wrkr.start()

    # The handler must have been invoked, and every payload must carry the
    # warning text.  str() is used so the check holds whether the payload is
    # the warning itself or a container that holds it.
    assert received
    assert all("hey!" in str(w) for w in received)
def test_multiple_connections(qtbot):
    """The ``connect`` dict may map a signal to a list of callables; bad types raise."""
    hits = {"first": 0, "second": 0}

    def func():
        return 1

    def first(v):
        hits["first"] = 1
        assert v == 1

    def second(v):
        hits["second"] = 1
        assert v == 1

    make_worker = qthreading.thread_worker(
        func, connect={"returned": [first, second]}, start_thread=False
    )
    worker = make_worker()
    assert isinstance(worker, qthreading.FunctionWorker)

    with qtbot.waitSignal(worker.finished):
        worker.start()

    # Both callbacks listed for ``returned`` were invoked.
    assert hits["first"] == 1
    assert hits["second"] == 1

    # A non-callable entry in the list is rejected.
    with pytest.raises(TypeError):
        qthreading.thread_worker(func, connect={"returned": ["test1", second]})()

    # Passing a bare callable as ``connect`` (instead of a dict) is rejected too.
    with pytest.raises(TypeError):
        qthreading.thread_worker(func, connect=first)()
def test_create_worker(qapp):
    """Call ``create_worker`` directly, with and without a valid worker class."""

    def add(x, y):
        return x + y

    made = qthreading.create_worker(add, 1, 2)
    assert isinstance(made, qthreading.WorkerBase)

    # ``object`` is not an acceptable ``_worker_class``.
    with pytest.raises(TypeError):
        qthreading.create_worker(add, 1, 2, _worker_class=object)
# NOTE: pytest-cov cannot measure coverage of code that runs in the other
# thread, so this test exists purely for the sake of coverage.
def test_thread_worker_in_main_thread(qapp):
    """Call a function worker's ``work()`` directly, in the main thread."""

    def identity(x):
        return x

    worker = qthreading.thread_worker(identity)(2)

    # NOTE: worker.work() should not normally be called directly. Doing so
    # does NOT run the job in a separate thread (as worker.start() would).
    # It is called here only to exercise the code in the main thread.
    result = worker.work()
    assert result == 2
# NOTE: pytest-cov cannot measure coverage of code that runs in the other
# thread, so this test exists purely for the sake of coverage.
def test_thread_generator_worker_in_main_thread(qapp):
    """Drive a generator worker in the main thread: pause, send, and quit."""

    def counting():
        i = 0
        while i < 10:
            i += 1
            received = yield i
            # A value sent into the generator replaces the running counter.
            if received is not None:
                i = received
        return 3

    worker = qthreading.thread_worker(counting, start_thread=False)()

    counter = 0

    def on_paused():
        time.sleep(0.1)
        assert worker.is_paused
        worker.toggle_pause()

    def on_yield(v):
        nonlocal counter
        counter += 1
        if v == 2:
            # The surrounding assertions show is_paused is still False
            # immediately after pause() is called.
            assert not worker.is_paused
            worker.pause()
            assert not worker.is_paused
        elif v == 3:
            worker.send(7)
        elif v == 9:
            worker.quit()

    def on_aborted():
        # Five yields were seen because sending in 7 skipped a few values.
        assert counter == 5

    worker.paused.connect(on_paused)
    assert isinstance(worker, qthreading.GeneratorWorker)
    worker.yielded.connect(on_yield)
    worker.aborted.connect(on_aborted)

    # NOTE: worker.work() should not normally be called directly. Doing so
    # does NOT run the job in a separate thread (as worker.start() would).
    # It is called here only to exercise the code in the main thread.
    outcome = worker.work()
    assert outcome is None  # the worker was aborted
    assert not worker.is_paused
    assert counter == 5

    # With no handlers connected, a second worker runs through and returns 3.
    untouched = qthreading.thread_worker(counting, start_thread=False)()
    assert untouched.work() == 3
def test_worker_base_attribute(qapp):
    """``WorkerBase`` exposes its signals as attributes; unknown names raise."""
    base = qthreading.WorkerBase()

    for signal_name in ("started", "finished", "returned", "errored"):
        assert getattr(base, signal_name) is not None

    # An attribute that does not exist raises AttributeError.
    with pytest.raises(AttributeError):
        base.aa
def test_abort_does_not_return(qtbot):
    """A worker that is quit emits ``aborted`` once and never emits ``returned``."""
    tally = {"loop": 0, "aborted": 0, "returned": 0}

    def long_running_func():
        for _ in range(5):
            yield tally["loop"]
            time.sleep(0.1)
            tally["loop"] += 1

    def on_aborted():
        tally["aborted"] += 1

    def on_returned(value):
        tally["returned"] += 1

    handlers = {"returned": on_returned, "aborted": on_aborted}
    make_worker = qthreading.thread_worker(long_running_func, connect=handlers)

    worker = make_worker()
    # Quit is requested right after the worker is created.
    worker.quit()
    qtbot.wait(600)

    # The loop was cut short, the abort was reported exactly once, and no
    # return value was delivered.
    assert tally["loop"] < 4
    assert tally["aborted"] == 1
    assert tally["returned"] == 0
def test_nested_threads_start(qtbot):
    """A worker created and started from inside another worker's function runs.

    ``mock1`` is called by the outer worker's function and ``mock2`` by the
    nested one.  The nested function sets ``event`` after calling ``mock2``,
    which lets the test block until the nested work has happened.
    """
    mock1 = Mock()
    mock2 = Mock()

    event = threading.Event()

    def call_mock(_e=event):
        def nested_func():
            mock2()
            _e.set()

        mock1()
        worker2 = qthreading.create_worker(nested_func)
        worker2.start()

    worker = qthreading.create_worker(call_mock)
    worker.start()

    # Event.wait() returns False on timeout; assert on it so a nested worker
    # that never ran is reported directly rather than through the mock
    # assertions below.
    assert event.wait(timeout=2), "nested worker did not run within 2 seconds"

    mock1.assert_called_once()
    mock2.assert_called_once()
|