File: runtests.py

package info (click to toggle)
pgdbf 0.6.3%2Bgit20180121.4e84775-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,040 kB
  • sloc: sh: 3,884; ansic: 1,099; python: 198; makefile: 35
file content (209 lines) | stat: -rwxr-xr-x 5,586 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
#!/usr/bin/env python

# pylint: disable=superfluous-parens

"""Run a suite of PgDBF test cases"""

import argparse
from glob import glob
from hashlib import md5
from json import load
from logging import basicConfig, getLogger, DEBUG, INFO
from os import chdir, getcwd
from os.path import abspath, join, split
from subprocess import Popen, PIPE, STDOUT

LOGGER = getLogger('')

class TestError(ValueError):
    """A test failed"""


def check_head(expected):
    """Check that the start of the file is as expected"""

    LOGGER.debug('opened a header check')
    body = bytes()
    length = len(expected)
    while True:
        data = yield()
        if not data:
            raise ValueError({'error': 'short read', 'expected': length, 'actual': len(body)})
        body += data
        if len(body) >= length:
            actual = body[:length].decode()
            if expected != actual:
                raise TestError('unequal head', expected, actual)
            LOGGER.info('passed the header check')
            break

    while True:
        data = yield()
        if data is None:
            LOGGER.debug('closed the header check')
            return


def check_length(expected):
    """Check that the file has the expected length"""

    LOGGER.debug('opened a length check')
    actual = 0
    while True:
        data = yield()
        if not data:
            if expected != actual:
                raise TestError('incorrect length', expected, actual)
            LOGGER.info('passed the length check')
            LOGGER.debug('closed the length check')
            return
        actual += len(data)


def check_md5(expected):
    """Check that the file has the expected MD5 hash"""

    LOGGER.debug('opened an md5 check')
    hasher = md5()
    while True:
        data = yield()
        if data is None:
            actual = hasher.hexdigest()
            if expected != actual:
                raise TestError('bad md5 hash', expected, actual)
            LOGGER.info('passed the md5 check')
            LOGGER.debug('closed the md5 check')
            return
        hasher.update(data)


def check_tail(expected):
    """Check that the end of the file is as expected"""

    LOGGER.debug('opened a tail check')
    actual = bytes()
    length = len(expected)
    while True:
        data = yield()
        if data is None:
            if expected != actual.decode():
                raise TestError('incorrect tail', actual, expected)
            LOGGER.info('passed the tail check')
            LOGGER.debug('closed the tail check')
            return
        actual = (actual + data)[-length:]


def build_tests(config):
    """Build a list of tests from the test case config"""

    tests = []
    for key, value in config.items():
        try:
            test_func = {
                'head': check_head,
                'length': check_length,
                'md5': check_md5,
                'tail': check_tail,
            }[key]
        except KeyError:
            pass
        else:
            test = test_func(value)
            next(test)
            tests.append(test)

    if not tests:
        raise ValueError('No tests are configured')

    return tests


def handle_exception(exc):
    """Print information about a failed test case"""

    print("""\
    Failure : {0.args[0]}
    Expected: {0.args[1]!r}
    Actual  : {0.args[2]!r}
    """.format(exc))


def run_test(pgdbf_path, config):
    """Run a test case with the given pgdbf executable"""

    tests = build_tests(config)

    args = config['cmd_args']
    if not isinstance(args, list):
        args = [args]

    args.insert(0, pgdbf_path)
    LOGGER.debug('running %s', args)
    command = Popen(args, stdout=PIPE, stderr=STDOUT)
    while True:
        chunk = command.stdout.read(128 * 1024)
        if not chunk:
            break
        for test in tests:
            try:
                test.send(chunk)
            except TestError as exc:
                handle_exception(exc)

    for test in tests:
        try:
            test.send(None)
        except StopIteration:
            pass
        except TestError as exc:
            handle_exception(exc)
        else:
            raise ValueError('test {} did not close cleanly'.format(test))


def handle_command_line():
    """Evaluate the command line arguments and run tests"""

    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('--pgdbf', '-p', help='Path to the pgdbf executable')
    parser.add_argument('--verbose', '-v', action='count', default=0,
                        help='Increase debugging verbosity')
    parser.add_argument(
        'testcase', nargs='*',
        help='The name of one or more test case files. If given, only run these cases.')

    args = parser.parse_args()

    if args.verbose >= 2:
        basicConfig(level=DEBUG)
    elif args.verbose == 1:
        basicConfig(level=INFO)
    else:
        basicConfig()

    orig_dir = getcwd()
    if args.pgdbf:
        pgdbf_path = abspath(args.pgdbf)
    else:
        pgdbf_path = 'pgdbf'

    if args.testcase:
        cases = args.testcase
    else:
        cases = []
        for test_dir in ('cases', 'privatecases'):
            cases.extend(glob(join(test_dir, '*.json')))

    for case in cases:
        test_dir, test_name = split(case)
        if test_dir:
            chdir(test_dir)
        print('Running {}'.format(case))
        with open(test_name) as infile:
            run_test(pgdbf_path, load(infile))
        chdir(orig_dir)


if __name__ == '__main__':
    handle_command_line()