1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
|
import ctypes
import signal
import threading
class BaseTimeoutException(Exception):
    """Common ancestor of every timeout-related exception in this module."""
class JobTimeoutException(BaseTimeoutException):
    """Signals that a job ran past its allowed maximum timeout value
    before completing.
    """
class HorseMonitorTimeoutException(BaseTimeoutException):
    """Signals that waiting for a horse to exit ran past the maximum
    timeout value.
    """
class BaseDeathPenalty:
    """Context-manager base class used to enforce job timeouts.

    Entering the context calls ``setup_death_penalty``; leaving it calls
    ``cancel_death_penalty``. Both hooks raise ``NotImplementedError`` here
    and are expected to be provided by subclasses.
    """

    def __init__(self, timeout, exception=BaseTimeoutException, **kwargs):
        # Extra keyword arguments are accepted but not used by this class.
        self._exception = exception
        self._timeout = timeout

    def __enter__(self):
        self.setup_death_penalty()

    def __exit__(self, type, value, traceback):
        # The ``with`` body has finished, so the penalty is lifted right away.
        try:
            self.cancel_death_penalty()
        except BaseTimeoutException:
            # Edge case: the alarm fired after the body already completed.
            # The body is considered done, so the late timeout is dropped.
            pass

        # A falsy return value tells Python not to suppress an exception
        # raised inside the ``with`` body: every error, including a
        # BaseTimeoutException from the body, propagates to the caller.
        return False

    def setup_death_penalty(self):
        raise NotImplementedError

    def cancel_death_penalty(self):
        raise NotImplementedError
class UnixSignalDeathPenalty(BaseDeathPenalty):
    """Death penalty driven by the ``SIGALRM`` signal."""

    def handle_death_penalty(self, signum, frame):
        """Signal handler: raise the exception class stored on this instance.

        ``signum`` and ``frame`` are the standard signal-handler arguments
        and are not used.
        """
        message = f'Task exceeded maximum timeout value ({self._timeout} seconds)'
        raise self._exception(message)

    def setup_death_penalty(self):
        """Install the ``SIGALRM`` handler, then arm the alarm so it fires
        after the stored timeout (in seconds).
        """
        # The handler must be in place before the alarm can possibly fire.
        signal.signal(signal.SIGALRM, self.handle_death_penalty)
        signal.alarm(self._timeout)

    def cancel_death_penalty(self):
        """Disarm the pending alarm and restore the default ``SIGALRM``
        disposition.
        """
        # Disarm first so no alarm can arrive once the handler is reset.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, signal.SIG_DFL)
class TimerDeathPenalty(BaseDeathPenalty):
    """Death penalty driven by a ``threading.Timer``.

    When the timer expires, an exception class is raised asynchronously in
    the thread that constructed this object, using
    ``PyThreadState_SetAsyncExc``. A timeout of zero or less disables the
    timer entirely.
    """

    def __init__(self, timeout, exception=JobTimeoutException, **kwargs):
        """Remember the constructing thread and prepare the class to raise.

        Args:
            timeout: Number of seconds before the exception is raised;
                values ``<= 0`` mean no timer is started.
            exception: Exception class to raise in the target thread.
            **kwargs: Forwarded unchanged to the parent initializer.
        """
        super().__init__(timeout, exception, **kwargs)
        self._target_thread_id = threading.current_thread().ident
        self._timer = None

        # PyThreadState_SetAsyncExc can only take a class (it instantiates
        # it with no arguments), so the message has to be baked into the
        # class ahead of time. A private subclass is used for that instead
        # of overwriting ``exception.__init__``: patching the shared class
        # would alter every other use of it process-wide and would make all
        # penalties report the timeout of the most recently created one.
        message = f'Task exceeded maximum timeout value ({timeout} seconds)'

        def init_with_message(self, *args, **kwargs):  # noqa
            # Skips ``exception``'s own __init__, as the patched class did.
            super(exception, self).__init__(message)

        def reduce_as_base(self):
            # The subclass is not importable by name, so pickle instances
            # as the caller-supplied class carrying the same arguments.
            return (exception, self.args)

        self._exception = type(
            exception.__name__,
            (exception,),
            {
                '__init__': init_with_message,
                '__reduce__': reduce_as_base,
                '__doc__': exception.__doc__,
                # Keep tracebacks and reprs identical to the original class.
                '__module__': exception.__module__,
                '__qualname__': exception.__qualname__,
            },
        )

    def new_timer(self):
        """Returns a new timer since timers can only be used once."""
        return threading.Timer(self._timeout, self.handle_death_penalty)

    def handle_death_penalty(self):
        """Raises an asynchronous exception in another thread.

        Reference http://docs.python.org/c-api/init.html#PyThreadState_SetAsyncExc for more info.

        Raises:
            ValueError: If no thread state matched the stored thread id.
            SystemError: If more than one thread state was modified; the
                pending exception is cleared again before raising.
        """
        # The C API takes the thread id as ``unsigned long`` (Python 3.7+).
        thread_id = ctypes.c_ulong(self._target_thread_id)
        ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, ctypes.py_object(self._exception))
        if ret == 0:
            raise ValueError(f'Invalid thread ID {self._target_thread_id}')
        elif ret > 1:
            # ``None`` is passed by ctypes as a NULL pointer, which clears
            # the pending asynchronous exception.
            ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, None)
            raise SystemError('PyThreadState_SetAsyncExc failed')

    def setup_death_penalty(self):
        """Starts the timer, unless the timeout is zero or negative."""
        if self._timeout <= 0:
            return
        self._timer = self.new_timer()
        self._timer.start()

    def cancel_death_penalty(self):
        """Cancels the timer, if one is currently set.

        Does nothing when no timer exists (timeout ``<= 0``, penalty never
        set up, or already cancelled), so repeated calls are harmless.
        """
        if self._timer is None:
            return
        self._timer.cancel()
        self._timer = None
def get_default_death_penalty_class() -> type[BaseDeathPenalty]:
    """Pick the death penalty class suited to the current platform.

    The signal-based class is returned when the ``signal`` module exposes
    ``SIGALRM``; otherwise the timer-based class is returned.
    """
    has_alarm_signal = hasattr(signal, 'SIGALRM')
    return UnixSignalDeathPenalty if has_alarm_signal else TimerDeathPenalty
|