1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
|
import os
import sys
import unittest
this_dir = os.path.dirname(os.path.abspath(__file__))
test_script = os.path.join(
this_dir,
'fixtures',
'execute_process',
'stdout_stderr_ordering.py')
python = sys.executable
def convert_file_linesep_with_pty_linesep(string):
return string.replace(b"""
""", b"\r\n")
@unittest.skipIf(sys.platform.startswith("win"), "Windows not supported")
class TestProcessUtilsExecuteNoPty(unittest.TestCase):
def test__execute_process_pty_combined_unbuffered(self):
from osrf_pycommon.process_utils import execute_process_pty
exc_pty = execute_process_pty._execute_process_pty
# Test ordering with stdout and stderr combined and Python unbuffered
cmd = [python, "-u", test_script]
result = b""
for out, err, ret in exc_pty(cmd, None, None, False, True):
if out is not None:
result += out
if err is not None:
result += err
if ret is not None:
break
expected = convert_file_linesep_with_pty_linesep(b"""\
out 1
err 1
out 2
""")
self.assertEqual(expected, result)
def test__execute_process_pty_unbuffered(self):
from osrf_pycommon.process_utils import execute_process_pty
exc_pty = execute_process_pty._execute_process_pty
# Test ordering with stdout and stderr combined and Python unbuffered
cmd = [python, "-u", test_script]
result = b""
for out, err, ret in exc_pty(cmd, None, None, False, False):
if out is not None:
result += out
if err is not None:
result += err
if ret is not None:
break
expected = convert_file_linesep_with_pty_linesep(b"""\
out 1
err 1
out 2
""")
self.assertEqual(expected, result)
def test__execute_process_pty_combined(self):
from osrf_pycommon.process_utils import execute_process_pty
exc_pty = execute_process_pty._execute_process_pty
# Test ordering with stdout and stderr combined
cmd = [python, test_script]
result = b""
for out, err, ret in exc_pty(cmd, None, None, False, True):
if out is not None:
result += out
if err is not None:
result += err
if ret is not None:
break
expected = convert_file_linesep_with_pty_linesep(b"""\
out 1
err 1
out 2
""")
self.assertEqual(expected, result)
def test__execute_process_pty(self):
from osrf_pycommon.process_utils import execute_process_pty
exc_pty = execute_process_pty._execute_process_pty
# Test ordering with stdout and stderr separate
cmd = [python, test_script]
result = b""
for out, err, ret in exc_pty(cmd, None, None, False, False):
if out is not None:
result += out
if err is not None:
result += err
if ret is not None:
break
expected = convert_file_linesep_with_pty_linesep(b"""\
out 1
err 1
out 2
""")
self.assertEqual(expected, result)
|