File: test_utils.py

package info (click to toggle)
polyorb 2.11~20140418-3
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 30,012 kB
  • ctags: 465
  • sloc: ada: 273,015; sh: 4,507; makefile: 4,265; python: 1,332; cpp: 1,213; java: 507; ansic: 274; xml: 30; perl: 23; exp: 6
file content (272 lines) | stat: -rw-r--r-- 8,163 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
#!/usr/bin/env python

"""test utils

This module is imported by all testcases. It parses the command-line options
and provides some useful functions.

You should never call this module directly. To run a single testcase, use
 ./testsuite.py NAME_OF_TESTCASE
"""

from gnatpython.env import Env
from gnatpython.ex import Run, STDOUT
from gnatpython.fileutils import FileUtilsError, mkdir

from subprocess import Popen, PIPE
from time import sleep

import os
import re
import sys

# Name of the environment variable used to hand a configuration file to the
# spawned test programs
POLYORB_CONF = "POLYORB_CONF"

# Resource limit given to gnatpython-rlimit and used as the Run timeout
# (presumably a number of seconds -- TODO confirm against testsuite.py)
RLIMIT = int(os.environ['RLIMIT'])
# Name of the testcase being run; used to build log file names and to look
# up testcase-specific configuration files
TEST_NAME = os.environ['TEST_NAME']

# Restore testsuite environment (must happen before any Env().options read
# below)
Env().restore(os.environ['TEST_CONFIG'])

# Verbosity flag taken from the restored testsuite options.
# NOTE(review): the original comment tied this to POLYORB_TEST_VERBOSE;
# that variable is not read here -- verify in testsuite.py
VERBOSE = Env().options.verbose  # set by testsuite.py

# Main testsuite source dir
SRC_DIR = Env().options.testsuite_src_dir

# All executable tests path are relative to PolyORB testsuite build dir
BASE_DIR = os.path.join(Env().options.build_dir, 'testsuite')

# Shared configuration files are in tests/confs
CONF_DIR = os.path.join(SRC_DIR, 'tests', 'confs')

# Executable file name extension for the target OS
EXE_EXT = Env().target.os.exeext

# Common prefix of the log files written for this testcase; suffixes such as
# '.server', '.client' or '.trace' are appended by the functions below
OUTPUT_FILENAME = os.path.join(Env().log_dir, TEST_NAME)

# Make sure the directory holding the log files exists
try:
    if not os.path.isdir(os.path.dirname(OUTPUT_FILENAME)):
        mkdir(os.path.dirname(OUTPUT_FILENAME))
except FileUtilsError:
    # Ignore errors, multiple tests can be run in parallel (another test may
    # have created the directory between the check and the mkdir call)
    pass


def assert_exists(filename):
    """Assert that the given filename exists

    PARAMETERS:
        filename: path of the file (or directory) expected to exist

    Raises AssertionError with the message "<filename> not found" when the
    path does not exist.
    """
    # Raise explicitly instead of using an ``assert`` statement: assert
    # statements are removed when Python runs with -O, which would silently
    # disable this check.
    if not os.path.exists(filename):
        raise AssertionError("%s not found" % filename)


def terminate(handle):
    """Terminate safely a process spawned using Popen

    PARAMETERS:
        handle: the Popen object of the process to stop

    A process that has already exited is left alone. When terminate()
    fails, the error is re-raised only if the process is still alive after
    a few polls.
    """
    if handle.poll() is not None:
        # The process already exited and has been reaped. Signalling its pid
        # now would fail, or could reach an unrelated process that reused
        # the pid.
        return

    try:
        handle.terminate()
    except OSError:
        # WindowsError is a subclass of OSError, so this covers the Windows
        # case as well. The error might occur when we try to terminate a
        # process that is already dead. In that case we check if the
        # process is still alive. If yes we reraise the exception.
        # Otherwise we ignore it.
        is_alive = True
        for _ in range(3):
            is_alive = handle.poll() is None
            if not is_alive:
                break
            sleep(0.1)
        if is_alive:
            # Process is still not terminated so reraise the exception
            raise

def get_conf_path(conf_filename):
    """Return the path of a configuration file

    PARAMETERS:
        conf_filename: an absolute path, or a file name that is looked up
            first in CONF_DIR and then in the testcase source directory
            (SRC_DIR/TEST_NAME)

    Returns None when conf_filename is empty or None, otherwise the path of
    the existing configuration file.
    Raises AssertionError ("<conf_filename> not found") when the file does
    not exist.
    """
    if not conf_filename:
        return None

    if os.path.isabs(conf_filename):
        assert_exists(conf_filename)
        return conf_filename

    for directory in (CONF_DIR, os.path.join(SRC_DIR, TEST_NAME)):
        candidate = os.path.join(directory, conf_filename)
        if os.path.exists(candidate):
            return candidate

    # Raise explicitly: an ``assert False`` would be removed under
    # ``python -O`` and a missing file would then silently yield None.
    raise AssertionError("%s not found" % (conf_filename))

def _wait_for_ior(server_handle, server_output_name):
    """Poll the server output file until a line containing an IOR is read

    PARAMETERS:
        server_handle: Popen object of the running server
        server_output_name: file to which the server's stdout is redirected

    Returns the IOR string, or None when the server exited before a line
    containing "IOR:" could be read.
    Raises ValueError when a line contains "IOR:" but no well-formed IOR.
    """
    with open(server_output_name, "r") as server_out:
        while server_handle.returncode is None:
            # Loop on readline() until we have a complete line: the server
            # may still be in the middle of writing it.
            line = ""

            while server_handle.returncode is None:
                server_handle.poll()
                line = line + server_out.readline()
                if len(line) > 0 and line[-1] in "\n\r":
                    break
                sleep(0.1)

            if "IOR:" in line:
                match = re.match(r".*(IOR:[a-z0-9]+)['|\n\r]", line)
                if match is None:
                    print("Malformed IOR line <<%s>>" % (line))
                    raise ValueError("no IOR found in server output line")
                return match.groups()[0]

    return None


def client_server(client_cmd, client_conf, server_cmd, server_conf):
    """Run a client server testcase

    Run server_cmd and extract the IOR string.
    Run client_cmd with the server IOR string
    Check for "END TESTS................   PASSED"
    if found return True

    PARAMETERS:
        client_cmd: client executable, relative to BASE_DIR, without the
            executable extension
        client_conf: configuration file for the client (see get_conf_path),
            or an empty value for none
        server_cmd: server executable, relative to BASE_DIR, without the
            executable extension
        server_conf: configuration file for the server (see get_conf_path),
            or an empty value for none

    Returns the verdict computed from the client output file (True, False,
    or None when that file does not exist). Returns None right away when
    the server exits before the client could be started. Exceptions raised
    while the server or the client is run are printed, not propagated; the
    server is terminated in every case.
    """
    print("Running client %s (config=%s)\nserver %s (config=%s)" % (
        client_cmd, client_conf, server_cmd, server_conf))
    client = os.path.join(BASE_DIR, client_cmd + EXE_EXT)
    server = os.path.join(BASE_DIR, server_cmd + EXE_EXT)

    # Check that files exist
    assert_exists(client)
    assert_exists(server)

    server_env = os.environ.copy()
    if server_conf:
        server_env[POLYORB_CONF] = get_conf_path(server_conf)

    # Stays None when spawning the server fails, so that the cleanup in the
    # finally clause never refers to an unbound name.
    server_handle = None

    try:
        # Run the server command and retrieve the IOR string
        p_cmd_server = ['gnatpython-rlimit', str(RLIMIT), server]
        server_output_name = OUTPUT_FILENAME + '.server'
        # The child process gets its own copy of the descriptor, so our
        # handle can be closed as soon as the server has been spawned.
        with open(server_output_name, "wb") as server_stdout:
            server_handle = Popen(p_cmd_server,
                                  stdout=server_stdout,
                                  env=server_env)
        if server_conf:
            print('RUN server: POLYORB_CONF=%s %s' %
                  (server_env[POLYORB_CONF], " ".join(p_cmd_server)))
        else:
            print('RUN server: %s' % " ".join(p_cmd_server))

        IOR_str = _wait_for_ior(server_handle, server_output_name)

        if server_handle.returncode is not None:
            print("server died")
            return None

        # Remove eol and '
        IOR_str = IOR_str.strip()
        print(IOR_str)

        # Run the client with the IOR argument
        p_cmd_client = [client, IOR_str]

        if client_conf:
            client_env = os.environ.copy()
            client_env[POLYORB_CONF] = get_conf_path(client_conf)
            print('RUN client: POLYORB_CONF=%s %s' %
                  (client_env[POLYORB_CONF], " ".join(p_cmd_client)))
        else:
            client_env = None
            print("RUN client: %s" % " ".join(p_cmd_client))

        Run(make_run_cmd(p_cmd_client, Env().options.coverage),
            output=OUTPUT_FILENAME + '.client', error=STDOUT,
            timeout=RLIMIT, env=client_env)

        for elmt in [client, server]:
            if Env().options.coverage:
                run_coverage_analysis(elmt)

    except Exception as e:
        print(e)
    finally:
        # Only signal a server that was actually started and is still
        # running: signalling an already reaped process raises OSError.
        if server_handle is not None and server_handle.poll() is None:
            terminate(server_handle)

    return _check_output(OUTPUT_FILENAME + '.client')


def local(cmd, config_file, args=None):
    """Run a local test

    Execute the given command.
    Check for "END TESTS................   PASSED"
    if found return True

    PARAMETERS:
        cmd: the command to execute, relative to BASE_DIR and without the
            executable extension
        config_file: to set POLYORB_CONF; when non-empty it must exist in
            CONF_DIR. None is handled like an empty string.
        args: list of additional parameters

    Returns the verdict computed from the output file (True, False, or None
    when that file does not exist).
    Raises AssertionError when the configuration file or the command is
    missing.
    """
    args = args or []
    print("Running %s %s (config=%s)" % (cmd, " ".join(args), config_file))
    if config_file:
        assert_exists(os.path.join(CONF_DIR, config_file))
    # Environment values must be strings: map None to an empty string
    # instead of letting the assignment raise TypeError.
    # NOTE(review): the variable receives the bare file name although the
    # existence check above is done relative to CONF_DIR -- confirm that the
    # test programs resolve it as intended.
    os.environ[POLYORB_CONF] = config_file or ""

    command = os.path.join(BASE_DIR, cmd + EXE_EXT)
    assert_exists(command)

    p_cmd = [command] + args

    if VERBOSE:
        if config_file:
            print('RUN: POLYORB_CONF=%s %s' % (config_file, " ".join(p_cmd)))
        else:
            print('RUN: %s' % " ".join(p_cmd))

    # Output file shared by the run and by the verdict check below
    output_name = OUTPUT_FILENAME + 'local'

    Run(make_run_cmd(p_cmd, Env().options.coverage),
        output=output_name, error=STDOUT,
        timeout=RLIMIT)
    if Env().options.coverage:
        run_coverage_analysis(command)
    return _check_output(output_name)


def _check_output(output_file):
    """Check that END TESTS....... PASSED is contained in the output"""
    if os.path.exists(output_file):
        test_outfile = open(output_file)
        test_out = test_outfile.read()
        test_outfile.close()

        if re.search(r"END TESTS.*PASSED", test_out):
            print "%s PASSED" % TEST_NAME
            return True
        else:
            print "%s FAILED" % TEST_NAME
            print test_out
            return False


def make_run_cmd(cmd, coverage=False):
    """Build the command line handed to Run, with or without coverage

    PARAMETERS:
        cmd: the program followed by its arguments
        coverage: when true, wrap the program in an 'xcov --run' invocation
            that writes its trace to '<program>.trace'

    Returns a new list holding the command and its arguments
    """
    if not coverage:
        return list(cmd)

    program = cmd[0]
    program_args = cmd[1:]

    wrapped = ['xcov', '--run', '--target=i386-linux', '-o',
               program + '.trace', program]
    if program_args:
        # Arguments of the program itself are introduced by -eargs
        wrapped.append('-eargs')
        wrapped.extend(program_args)
    return wrapped


def run_coverage_analysis(command):
    """Run xcov on the trace file of COMMAND to get a branch coverage report

    PARAMETERS:
        command: path of the program; its trace is read from
            '<command>.trace'

    The xcov output goes to OUTPUT_FILENAME + '.trace'.
    Returns the object produced by Run
    """
    trace_file = command + ".trace"
    xcov_cmd = ['xcov', '--coverage=branch', '--annotate=report', trace_file]
    report_file = OUTPUT_FILENAME + '.trace'
    return Run(xcov_cmd, output=report_file, error=STDOUT, timeout=RLIMIT)


def fail():
    """Print "TEST FAILED" and stop the test script with exit status 1"""
    print("TEST FAILED")
    # Same effect as sys.exit(1), which raises this exception
    raise SystemExit(1)