1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
|
#!/usr/bin/env python
import pytest
if pytest.PLATFORM == "win32":
pytest.skip("Unsupported Platform")
from circuits import Manager
from circuits.net.sockets import Pipe
from circuits.core.pollers import Select
from circuits.net.events import close, write
from .client import Client
def pytest_generate_tests(metafunc):
metafunc.addcall(funcargs={"Poller": Select})
def test_pipe(Poller):
m = Manager() + Poller()
a, b = Pipe("a", "b")
a.register(m)
b.register(m)
a = Client(channel=a.channel).register(m)
b = Client(channel=b.channel).register(m)
m.start()
try:
assert pytest.wait_for(a, "ready")
assert pytest.wait_for(b, "ready")
a.fire(write(b"foo"))
assert pytest.wait_for(b, "data", b"foo")
b.fire(write(b"foo"))
assert pytest.wait_for(a, "data", b"foo")
a.fire(close())
assert pytest.wait_for(a, "disconnected")
b.fire(close())
assert pytest.wait_for(b, "disconnected")
finally:
m.stop()
|