File: test_subprocess.py

package info (click to toggle)
qpid-proton 0.37.0-7
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 18,384 kB
  • sloc: ansic: 37,828; cpp: 37,140; python: 15,302; ruby: 6,018; xml: 477; sh: 320; pascal: 52; makefile: 18
file content (122 lines) | stat: -rw-r--r-- 4,462 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License
#

# Extends the subprocess module to use runtime checkers, and report stderr output.

import os
import re
import subprocess
import tempfile
from subprocess import PIPE


def in_path(name):
    """Look for name in the PATH"""
    for path in os.environ["PATH"].split(os.pathsep):
        f = os.path.join(path, name)
        if os.path.isfile(f) and os.access(f, os.X_OK):
            return f


class TestProcessError(Exception):
    def __init__(self, proc, what, output=None):
        self.output = output
        sep = "\n%s stderr(%s) %s\n" % ("_" * 32, proc.pid, "_" * 32)
        error = sep + proc.error + sep if proc.error else ""
        super(TestProcessError, self).__init__("%s pid=%s exit=%s: %s%s" % (
            proc.cmd, proc.pid, proc.returncode, what, error))


class Popen(subprocess.Popen):
    """
    Add TEST_EXE_PREFIX to the command, check stderr for runtime checker output.
    In a 'with' statement it runs check_wait() on exit from the block, or
    check_kill() if initialized with kill_me=True
    """

    def __init__(self, *args, **kwargs):
        """
        Takes all args and kwargs of subprocess.Popen except stdout, stderr, universal_newlines
        kill_me=True runs check_kill() in __exit__() instead of check_wait()
        """
        self.on_exit = self.check_kill if kwargs.pop('kill_me', False) else self.check_wait
        self.errfile = tempfile.NamedTemporaryFile(delete=False)
        kwargs.update({'universal_newlines': True, 'stdout': PIPE, 'stderr': self.errfile})
        prefix = os.environ.get("TEST_EXE_PREFIX")
        if prefix:
            args = [prefix.split() + args[0]] + list(args[1:])
        self.cmd = args[0]
        super(Popen, self).__init__(*args, **kwargs)

    def check_wait(self):
        if self.wait() or self.error:
            raise TestProcessError(self, "check_wait")

    def communicate(self, *args, **kwargs):
        result = super(Popen, self).communicate(*args, **kwargs)
        if self.returncode or self.error:
            raise TestProcessError(self, "check_communicate", result[0])
        return result

    def check_kill(self):
        """Raise if process has already exited, kill and raise if self.error is not empty"""
        if self.poll() is None:
            self.kill()
            self.wait()
            self.stdout.close()     # Doesn't get closed if killed
            if self.error:
                raise TestProcessError(self, "check_kill found error output")
        else:
            raise TestProcessError(self, "check_kill process not running")

    def expect(self, pattern):
        line = self.stdout.readline()
        match = re.search(pattern, line)
        if not match:
            raise TestProcessError(self, "can't find '%s' in '%s'" % (pattern, line))
        return match

    @property
    def error(self):
        """Return stderr as string, may only be used after process has terminated."""
        assert(self.poll is not None)
        if not hasattr(self, "_error"):
            self.errfile.close()  # Not auto-deleted
            with open(self.errfile.name) as f:  # Re-open to read
                self._error = f.read().strip()
            os.unlink(self.errfile.name)
        return self._error

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.on_exit()


def check_output(*args, **kwargs):
    return Popen(*args, **kwargs).communicate()[0]


class Server(Popen):
    """A process that prints 'listening on <port>' to stdout"""

    def __init__(self, *args, **kwargs):
        super(Server, self).__init__(*args, **kwargs)
        self.port = self.expect("listening on ([0-9]+)$").group(1)