1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
|
#!/usr/bin/python -i
import pytest
if pytest.PLATFORM == "win32":
pytest.skip("Unsupported Platform")
pytest.importorskip("multiprocessing")
from os import getpid
from circuits import Component, Event
class hello(Event):
"""hello Event"""
class App(Component):
def hello(self):
return "Hello from {0:d}".format(getpid())
def test(manager, watcher):
app = App()
process, bridge = app.start(process=True, link=manager)
assert watcher.wait("ready", timeout=30)
x = manager.fire(hello())
assert pytest.wait_for(x, "result")
assert x.value == "Hello from {0:d}".format(app.pid)
app.stop()
app.join()
bridge.unregister()
watcher.wait("unregistered")
|