1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
|
"""
Implements run_in_thread_with_timeout decorator for running tests that might
deadlock.
"""
from __future__ import print_function
import functools
import os
import sys
import threading
import traceback
import unittest
MODULE_PID = os.getpid()
DEFAULT_TEST_TIMEOUT = 15
def create_run_in_thread_decorator(test_timeout=None):
"""Create a decorator that will run the decorated method in a thread via
`_ThreadedTestWrapper` and return the value that is returned by the
given function, unless it exits with exception or times out, in which
case AssertionError will be raised
:param int | float | None test_timeout: maximum number of seconds to wait
for test to complete. If None, `DEFAULT_TEST_TIMEOUT` will be used.
NOTE: we handle default this way to facilitate patching of the timeout
in our self-tests.
:return: decorator
"""
def run_in_thread_with_timeout_decorator(fun):
"""Create a wrapper that will run the decorated method in a thread via
`_ThreadedTestWrapper` and return the value that is returned by the
given function, unless it exits with exception or times out, in which
case AssertionError will be raised
:param fun: function to run in thread
:return: wrapper function
"""
@functools.wraps(fun)
def run_in_thread_with_timeout_wrapper(*args, **kwargs):
"""
:param args: positional args to pass to wrapped function
:param kwargs: keyword args to pass to wrapped function
:return: value returned by the function, unless it exits with
exception or times out
:raises AssertionError: if wrapped function exits with exception or
times out
"""
runner = _ThreadedTestWrapper(
functools.partial(fun, *args, **kwargs),
test_timeout)
return runner.kick_off()
return run_in_thread_with_timeout_wrapper
return run_in_thread_with_timeout_decorator
run_in_thread_with_timeout = create_run_in_thread_decorator() # pylint: disable=C0103
class _ThreadedTestWrapper(object):
"""Runs user's function in a thread. Then wait on the
thread to terminate up to the given `test_timeout` seconds, raising
`AssertionError` if user's function exits with exception or times out.
"""
# We use the saved member when printing to facilitate patching by our
# self-tests
_stderr = sys.stderr
def __init__(self, fun, test_timeout):
"""
:param callable fun: the function to run in thread, no args.
:param int | float test_timeout: maximum number of seconds to wait for
thread to exit.
"""
self._fun = fun
if test_timeout is None:
# NOTE: we handle default here to facilitate patching of
# DEFAULT_TEST_TIMEOUT in our self-tests
self._test_timeout = DEFAULT_TEST_TIMEOUT
else:
self._test_timeout = test_timeout
# Save possibly-patched class-level _stderr value in instance so in case
# user's function times out and later exits with exception, our
# exception handler in `_thread_entry` won't inadvertently output to the
# wrong object.
self._stderr = self._stderr
self._fun_result = None # result returned by function being run
self._exc_info = None
def kick_off(self):
"""Run user's function in a thread. Then wait on the
thread to terminate up to self._test_timeout seconds, raising
`AssertionError` if user's function exits with exception or times out.
:return: the value returned by function if function exited without
exception and didn't time out
:raises AssertionError: if user's function timed out or exited with
exception.
"""
try:
runner = threading.Thread(target=self._thread_entry)
# `daemon = True` so that the script won't wait for thread's exit
runner.daemon = True
runner.start()
runner.join(self._test_timeout)
if runner.is_alive():
raise AssertionError('The test timed out.')
if self._exc_info is not None:
if isinstance(self._exc_info[1], unittest.SkipTest):
raise self._exc_info[1]
# Fail the test because the thread running the test's start()
# failed
raise AssertionError(self._exc_info_to_str(self._exc_info))
return self._fun_result
finally:
# Facilitate garbage collection
self._exc_info = None
self._fun = None
def _thread_entry(self):
"""Our test-execution thread entry point that calls the test's `start()`
method.
Here, we catch all exceptions from `start()`, save the `exc_info` for
processing by `_kick_off()`, and print the stack trace to `sys.stderr`.
"""
try:
self._fun_result = self._fun()
except: # pylint: disable=W0702
self._exc_info = sys.exc_info()
del self._fun_result # to force exception on inadvertent access
if not isinstance(self._exc_info[1], unittest.SkipTest):
print(
'ERROR start() of test {} failed:\n{}'.format(
self,
self._exc_info_to_str(self._exc_info)),
end='',
file=self._stderr)
@staticmethod
def _exc_info_to_str(exc_info):
"""Convenience method for converting the value returned by
`sys.exc_info()` to a string.
:param tuple exc_info: Value returned by `sys.exc_info()`.
:return: A string representation of the given `exc_info`.
:rtype: str
"""
return ''.join(traceback.format_exception(*exc_info))
|