File: subprocess_test.py

package info (click to toggle)
python-eventlet 0.40.1-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 3,200 kB
  • sloc: python: 25,101; sh: 78; makefile: 31
file content (81 lines) | stat: -rw-r--r-- 2,417 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import sys
import time

import eventlet
from eventlet.green import subprocess
import eventlet.patcher
import tests
original_subprocess = eventlet.patcher.original('subprocess')


def test_subprocess_wait():
    # https://bitbucket.org/eventlet/eventlet/issue/89
    # In Python 3.3 subprocess.Popen.wait() method acquired `timeout`
    # argument.
    # RHEL backported it to their Python 2.6 package.
    cmd = [sys.executable, "-c", "import time; time.sleep(0.5)"]
    p = subprocess.Popen(cmd)
    ok = False
    t1 = time.time()
    try:
        p.wait(timeout=0.1)
    except subprocess.TimeoutExpired as e:
        str(e)  # make sure it doesn't throw
        assert e.cmd == cmd
        assert e.timeout == 0.1
        ok = True
    tdiff = time.time() - t1
    assert ok, 'did not raise subprocess.TimeoutExpired'
    assert 0.1 <= tdiff <= 0.2, 'did not stop within allowed time'


def test_close_popen_stdin_with_close_fds():
    p = subprocess.Popen(
        ['ls'],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        close_fds=True,
        shell=False,
        cwd=None,
        env=None)

    p.communicate(None)

    try:
        p.stdin.close()
    except Exception as e:
        assert False, "Exception should not be raised, got %r instead" % e


def test_universal_lines():
    p = subprocess.Popen(
        [sys.executable, '--version'],
        shell=False,
        stdout=subprocess.PIPE,
        universal_newlines=True)
    p.communicate(None)


def test_patched_communicate_290():
    # https://github.com/eventlet/eventlet/issues/290
    # Certain order of import and monkey_patch breaks subprocess communicate()
    # with AttributeError module `select` has no `poll` on Linux
    # unpatched methods are removed for safety reasons in commit f63165c0e3
    tests.run_isolated('subprocess_patched_communicate.py')


def test_check_call_without_timeout_works():
    # There was a regression that'd result in the following exception:
    # TypeError: check_call() missing 1 required keyword-only argument: 'timeout'
    subprocess.check_call(
        ['ls'],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )


def test_exception_identity():
    # https://github.com/eventlet/eventlet/issues/413
    # green module must keep exceptions classes as stdlib version
    tests.run_isolated('subprocess_exception_identity.py')