1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
|
#!/usr/bin/env python
"""
Use to put a timeout on the length of time a script can run. This is
especially useful for checking for scripts that hang.
Usage
=====
NOTE: Make sure you run ``pip install -r requirements-dev.txt`` before running.
To use the script, run::
./timeout "./my-script-to-run" --timeout-after 5
"""
import argparse
import os
import subprocess
import sys
import time
import psutil
class TimeoutException(Exception):
def __init__(self, timeout_len):
msg = f'Script failed to complete within {timeout_len} seconds'
Exception.__init__(self, msg)
def timeout(args):
parent_pid = os.getpid()
child_p = run_script(args)
try:
run_timeout(child_p.pid, args.timeout_after)
except (TimeoutException, KeyboardInterrupt) as e:
proc = psutil.Process(parent_pid)
procs = proc.children(recursive=True)
for child in procs:
child.terminate()
gone, alive = psutil.wait_procs(procs, timeout=1)
for child in alive:
child.kill()
raise e
def run_timeout(pid, timeout_len):
p = psutil.Process(pid)
start_time = time.time()
while p.is_running():
if p.status() == psutil.STATUS_ZOMBIE:
p.kill()
break
current_time = time.time()
# Raise a timeout if the duration of the process is longer than
# the desired timeout.
if current_time - start_time > timeout_len:
raise TimeoutException(timeout_len)
time.sleep(1)
def run_script(args):
return subprocess.Popen(args.script, shell=True)
def main():
parser = argparse.ArgumentParser(usage=__doc__)
parser.add_argument('script', help='The script to run for benchmarking')
parser.add_argument(
'--timeout-after',
required=True,
type=float,
help=(
'The length of time in seconds allowed for the script to run '
'before it time\'s out.'
),
)
args = parser.parse_args()
return timeout(args)
if __name__ == '__main__':
sys.exit(main())
|