File: test.py

import os
import re
import sys
import time
import filecmp
import difflib
import traceback
import gevent

try:
    from cStringIO import StringIO
except ImportError:
    from StringIO import StringIO

from lib.colorer import Colorer
from lib.utils import check_valgrind_log, print_tail_n
color_stdout = Colorer()


class TestRunGreenlet(gevent.Greenlet):
    def __init__(self, green_callable, *args, **kwargs):
        self.callable = green_callable
        self.callable_args = args
        self.callable_kwargs = kwargs
        super(TestRunGreenlet, self).__init__()

    def _run(self, *args, **kwargs):
        self.callable(*self.callable_args, **self.callable_kwargs)

    def __repr__(self):
            return "<TestRunGreenlet at %s info='%s'>" % (hex(id(self)), getattr(self, "info", None))

class FilteredStream:
    """Helper class to filter .result file output"""
    def __init__(self, filename):
        #
        # always open the output stream in line-buffered mode,
        # to see partial results of a failed test
        #
        self.stream = open(filename, "w+", 1)
        self.filters = []
        self.inspector = None

    def write(self, fragment):
        """Apply all filters, then write result to the undelrying stream.
        Do line-oriented filtering: the fragment doesn't have to represent
        just one line."""
        fragment_stream = StringIO(fragment)
        skipped = False
        for line in fragment_stream:
            original_len = len(line.strip())
            for pattern, replacement in self.filters:
                line = re.sub(pattern, replacement, line)
                # don't write lines that are completely filtered out:
                skipped = original_len and not line.strip()
                if skipped:
                    break
            if not skipped:
                self.stream.write(line)

    def push_filter(self, pattern, replacement):
        self.filters.append([pattern, replacement])

    def pop_filter(self):
        self.filters.pop()

    def clear_all_filters(self):
        self.filters = []

    def close(self):
        self.clear_all_filters()
        self.stream.close()

    def flush(self):
        self.stream.flush()
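
# A minimal sketch of how FilteredStream is meant to be used; the file
# name and filter pattern below are hypothetical:
#
#   out = FilteredStream('example.result')
#   out.push_filter(r'port \d+', 'port <PORT>')
#   out.write('listening on port 3301\n')  # stored as 'listening on port <PORT>'
#   out.pop_filter()
#   out.close()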


class Test:
    """An individual test file. A test object can run itself
    and remembers completion state of the run.

    If a file <test_name>.skipcond exists, it is executed before the
    test, and if it sets self.skip to a true value the test is skipped.
    """
    rg = re.compile(r'\.test.*')

    def __init__(self, name, args, suite_ini, params=None, conf_name=None):
        """Initialize test properties: path to test file, path to
        temporary result file, path to the client program, test status."""
        self.name = name
        self.args = args
        self.suite_ini = suite_ini
        self.result = os.path.join(suite_ini['suite'],
                os.path.basename(self.rg.sub('.result', name)))
        self.skip_cond = os.path.join(suite_ini['suite'],
                os.path.basename(self.rg.sub('.skipcond', name)))
        self.tmp_result = os.path.join(self.args.vardir,
                                       os.path.basename(self.result))
        self.reject = self.rg.sub('.reject', name)
        self.is_executed = False
        self.is_executed_ok = None
        self.is_equal_result = None
        self.is_valgrind_clean = True
        self.is_terminated = False
        self.run_params = params if params is not None else {}
        self.conf_name = conf_name

    def passed(self):
        """Return true if this test was run successfully."""
        return self.is_executed and self.is_executed_ok and self.is_equal_result

    def execute(self, server):
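        # Intentionally a stub: concrete test types presumably override
        # execute() to drive the test file against the given server.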
        pass

    def run(self, server):
        """Execute the test assuming it's a python program.
        If the test aborts, print its output to stdout, and raise
        an exception. Else, comprare result and reject files.
        If there is a difference, print it to stdout and raise an
        exception. The exception is raised only if is_force flag is
        not set."""
        diagnostics = "unknown"
        save_stdout = sys.stdout
        try:
            self.skip = False
            if os.path.exists(self.skip_cond):
                sys.stdout = FilteredStream(self.tmp_result)
                stdout_fileno = sys.stdout.stream.fileno()
                execfile(self.skip_cond, dict(locals(), **server.__dict__))
                sys.stdout.close()
                sys.stdout = save_stdout
            if not self.skip:
                sys.stdout = FilteredStream(self.tmp_result)
                stdout_fileno = sys.stdout.stream.fileno()
                self.execute(server)
                sys.stdout.flush()
            self.is_executed_ok = True
        except Exception as e:
            traceback.print_exc()
            diagnostics = str(e)
        finally:
            if sys.stdout and sys.stdout != save_stdout:
                sys.stdout.close()
            sys.stdout = save_stdout
        self.is_executed = True
        sys.stdout.flush()

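        # Decide the outcome: a skipped test counts as a match, and a
        # clean run is compared against the reference .result file.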
        if not self.skip:
            if self.is_executed_ok and os.path.isfile(self.result):
                self.is_equal_result = filecmp.cmp(self.result, self.tmp_result)
        else:
            self.is_equal_result = True

        if self.args.valgrind:
            self.is_valgrind_clean = not check_valgrind_log(server.valgrind_log)

        if self.skip:
            color_stdout("[ skip ]\n", schema='test_skip')
            if os.path.exists(self.tmp_result):
                os.remove(self.tmp_result)
        elif self.is_executed_ok and self.is_equal_result and self.is_valgrind_clean:
            color_stdout("[ pass ]\n", schema='test_pass')
            if os.path.exists(self.tmp_result):
                os.remove(self.tmp_result)
        elif (self.is_executed_ok and not self.is_equal_result and not
              os.path.isfile(self.result)):
            os.rename(self.tmp_result, self.result)
            color_stdout("[ new ]\n", schema='test_new')
        else:
            os.rename(self.tmp_result, self.reject)
            color_stdout("[ fail ]\n", schema='test_fail')

            where = ""
            if not self.is_executed_ok:
                self.print_diagnostics(self.reject, "Test failed! Last 10 lines of the result file:\n")
                server.print_log(15)
                where = ": test execution aborted, reason '{0}'".format(diagnostics)
            elif not self.is_equal_result:
                self.print_unidiff()
                server.print_log(15)
                where = ": wrong test output"
            elif not self.is_valgrind_clean:
                os.remove(self.reject)
                self.print_diagnostics(server.valgrind_log, "Test failed! Last 10 lines of valgrind.log:\n")
                where = ": there were warnings in valgrind.log"

            if not self.args.is_force:
                # gh-1026
                # stop and cleanup tarantool instance for incorrect tests
                server.stop()
                server.cleanup()
                raise RuntimeError("Failed to run test " + self.name + where)

    def print_diagnostics(self, logfile, message):
        """Print 10 lines of client program output leading to test
        failure. Used to diagnose a failure of the client program"""

        color_stdout(message, schema='error')
        print_tail_n(logfile, 10)

    def print_unidiff(self):
        """Print a unified diff between .test and .result files. Used
        to establish the cause of a failure when .test differs
        from .result."""

        color_stdout("\nTest failed! Result content mismatch:\n", schema='error')
        with open(self.result, "r") as result:
            with open(self.reject, "r") as reject:
                result_time = time.ctime(os.stat(self.result).st_mtime)
                reject_time = time.ctime(os.stat(self.reject).st_mtime)
                diff = difflib.unified_diff(result.readlines(),
                                            reject.readlines(),
                                            self.result,
                                            self.reject,
                                            result_time,
                                            reject_time)

                color_stdout.writeout_unidiff(diff)