1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
|
import sys
import time
import eventlet
from eventlet.green import subprocess
import eventlet.patcher
import tests
original_subprocess = eventlet.patcher.original('subprocess')
def test_subprocess_wait():
# https://bitbucket.org/eventlet/eventlet/issue/89
# In Python 3.3 subprocess.Popen.wait() method acquired `timeout`
# argument.
# RHEL backported it to their Python 2.6 package.
cmd = [sys.executable, "-c", "import time; time.sleep(0.5)"]
p = subprocess.Popen(cmd)
ok = False
t1 = time.time()
try:
p.wait(timeout=0.1)
except subprocess.TimeoutExpired as e:
str(e) # make sure it doesn't throw
assert e.cmd == cmd
assert e.timeout == 0.1
ok = True
tdiff = time.time() - t1
assert ok, 'did not raise subprocess.TimeoutExpired'
assert 0.1 <= tdiff <= 0.2, 'did not stop within allowed time'
def test_close_popen_stdin_with_close_fds():
p = subprocess.Popen(
['ls'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
close_fds=True,
shell=False,
cwd=None,
env=None)
p.communicate(None)
try:
p.stdin.close()
except Exception as e:
assert False, "Exception should not be raised, got %r instead" % e
def test_universal_lines():
p = subprocess.Popen(
[sys.executable, '--version'],
shell=False,
stdout=subprocess.PIPE,
universal_newlines=True)
p.communicate(None)
def test_patched_communicate_290():
# https://github.com/eventlet/eventlet/issues/290
# Certain order of import and monkey_patch breaks subprocess communicate()
# with AttributeError module `select` has no `poll` on Linux
# unpatched methods are removed for safety reasons in commit f63165c0e3
tests.run_isolated('subprocess_patched_communicate.py')
def test_check_call_without_timeout_works():
# There was a regression that'd result in the following exception:
# TypeError: check_call() missing 1 required keyword-only argument: 'timeout'
subprocess.check_call(
['ls'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
def test_exception_identity():
# https://github.com/eventlet/eventlet/issues/413
# green module must keep exceptions classes as stdlib version
tests.run_isolated('subprocess_exception_identity.py')
|