1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218
|
import os
import re
import sys
import time
import filecmp
import difflib
import traceback
import gevent
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
from lib.colorer import Colorer
from lib.utils import check_valgrind_log, print_tail_n
# Module-wide Colorer instance. It is called below as
# color_stdout(text, schema=...) to print test status lines and error
# headers, and its writeout_unidiff() method prints result diffs.
color_stdout = Colorer()
class TestRunGreenlet(gevent.Greenlet):
    """Greenlet that remembers a callable and its arguments and invokes
    them when the greenlet runs."""

    def __init__(self, green_callable, *args, **kwargs):
        """Store the callable together with its positional and keyword
        arguments, then initialize the base greenlet without arguments."""
        self.callable = green_callable
        self.callable_args = args
        self.callable_kwargs = kwargs
        super(TestRunGreenlet, self).__init__()

    def _run(self, *args, **kwargs):
        """Call the stored callable with the stored arguments.

        The arguments passed to _run itself are ignored, and the
        callable's return value is discarded.
        """
        target = self.callable
        positional = self.callable_args
        keywords = self.callable_kwargs
        target(*positional, **keywords)

    def __repr__(self):
        """Return a short description with the object address and the
        optional 'info' attribute (None when it was never set)."""
        address = hex(id(self))
        info = getattr(self, "info", None)
        return "<TestRunGreenlet at %s info='%s'>" % (address, info)
class FilteredStream:
    """File-backed stream that passes every written line through a stack
    of regex filters; used to filter .result file output."""

    def __init__(self, filename):
        """Open 'filename' for writing with an empty filter stack."""
        self.filters = []
        self.inspector = None
        # Line-buffered mode (buffering=1) is used on purpose: partial
        # results of a failed test must be visible in the file.
        self.stream = open(filename, "w+", 1)

    def write(self, fragment):
        """Filter 'fragment' line by line and write what is left to the
        underlying stream.

        The fragment does not have to be a single line. Filters are
        applied in the order they were pushed. A line that had visible
        content but becomes blank after some filter is dropped entirely;
        a line that was blank to begin with is always written.
        """
        for text in StringIO(fragment):
            had_content = len(text.strip())
            for pattern, replacement in self.filters:
                text = re.sub(pattern, replacement, text)
                # A line that got completely filtered out is not written
                # and the remaining filters are not applied to it.
                if had_content and not text.strip():
                    break
            else:
                # No filter emptied the line (or there are no filters).
                self.stream.write(text)

    def push_filter(self, pattern, replacement):
        """Add a (pattern, replacement) pair on top of the filter stack."""
        entry = [pattern, replacement]
        self.filters.append(entry)

    def pop_filter(self):
        """Remove the most recently pushed filter."""
        self.filters.pop()

    def clear_all_filters(self):
        """Replace the filter stack with a new empty list."""
        self.filters = []

    def close(self):
        """Drop all filters and close the underlying file."""
        self.clear_all_filters()
        self.stream.close()

    def flush(self):
        """Flush the underlying file."""
        self.stream.flush()
class Test:
    """An individual test file.

    A test object can run itself and remembers the completion state of
    the run.

    If the file <test_name>.skipcond exists, it is executed before the
    test, and if it sets self.skip to a true value the test is skipped.
    """

    # Matches the '.test' suffix of a test name and everything after it.
    rg = re.compile(r'\.test.*')

    def __init__(self, name, args, suite_ini, params=None, conf_name=None):
        """Initialize test properties: path to the test file, paths to the
        expected/temporary/reject result files, and the test status flags.

        name      -- path of the test file; its '.test*' suffix is replaced
                     to derive the .result, .skipcond and .reject names
        args      -- options object; 'vardir' is read here, 'valgrind' and
                     'is_force' are read by run()
        suite_ini -- mapping; its 'suite' entry is the directory that holds
                     the .result and .skipcond files
        params    -- run parameters, stored as self.run_params; a fresh
                     dict is created when omitted
        conf_name -- stored as self.conf_name
        """
        self.name = name
        self.args = args
        self.suite_ini = suite_ini
        self.result = os.path.join(suite_ini['suite'],
                                   os.path.basename(self.rg.sub('.result', name)))
        self.skip_cond = os.path.join(suite_ini['suite'],
                                      os.path.basename(self.rg.sub('.skipcond', name)))
        self.tmp_result = os.path.join(self.args.vardir,
                                       os.path.basename(self.result))
        self.reject = self.rg.sub('.reject', name)
        self.is_executed = False
        self.is_executed_ok = None
        self.is_equal_result = None
        self.is_valgrind_clean = True
        self.is_terminated = False
        # Defined up front so the attribute exists before the first run().
        self.skip = False
        # A default of None (not {}) keeps instances from sharing one dict.
        self.run_params = {} if params is None else params
        self.conf_name = conf_name

    def passed(self):
        """Return a true value if this test was run successfully."""
        return self.is_executed and self.is_executed_ok and self.is_equal_result

    def execute(self, server):
        """Run the test body against 'server'.

        Does nothing here; run() calls it between redirecting and
        restoring stdout. Presumably overridden by concrete test types.
        """
        pass

    def run(self, server):
        """Execute the test and report its outcome.

        The optional .skipcond file is executed first. Unless it sets
        self.skip, stdout is redirected into the temporary result file
        and execute() is called. Afterwards the temporary result is
        compared with the .result file and one of 'skip', 'pass', 'new'
        or 'fail' is printed:

        * skip/pass -- the temporary result file is removed;
        * new       -- no .result file existed, the output becomes it;
        * fail      -- the output is moved to the .reject file and
                       diagnostics are printed.

        On failure the server is stopped and cleaned up and RuntimeError
        is raised, but only if the is_force flag is not set.
        """
        diagnostics = "unknown"
        save_stdout = sys.stdout
        try:
            self.skip = False
            if os.path.exists(self.skip_cond):
                sys.stdout = FilteredStream(self.tmp_result)
                # Kept although unused here: locals() is passed to the
                # skipcond script below, so the script can see this name
                # (as well as 'self', which it uses to set self.skip).
                stdout_fileno = sys.stdout.stream.fileno()
                execfile(self.skip_cond, dict(locals(), **server.__dict__))
                sys.stdout.close()
                sys.stdout = save_stdout
            if not self.skip:
                sys.stdout = FilteredStream(self.tmp_result)
                self.execute(server)
                sys.stdout.flush()
            self.is_executed_ok = True
        except Exception as e:
            # print_exc() takes (limit, file), not the exception object;
            # it reports the exception currently being handled.
            traceback.print_exc()
            diagnostics = str(e)
        finally:
            # Close the redirection target, if any, and restore stdout.
            if sys.stdout and sys.stdout != save_stdout:
                sys.stdout.close()
            sys.stdout = save_stdout
        self.is_executed = True
        sys.stdout.flush()

        if not self.skip:
            if self.is_executed_ok and os.path.isfile(self.result):
                self.is_equal_result = filecmp.cmp(self.result, self.tmp_result)
        else:
            # A skipped test has nothing to compare; treat it as matching.
            self.is_equal_result = 1

        if self.args.valgrind:
            self.is_valgrind_clean = (check_valgrind_log(server.valgrind_log) == False)

        if self.skip:
            color_stdout("[ skip ]\n", schema='test_skip')
            if os.path.exists(self.tmp_result):
                os.remove(self.tmp_result)
        elif self.is_executed_ok and self.is_equal_result and self.is_valgrind_clean:
            color_stdout("[ pass ]\n", schema='test_pass')
            if os.path.exists(self.tmp_result):
                os.remove(self.tmp_result)
        elif (self.is_executed_ok and not self.is_equal_result and not
              os.path.isfile(self.result)):
            # No expected result yet: the produced output becomes it.
            os.rename(self.tmp_result, self.result)
            color_stdout("[ new ]\n", schema='test_new')
        else:
            os.rename(self.tmp_result, self.reject)
            color_stdout("[ fail ]\n", schema='test_fail')

            where = ""
            if not self.is_executed_ok:
                self.print_diagnostics(self.reject, "Test failed! Last 10 lines of the result file:\n")
                server.print_log(15)
                where = ": test execution aborted, reason '{0}'".format(diagnostics)
            elif not self.is_equal_result:
                self.print_unidiff()
                server.print_log(15)
                where = ": wrong test output"
            elif not self.is_valgrind_clean:
                os.remove(self.reject)
                self.print_diagnostics(server.valgrind_log, "Test failed! Last 10 lines of valgrind.log:\n")
                where = ": there were warnings in valgrind.log"

            if not self.args.is_force:
                # gh-1026
                # stop and cleanup tarantool instance for incorrect tests
                server.stop()
                server.cleanup()
                raise RuntimeError("Failed to run test " + self.name + where)

    def print_diagnostics(self, logfile, message):
        """Print 'message' as an error, followed by the last 10 lines of
        'logfile'. Used to diagnose a failure of the test."""
        color_stdout(message, schema='error')
        print_tail_n(logfile, 10)

    def print_unidiff(self):
        """Print a unified diff between the .result and .reject files.

        Used to establish the cause of a failure when the produced output
        differs from the expected .result file.
        """
        color_stdout("\nTest failed! Result content mismatch:\n", schema='error')
        with open(self.result, "r") as result:
            with open(self.reject, "r") as reject:
                # Modification times are shown in the diff header.
                result_time = time.ctime(os.stat(self.result).st_mtime)
                reject_time = time.ctime(os.stat(self.reject).st_mtime)
                diff = difflib.unified_diff(result.readlines(),
                                            reject.readlines(),
                                            self.result,
                                            self.reject,
                                            result_time,
                                            reject_time)
                color_stdout.writeout_unidiff(diff)
|