1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
|
"""
Unit test library
"""
import os
import socket
import sys
import time
from tempfile import mkstemp
from tempfile import TemporaryFile, NamedTemporaryFile, TemporaryDirectory
try:
import configparser
except ImportError:
import ConfigParser as configparser
from io import BytesIO, StringIO
# Names exported by a star-import of this module. The TBytesIO helper class
# defined below is intentionally not part of this list.
__all__ = ['HOSTNAME', 'load_cfg', 'make_temp_filename', 'make_temp_file',
           'make_temp_dir', 'CLI_main']
# Short host name of the local machine: everything before the first dot of
# socket.gethostname() (the whole name when it contains no dot).
HOSTNAME = socket.gethostname().partition('.')[0]
class TBytesIO(BytesIO):
    """In-memory bytes stream used in place of a standard stream in tests.

    Text handed to the constructor or to write() is encoded with
    str.encode() defaults before being stored, so code that writes strings
    can be captured and later compared against bytes. Data that is already
    bytes-like is stored unchanged.
    """

    @staticmethod
    def _as_bytes(data):
        """Return *data* unchanged if it is bytes-like, else data.encode()."""
        if isinstance(data, (bytes, bytearray, memoryview)):
            return data
        return data.encode()

    def __init__(self, initial_bytes=None):
        """Create the stream, optionally pre-filled with *initial_bytes*.

        *initial_bytes* may be None (empty stream), a bytes-like object or
        text; text is encoded, including the empty string.
        """
        if initial_bytes is not None:
            initial_bytes = self._as_bytes(initial_bytes)
        BytesIO.__init__(self, initial_bytes)

    def write(self, s):
        """Write text or bytes-like *s*; return the number of bytes written."""
        return BytesIO.write(self, self._as_bytes(s))

    def isatty(self):
        """Report that this stream is never connected to a terminal."""
        return False
def load_cfg(name):
    """Build a new ConfigParser from the test configuration file *name*.

    Two candidate paths are passed, in this order, to ConfigParser.read():
    the per-user file ~/.clustershell/tests/<name> (with ~ expanded) and the
    system-wide file /etc/clustershell/tests/<name>.
    """
    user_path = os.path.expanduser('~/.clustershell/tests/%s' % name)
    system_path = '/etc/clustershell/tests/%s' % name

    parser = configparser.ConfigParser()
    parser.read([user_path, system_path])
    return parser
#
# Temp files and directories
#
def make_temp_filename(suffix=''):
    """Create an empty temporary file and return its path.

    The file name starts with 'cs-test-'. A non-empty *suffix* is appended
    to the name, separated by a '-' which is added when *suffix* does not
    already begin with one. The file is created by mkstemp() and left on
    disk; only its descriptor is closed here.
    """
    if not suffix or suffix[0] == '-':
        tail = suffix
    else:
        tail = '-' + suffix
    handle, path = mkstemp(tail, prefix='cs-test-')
    # mkstemp() returns an open descriptor; release it so it is not leaked
    os.close(handle)
    return path
def make_temp_file(text, suffix='', dir=None):
    """Return an open NamedTemporaryFile already holding *text*.

    *text* must be a bytes object (checked with an assert). The file name
    starts with 'cs-test-' and ends with *suffix*; *dir* is forwarded to
    NamedTemporaryFile. The content is flushed before returning, and the
    returned object is left open for the caller.
    """
    assert type(text) is bytes
    handle = NamedTemporaryFile(dir=dir, suffix=suffix, prefix='cs-test-')
    handle.write(text)
    # push the data out of the Python-level buffer before handing the file back
    handle.flush()
    return handle
def make_temp_dir(suffix=''):
    """Return a new TemporaryDirectory whose name starts with 'cs-test-'.

    A non-empty *suffix* is appended to the directory name, separated by a
    '-' which is added when *suffix* does not already begin with one.
    """
    if not suffix or suffix[0] == '-':
        tail = suffix
    else:
        tail = '-' + suffix
    return TemporaryDirectory(tail, prefix='cs-test-')
#
# CLI tests
#
def CLI_main(test, main, args, stdin, expected_stdout, expected_rc=0,
             expected_stderr=None):
    """Call a CLI main() function in-process and check its results.

    Calling main() directly (instead of spawning a process) allows code
    coverage checks of the CLI code. While main() runs, sys.argv, sys.stdin,
    sys.stdout and sys.stderr are replaced; all of them are restored
    afterwards, whether main() exits, returns or raises.

    Arguments:
        test: object providing assertEqual() and assertTrue(), used to
            report mismatches.
        main: callable invoked without arguments.
        args: value installed as sys.argv during the call.
        stdin: None to leave sys.stdin untouched; a bytes object to expose
            it through a binary temporary file; anything else is wrapped in
            a StringIO so that it is read in text mode.
        expected_stdout: None to skip the check; an object with a search()
            method (compiled regexp) that must find a match in the captured
            output; otherwise a value that must equal the captured bytes.
        expected_rc: expected exit status, or None to skip the check.
        expected_stderr: None to skip the check; an object with a match()
            method (compiled regexp) that must match the captured output;
            otherwise bytes that the captured output must end with.

    The exit status is taken from the SystemExit raised by main(): no code
    (bare sys.exit()) counts as 0, an integer is used as is, any other value
    is parsed as an integer when possible and counts as 1 otherwise. It
    stays -1 when main() returns without raising SystemExit.
    """
    rc = -1

    saved_stdin = sys.stdin
    saved_stdout = sys.stdout
    saved_stderr = sys.stderr
    saved_argv = sys.argv

    # Output: the CLI writes strings to stdout/stderr but the tests expect
    # bytes; TBytesIO does the conversion.
    out = TBytesIO()
    err = TBytesIO()

    try:
        try:
            # Input: if defined, the stdin argument specifies input data
            if stdin is not None:
                if isinstance(stdin, bytes):
                    # bytes are exposed through a real (binary) file object
                    sys.stdin = TemporaryFile()
                    sys.stdin.write(stdin)
                    sys.stdin.seek(0)  # ready to be read
                else:
                    # text input: some tests need sys.stdin in text mode
                    sys.stdin = StringIO(stdin)
            sys.stdout = out
            sys.stderr = err
            sys.argv = args

            main()
        except SystemExit as exc:
            code = exc.code
            if code is None:
                # bare sys.exit() / sys.exit(None) means success
                rc = 0
            elif isinstance(code, int):
                rc = int(code)
            else:
                # keep accepting numeric strings; any other message maps to
                # the conventional failure status
                try:
                    rc = int(str(code))
                except ValueError:
                    rc = 1
        finally:
            # always put the process-wide state back, even if main() raised
            sys.argv = saved_argv
            sys.stdout = saved_stdout
            sys.stderr = saved_stderr
            # close the replacement stdin if one was installed
            if sys.stdin is not saved_stdin:
                sys.stdin.close()
                sys.stdin = saved_stdin

        if expected_stdout is not None:
            stdout_data = out.getvalue()
            if hasattr(expected_stdout, 'search'):
                # compiled regexp
                if not expected_stdout.search(stdout_data):
                    # search failed; use assertEqual() to display
                    # expected/output
                    test.assertEqual(stdout_data, expected_stdout.pattern)
            else:
                test.assertEqual(stdout_data, expected_stdout)

        if expected_stderr is not None:
            stderr_data = err.getvalue()
            if hasattr(expected_stderr, 'match'):
                # compiled regexp
                if not expected_stderr.match(stderr_data):
                    # match failed; use assertEqual() to display
                    # expected/output
                    test.assertEqual(stderr_data, expected_stderr.pattern)
            else:
                # check the end as stderr messages are often prefixed with
                # argv[0]
                test.assertTrue(stderr_data.endswith(expected_stderr),
                                stderr_data + b' != ' + expected_stderr)

        if expected_rc is not None:
            test.assertEqual(rc, expected_rc,
                             "rc=%d err=%s" % (rc, err.getvalue()))
    finally:
        out.close()
        err.close()
|