1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
|
"""Workers Tests"""
from os import getpid
import pytest
from circuits import Worker, task
@pytest.fixture()
def worker(request, manager):
    """Provide a ``Worker`` registered on ``manager`` for one test.

    The value returned by ``Worker().register(manager)`` is handed to the
    test, and its ``unregister`` method is queued as a finalizer so the
    registration is undone once the requesting test has finished.
    """
    registered = Worker().register(manager)
    # The bound method is passed straight to pytest; no wrapper is needed
    # because the finalizer takes no arguments.
    request.addfinalizer(registered.unregister)
    return registered
def err():
    """Fail on purpose by reading a name that is never defined.

    ``x`` does not exist in this function, so evaluating the expression
    raises ``NameError``; the tests use this as a task that always fails.
    """
    doubled = x * 2  # noqa: F821
    return doubled
def foo():
    """Count to one million one step at a time and return the final count.

    The loop is kept on purpose (rather than returning a constant) so that
    calling this function performs a measurable amount of work.
    """
    total = 0
    for _ in range(1000000):
        total += 1
    return total
def pid():
    """Return a greeting string containing the current process id."""
    current = getpid()
    return f'Hello from {current:d}'
def add(a, b):
    """Return ``a + b`` for any two operands that support ``+``."""
    total = a + b
    return total
def test_failure(manager, watcher, worker):
    """A task wrapping ``err`` should end in a ``task_failure`` event."""
    event = task(err)
    # NOTE(review): presumably this flag requests the ``task_failure``
    # notification waited on below -- confirm against the circuits docs.
    event.failure = True

    result = worker.fire(event)
    assert watcher.wait('task_failure')

    # The second item of the fired event's value is expected to be the
    # exception object raised by the task.
    raised = result.value[1]
    assert isinstance(raised, Exception)
def test_success(manager, watcher, worker):
    """A task wrapping ``foo`` should succeed and yield the final count."""
    event = task(foo)
    # NOTE(review): presumably this flag requests the ``task_success``
    # notification waited on below -- confirm against the circuits docs.
    event.success = True

    result = worker.fire(event)
    assert watcher.wait('task_success')

    expected = 1000000  # the count ``foo`` returns
    assert result.value == expected
def test_args(manager, watcher, worker):
    """Positional arguments given to ``task`` should reach the callable."""
    left, right = 1, 2
    event = task(add, left, right)
    # NOTE(review): presumably this flag requests the ``task_success``
    # notification waited on below -- confirm against the circuits docs.
    event.success = True

    result = worker.fire(event)
    assert watcher.wait('task_success')

    # 1 + 2, as computed by ``add`` inside the worker.
    assert result.value == 3
|