1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
|
import functools
import json
from multiprocess import Process
from multiprocess import Queue
import traceback
from mock import Mock
import requests
from six.moves import socketserver
class TestsTimeoutException(Exception):
    """Signal that a time-limited call did not finish before its deadline."""
def time_limit(seconds, fp, func, *args, **kwargs):
    """Run ``func`` in a child process, giving up after ``seconds``.

    ``func(*args, **kwargs)`` is executed in a separate process so that it
    can be terminated if it is still running once the time limit expires.
    Messages describing the outcome are written to ``fp``.

    :param seconds: maximum time to wait for the child process; passed
        straight to ``Process.join``.
    :param fp: object with a ``write`` attribute that receives the
        messages, or any falsy value to discard them.
    :param func: callable to execute in the child process.
    :param args: positional arguments forwarded to ``func``.
    :param kwargs: keyword arguments forwarded to ``func``.
    :returns: ``None``; the outcome of ``func`` is only reported via ``fp``.
    :raises TypeError: if ``fp`` is truthy but has no ``write`` attribute.
    :raises TestsTimeoutException: if the child process is still alive
        after ``seconds``.
    """
    if not fp:
        def record(msg):
            return
    elif hasattr(fp, 'write'):
        def record(msg):
            fp.write(msg)
    else:
        # wrap in a 1-tuple so a tuple-valued ``fp`` is formatted as a
        # single value instead of being unpacked as format arguments
        raise TypeError("Expected 'file-like' object, got '%s'" % (fp,))

    def capture_results(msg_queue, func, *args, **kwargs):
        # Executed in the child process: report the outcome of ``func``
        # through ``msg_queue`` since return values do not reach the parent.
        try:
            result = func(*args, **kwargs)
        except Exception as e:
            msg_queue.put(
                "Running function '%s' resulted in exception '%s' with "
                "message: '%s'\n" % (func.__name__, e.__class__.__name__, e))
            # no point re-raising an exception from the subprocess, instead
            # return False
            return False

        # format_stack() returns a list of already newline-terminated
        # strings, join them so the stack is readable in the log
        msg_queue.put(
            "Running function '%s' finished with result '%s', and "
            "stack:\n%s\n" % (func.__name__, result,
                              "".join(traceback.format_stack())))
        return result

    messages = Queue()

    def drain_messages():
        # forward everything the child reported so far to ``fp``
        while not messages.empty():
            record(messages.get())

    # although creating a separate process is expensive it's the only way to
    # ensure cross platform that we can cleanly terminate after timeout
    p = Process(target=functools.partial(capture_results, messages, func),
                args=args, kwargs=kwargs)
    p.start()
    p.join(seconds)

    if p.is_alive():
        p.terminate()
        # reap the terminated child so it does not linger until some later
        # process start happens to collect it
        p.join()
        drain_messages()
        record("Running function '%s' did not finish\n" % func.__name__)
        raise TestsTimeoutException

    drain_messages()
    record("Running function '%s' finished with exit code '%s'\n"
           % (func.__name__, p.exitcode))
class NullServer(socketserver.TCPServer):
    """TCP server that is only constructed, never asked to serve.

    Constructing the server is enough to open the port; because the server
    is not started, nothing handles what arrives on it, which makes it
    behave as a black hole server.
    """

    request_queue_size = 1

    def __init__(self, server_address, *args, **kwargs):
        """Initialise the server on ``server_address``.

        Extra positional and keyword arguments are forwarded unchanged to
        ``TCPServer.__init__``; the request handler class is always
        ``BaseRequestHandler``.
        """
        handler_class = socketserver.BaseRequestHandler
        # TCPServer is an old style class on python 2.x, where super()
        # does not work, hence the explicit base class call.
        socketserver.TCPServer.__init__(
            self, server_address, handler_class, *args, **kwargs)
def build_response_mock(status_code, json_body=None, headers=None,
                        add_content_length=True, **kwargs):
    """Build a ``Mock`` wrapping a ``requests.Response`` for use in tests.

    :param status_code: value assigned to the response's ``status_code``.
    :param json_body: object serialised with ``json.dumps`` to form the
        body; ``None`` means the response has no body.
    :param headers: optional mapping whose items are copied onto the
        response headers. They are applied after ``content-length`` is set
        and can therefore override it.
    :param add_content_length: when true and ``json_body`` is given, set a
        ``content-length`` header to the length of the serialised body.
    :param kwargs: additional attributes set directly on the underlying
        ``requests.Response`` object.
    :returns: a ``Mock`` wrapping the response. ``headers`` is the real
        response's header mapping, ``content`` is the serialised body (or
        ``None``), ``text`` is set only when there is a body, and
        ``json()`` returns ``json_body``.
    """
    real_response = requests.Response()
    real_response.status_code = status_code

    text = None
    if json_body is not None:
        text = json.dumps(json_body)
        # This guard used to also test ``headers is not {}``. An identity
        # comparison against a freshly created dict is always true, so it
        # never had any effect; it is dropped here without changing
        # behaviour. Use ``add_content_length=False`` to omit the header.
        if add_content_length:
            real_response.headers['content-length'] = len(text)

    if headers is not None:
        for name, value in headers.items():
            real_response.headers[name] = value

    for attr, value in kwargs.items():
        setattr(real_response, attr, value)

    response = Mock(wraps=real_response, autospec=True)
    if text:
        response.text = text

    # for some reason, wraps cannot handle attributes which are dicts
    # and accessed by key
    response.headers = real_response.headers
    response.content = text

    response.json = lambda: json_body

    return response
|