File: test_process.py

package info (click to toggle)
circuits 3.2.3-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,980 kB
  • sloc: python: 17,583; javascript: 3,226; makefile: 100
file content (68 lines) | stat: -rw-r--r-- 1,580 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#!/usr/bin/env python
import pytest

from circuits.io import Process, write


if pytest.PLATFORM == 'win32':
    pytest.skip('Unsupported Platform')


def test(manager, watcher):
    p = Process(['echo', 'Hello World!']).register(manager)
    assert watcher.wait('registered')

    p.start()
    assert watcher.wait('started', p.channel)

    assert watcher.wait('terminated', p.channel)

    s = p.stdout.getvalue()
    assert s == b'Hello World!\n'


def test2(manager, watcher, tmpdir):
    foo = tmpdir.ensure('foo.txt')

    p = Process([f'cat - > {foo!s:s}'], shell=True).register(manager)
    assert watcher.wait('registered')

    p.start()
    assert watcher.wait('started', p.channel)

    p.fire(write('Hello World!'), p._stdin)
    assert watcher.wait('write', p._stdin)

    p.stop()

    assert watcher.wait('eof', p._stdout.channel)

    with foo.open('r') as f:
        assert f.read() == 'Hello World!'


def test_two_procs(manager, watcher):
    p1 = Process(['echo', '1']).register(manager)
    p2 = Process('echo 2 ; sleep 1', shell=True).register(manager)

    p1.start()
    p2.start()

    assert watcher.wait('terminated', p1.channel)
    assert p1._terminated
    assert not p2._terminated
    assert not p2._stdout_closed
    assert not p2._stderr_closed

    watcher.clear()  # Get rid of first terminated()

    s1 = p1.stdout.getvalue()
    assert s1 == b'1\n'

    assert watcher.wait('terminated', p2.channel)
    assert p2._terminated
    assert p2._stdout_closed
    assert p2._stderr_closed

    s2 = p2.stdout.getvalue()
    assert s2 == b'2\n'