1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
|
# Module: test_workers
# Date: 7th October 2008
# Author: James Mills, prologic at shortcircuit dot net dot au
"""Workers Tests"""
import pytest
from circuits import task, Worker
@pytest.fixture(scope="module")
def worker(request):
    """Provide a started ``Worker`` shared by all tests in this module.

    A finalizer registered on ``request`` stops the worker at teardown.
    When pytest runs verbosely, a ``Debugger`` is attached to the worker
    before it is started.
    """
    instance = Worker()

    def stop_worker():
        # Teardown hook: shut the worker down once the fixture is finalized.
        instance.stop()

    request.addfinalizer(stop_worker)

    verbose = request.config.option.verbose
    if verbose:
        from circuits import Debugger

        debugger = Debugger()
        debugger.register(instance)

    # The waiter is created before start() is called, then used to confirm
    # the "started" event.
    # NOTE(review): pytest.WaitEvent is not stock pytest; presumably it is
    # injected by the project's test configuration -- confirm.
    started_waiter = pytest.WaitEvent(instance, "started")
    instance.start()
    started = started_waiter.wait()
    assert started

    return instance
def f():
    """Count to one million in a plain Python loop and return the total.

    Used as a task payload by the tests; the loop is intentionally done
    step by step rather than computed directly.
    """
    total = 0
    for _ in range(1000000):
        total += 1
    return total
def add(a, b):
    """Return the result of ``a + b``."""
    result = a + b
    return result
def test(worker):
    """Fire ``f`` as a task on the worker and check its value is 1000000."""
    value = worker.fire(task(f))

    # Wait until the "result" attribute on the fired event's value is set.
    finished = pytest.wait_for(value, "result")
    assert finished

    assert value.result
    assert value.value == 1000000
def test_args(worker):
    """Fire ``add(1, 2)`` as a task on the worker and check its value is 3."""
    value = worker.fire(task(add, 1, 2))

    # Wait until the "result" attribute on the fired event's value is set.
    finished = pytest.wait_for(value, "result")
    assert finished

    assert value.result
    assert value.value == 3
|