File: test_pipe.py

package info (click to toggle)
circuits 3.2.3-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,980 kB
  • sloc: python: 17,583; javascript: 3,226; makefile: 100
file content (47 lines) | stat: -rw-r--r-- 1,058 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#!/usr/bin/env python
import pytest

from circuits import Manager
from circuits.core.pollers import Select
from circuits.net.events import close, write
from circuits.net.sockets import Pipe

from .client import Client


pytestmark = pytest.mark.skipif(pytest.PLATFORM == 'win32', reason='Unsupported Platform')


def pytest_generate_tests(metafunc):
    metafunc.parametrize('Poller', [Select])


def test_pipe(Poller):
    m = Manager() + Poller()

    a, b = Pipe('a', 'b')
    a.register(m)
    b.register(m)

    a = Client(channel=a.channel).register(m)
    b = Client(channel=b.channel).register(m)

    m.start()

    try:
        assert pytest.wait_for(a, 'ready')
        assert pytest.wait_for(b, 'ready')

        a.fire(write(b'foo'))
        assert pytest.wait_for(b, 'data', b'foo')

        b.fire(write(b'foo'))
        assert pytest.wait_for(a, 'data', b'foo')

        a.fire(close())
        assert pytest.wait_for(a, 'disconnected')

        b.fire(close())
        assert pytest.wait_for(b, 'disconnected')
    finally:
        m.stop()