1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
|
import os
import subprocess
import sys
import tempfile
import threading
import time
# Absolute path of the directory containing this module (not referenced in
# the visible part of this file; presumably used by importers -- verify).
here = os.path.abspath(os.path.dirname(__file__))
class TestApp(threading.Thread):
    """Run a ``tests.<name>`` module in a subprocess on a background thread.

    The thread spawns ``python -m tests.<name>``, feeds it ``stdin`` and
    collects its stdout/stderr.  A temporary "callback file" is handed to
    the child through ``--callback-file``; ``wait_for_response`` blocks
    until that file changes size and then exposes its lines as ``response``.

    Unlike a plain ``Thread``, ``start`` takes arguments and ``is_alive``
    reports on the child process rather than on the thread itself.
    """

    # Keep test collectors (pytest, nose) from treating this helper as a
    # test case just because its name starts with "Test".
    __test__ = False

    name = None  # module under ``tests.`` to execute; set by start()
    args = None  # extra command-line arguments; set by start()
    stdin = None  # text handed to the child's stdin by run(), if any
    # NOTE(review): shadows the ``Thread.daemon`` property; presumably meant
    # to keep a stuck child from blocking interpreter exit -- confirm.
    daemon = True

    def __init__(self):
        """Initialise the bookkeeping; nothing is spawned until start()."""
        super(TestApp, self).__init__()
        self.exitcode = None  # child's return code once it has exited
        self.process = None  # subprocess.Popen handle, created in run()
        self.tmpfile = None  # path of the callback file
        self.tmpsize = 0  # last observed size of the callback file
        self.response = None  # lines last read from the callback file
        # The child runs in text mode (universal_newlines=True), so start
        # with text as well to keep the attribute type stable.
        self.stdout, self.stderr = '', ''

    def start(self, name, args):
        """Create the callback file and launch the worker thread.

        ``name`` is the module under the ``tests`` package to execute and
        ``args`` the extra command-line arguments (a falsy value means none).
        """
        self.name = name
        self.args = args or []

        fd, self.tmpfile = tempfile.mkstemp()
        os.close(fd)
        touch(self.tmpfile)
        # Baseline that wait_for_response() compares against.
        self.tmpsize = os.path.getsize(self.tmpfile)
        self.response = readfile(self.tmpfile)
        super(TestApp, self).start()

    def run(self):
        """Thread body: spawn the child and block until it exits."""
        cmd = [sys.executable, '-m', 'tests.' + self.name]
        if self.tmpfile:
            cmd += ['--callback-file', self.tmpfile]
        cmd += self.args

        env = os.environ.copy()
        env['PYTHONUNBUFFERED'] = '1'

        self.process = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=env,
            universal_newlines=True,
        )
        try:
            self.stdout, self.stderr = self.process.communicate(self.stdin)
        finally:
            # Record the exit code even if communicate() failed so that
            # is_alive() stops reporting the child as running.
            self.exitcode = self.process.wait()

    def is_alive(self):
        """Return True while the child has been spawned and has not exited."""
        return self.process is not None and self.exitcode is None

    def stop(self):
        """Kill the child if it is running, join the thread, clean up.

        Safe to call on an instance that was never started, and the
        callback file is removed even when killing or joining fails.
        """
        try:
            if self.is_alive():
                try:
                    self.process.kill()
                except OSError:
                    # The child exited on its own between the liveness
                    # check and the kill; there is nothing left to stop.
                    pass
            # Thread.join() raises RuntimeError on a never-started thread;
            # ``ident`` stays None until the thread has been started.
            if self.ident is not None:
                self.join()
        finally:
            tmpfile, self.tmpfile = self.tmpfile, None
            if tmpfile:
                os.unlink(tmpfile)

    def wait_for_response(self, timeout=5, interval=0.1):
        """Block until the callback file changes size, then re-read it.

        Polls every ``interval`` seconds.  ``wait_for_change`` raises
        ``RuntimeError`` if nothing changed within ``timeout`` seconds
        (``None`` waits forever).  On success ``response`` holds the file's
        lines and ``tmpsize`` its new size.
        """
        self.tmpsize = wait_for_change(
            self.tmpfile,
            last_size=self.tmpsize,
            timeout=timeout,
            interval=interval,
        )
        self.response = readfile(self.tmpfile)
def touch(fname, times=None):
    """Create *fname* if it is missing and update its timestamps.

    ``times`` is passed straight to ``os.utime``: ``None`` means "now",
    otherwise an ``(atime, mtime)`` pair.  Existing content is preserved.
    """
    # Append mode creates the file when absent and never truncates it.
    handle = open(fname, 'a')
    try:
        os.utime(fname, times)
    finally:
        handle.close()
def readfile(path):
    """Return the content of *path* as a list of byte-string lines.

    The file is opened in binary mode, so lines are split on ``\\n`` only
    and keep their line endings.
    """
    with open(path, mode='rb') as stream:
        lines = list(stream)
    return lines
def wait_for_change(path, last_size=0, timeout=5, interval=0.1):
    """Poll *path* until its size differs from *last_size*.

    ``path`` is the file to watch, ``last_size`` the size in bytes it is
    known to have had, ``timeout`` the maximum number of seconds to wait
    (``None`` waits indefinitely) and ``interval`` the delay in seconds
    between polls.

    Returns the new size of the file in bytes.  Raises ``RuntimeError`` if
    the size has not changed once ``timeout`` seconds have elapsed.
    """
    # Measure elapsed time with a monotonic clock where available so that
    # wall-clock adjustments can neither cut the wait short nor stretch it.
    clock = getattr(time, 'monotonic', time.time)
    start = clock()
    size = os.path.getsize(path)
    while size == last_size:
        sleepfor = interval
        if timeout is not None:  # pragma: no cover
            duration = clock() - start
            if duration >= timeout:
                raise RuntimeError(
                    'timeout waiting for change to file=%s' % (path,)
                )
            # Never sleep past the deadline.
            sleepfor = min(timeout - duration, sleepfor)
        time.sleep(sleepfor)
        size = os.path.getsize(path)
    return size
|