File: testlib.py

package info (click to toggle)
apparmor 4.1.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 30,096 kB
  • sloc: ansic: 24,943; python: 24,914; cpp: 9,074; sh: 8,166; yacc: 2,061; makefile: 1,923; lex: 1,215; pascal: 1,147; perl: 1,033; ruby: 365; lisp: 282; exp: 250; java: 212; xml: 159
file content (205 lines) | stat: -rw-r--r-- 6,879 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
#!/usr/bin/env python3
# ------------------------------------------------------------------
#
#   Copyright (C) 2013 Canonical Ltd.
#   Author: Steve Beattie <steve@nxnw.org>
#
#   This program is free software; you can redistribute it and/or
#   modify it under the terms of version 2 of the GNU General Public
#   License published by the Free Software Foundation.
#
# ------------------------------------------------------------------

import functools
import os
import shutil
import signal
import subprocess
import tempfile
import time
import unittest

# Return code reported by AATestTemplate._run_cmd() when a command exceeds
# its timeout.
TIMEOUT_ERROR_CODE = 152
# Relative path to the parser binary. Not referenced in this file; presumably
# consumed by the test scripts that import this module (verify against callers).
DEFAULT_PARSER = '../apparmor_parser'


# http://www.chiark.greenend.org.uk/ucgi/~cjwatson/blosxom/2009-07-02-python-sigpipe.html
# This is needed so that the subprocesses that produce endless output
# actually quit when the reader goes away.
def subprocess_setup():
    """Restore the default SIGPIPE disposition in the calling process.

    Intended for use as a ``preexec_fn`` for subprocess.Popen: Python
    installs its own SIGPIPE handling by default, which is usually not
    what non-Python subprocesses expect.
    """
    default_disposition = signal.SIG_DFL
    signal.signal(signal.SIGPIPE, default_disposition)


class AANoCleanupMetaClass(type):
    def __new__(cls, name, bases, attrs):

        for attr_name, attr_value in attrs.items():
            if attr_name.startswith("test_"):
                attrs[attr_name] = cls.keep_on_fail(attr_value)
        return super(AANoCleanupMetaClass, cls).__new__(cls, name, bases, attrs)

    @classmethod
    def keep_on_fail(cls, unittest_func):
        """wrapping function for unittest testcases to detect failure
           and leave behind test files in tearDown(); to be used as
           a decorator"""

        def new_unittest_func(self):
            try:
                return unittest_func(self)
            except unittest.SkipTest:
                raise
            except Exception:
                self.do_cleanup = False
                raise

        return new_unittest_func


class AATestTemplate(unittest.TestCase, metaclass=AANoCleanupMetaClass):
    """Stub class for use by test scripts.

    Provides helpers that run an external command with a timeout and
    collect its return code and output.
    """
    # When True, run_cmd() prints each command line before executing it.
    debug = False
    # Set to False on the instance by the metaclass wrapper when a test_*
    # method raises (other than SkipTest).
    do_cleanup = True

    def run_cmd_check(self, command, input=None, stderr=subprocess.STDOUT, stdout=subprocess.PIPE,
                      stdin=None, timeout=120, expected_rc=0, expected_string=None):
        """Wrapper around run_cmd that checks the rc code against
           expected_rc and for expected strings in the output if
           passed. The valgrind tests generally don't care what the
           rc is as long as it's not a specific set of return codes,
           so can't push the check directly into run_cmd().

           Returns the combined output of the command. Note that stderr
           defaults to being merged into stdout here."""
        rc, report = self.run_cmd(command, input, stderr, stdout, stdin, timeout)
        self.assertEqual(rc, expected_rc, "Got return code {}, expected {}\nCommand run: {}\nOutput: {}".format(rc, expected_rc, ' '.join(command), report))
        if expected_string:
            self.assertIn(expected_string, report, 'Expected message "{}", got: \n{}'.format(expected_string, report))
        return report

    def run_cmd(self, command, input=None, stderr=subprocess.PIPE, stdout=subprocess.PIPE,
                stdin=None, timeout=120):
        """Execute the given command (array) and return a (rc, report)
           tuple, where report is the captured stdout followed by the
           captured stderr. If the command could not be started, rc is 127
           and report holds the textual error."""

        if self.debug:
            print("\n===> Running command: '{}'".format(' '.join(command)))

        (rc, out, outerr) = self._run_cmd(command, input, stderr, stdout, stdin, timeout)
        report = out + outerr

        return rc, report

    def _run_cmd(self, command, input=None, stderr=subprocess.PIPE, stdout=subprocess.PIPE,
                 stdin=None, timeout=120):
        """Try to execute given command (array) and return its rc, stdout, and stderr as a tuple.

           If the command cannot be started, returns (127, <error text>, '').
           If it does not finish within timeout seconds, it is terminated
           and rc is TIMEOUT_ERROR_CODE. Streams that were not captured are
           returned as empty strings."""

        try:
            sp = subprocess.Popen(command, stdin=stdin, stdout=stdout, stderr=stderr,
                                  close_fds=True, preexec_fn=subprocess_setup, universal_newlines=True)
        except OSError as e:
            return 127, str(e), ''

        timeout_communicate = TimeoutFunction(sp.communicate, timeout)
        out, outerr = (None, None)
        try:
            out, outerr = timeout_communicate(input)
            rc = sp.returncode
        except TimeoutFunctionException:
            sp.terminate()
            outerr = 'test timed out, killed'
            rc = TIMEOUT_ERROR_CODE
            # Reap the child so it does not linger as a zombie; escalate to
            # SIGKILL if it does not exit after SIGTERM.
            try:
                sp.wait(timeout=5)
            except subprocess.TimeoutExpired:
                sp.kill()
                sp.wait()
            # communicate() was interrupted, so its pipes were never closed.
            for pipe in (sp.stdin, sp.stdout, sp.stderr):
                if pipe is not None:
                    try:
                        pipe.close()
                    except OSError:
                        # Best-effort cleanup; the timeout is already reported.
                        pass

        # Handle redirection of stdout
        if out is None:
            out = ''
        # Handle redirection of stderr
        if outerr is None:
            outerr = ''

        return rc, out, outerr


# Timeout handler using alarm() from John P. Speno's Pythonic Avocado
class TimeoutFunctionException(Exception):
    """Raised from the SIGALRM handler when a timed call runs too long"""


class TimeoutFunction:
    """Callable wrapper that runs a function under a SIGALRM-based timeout.

    Calling the instance forwards all arguments to *function*. If the
    call has not returned after *timeout* seconds, the SIGALRM handler
    raises TimeoutFunctionException out of the call.
    """

    def __init__(self, function, timeout):
        # timeout is passed straight to signal.alarm(), i.e. whole seconds
        self.timeout = timeout
        self.function = function

    def handle_timeout(self, signum, frame):
        """SIGALRM handler: abort the running call"""
        raise TimeoutFunctionException()

    def __call__(self, *args, **kwargs):
        old = signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.alarm(self.timeout)
        try:
            return self.function(*args, **kwargs)
        finally:
            # Cancel the alarm before restoring the previous handler, and do
            # so even when the function raised: otherwise a still-pending
            # alarm would later fire against the restored (possibly default,
            # process-terminating) handler.
            signal.alarm(0)
            signal.signal(signal.SIGALRM, old)


def filesystem_time_resolution():
    """Probe whether the temporary directory's filesystem keeps subsecond
       modification times.

       Creates up to ten files, sleeping 0.1s after each, and compares
       consecutive mtimes. Returns (True, 0.1) when every mtime differs from
       its predecessor, or (False, 1.0) as soon as two consecutive files
       share an mtime. Errors during probing are ignored and leave the
       result gathered so far (initially (True, 0.1))."""

    sleep_interval = 0.1
    outcome = (True, sleep_interval)

    tmp_dir = tempfile.mkdtemp(prefix='aa-caching-nanostamp-')
    try:
        previous_mtime = None
        for index in range(10):
            probe_path = os.path.join(tmp_dir, 'test.{}'.format(index))
            with open(probe_path, 'w+') as handle:
                current_mtime = os.fstat(handle.fileno()).st_mtime

            if current_mtime == previous_mtime:
                print('\n===> WARNING: TMPDIR lacks subsecond timestamp resolution, falling back to slower test')
                outcome = (False, 1.0)
                break

            previous_mtime = current_mtime
            time.sleep(sleep_interval)
    except Exception:
        # best-effort probe: keep whatever result we have so far
        pass
    finally:
        if os.path.exists(tmp_dir):
            shutil.rmtree(tmp_dir)

    return outcome


def read_features_dir(path):
    """Flatten a features directory tree into a single string.

       Entries are visited in sorted order. Each one contributes
       '<name> {<body>}\n', where body is the file's content for a regular
       file, the recursive rendering for a directory, and empty otherwise.
       Returns '' if path is not an existing directory."""

    # isdir() is already False for paths that do not exist
    if not os.path.isdir(path):
        return ''

    pieces = []
    for name in sorted(os.listdir(path)):
        entry = os.path.join(path, name)
        if os.path.isfile(entry):
            with open(entry, 'r') as f:
                # don't need extra '\n' here as features file contains it
                body = f.read()
        elif os.path.isdir(entry):
            body = read_features_dir(entry)
        else:
            body = ''
        pieces.append(name + ' {' + body + '}\n')

    return ''.join(pieces)


def touch(path):
    """Set the access and modification times of an existing path to now"""
    # omitting the times argument is the same as passing None
    return os.utime(path)


def write_file(directory, file, contents):
    """Write contents to <directory>/<file>, replacing any existing content,
       and return the joined path"""
    target = os.path.join(directory, file)
    with open(target, 'w+') as handle:
        handle.write(contents)
    return target