1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
|
"""Timeout management for tests."""
from __future__ import annotations
import dataclasses
import datetime
import functools
import os
import signal
import time
import typing as t
from .io import (
read_json_file,
)
from .config import (
CommonConfig,
TestConfig,
)
from .util import (
display,
TimeoutExpiredError,
)
from .thread import (
WrappedThread,
)
from .constants import (
TIMEOUT_PATH,
)
from .test import (
TestTimeout,
)
@dataclasses.dataclass(frozen=True)
class TimeoutDetail:
    """Deadline and duration needed to enforce a time limit on test execution."""

    # Class-level constant (unannotated, so not a dataclass field).
    # This format is used to maintain backwards compatibility with previous versions of ansible-test.
    _DEADLINE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

    deadline: datetime.datetime  # point in time at which the timeout occurs
    duration: int | float  # minutes

    @property
    def remaining(self) -> datetime.timedelta:
        """Return the time left before the deadline. The result is negative once the deadline has passed."""
        # The current UTC time is truncated to whole seconds before subtracting.
        current = datetime.datetime.now(tz=datetime.timezone.utc).replace(microsecond=0)

        return self.deadline - current

    def to_dict(self) -> dict[str, t.Any]:
        """Serialize the timeout details into a dictionary suitable for JSON."""
        return {
            'deadline': self.deadline.strftime(self._DEADLINE_FORMAT),
            'duration': self.duration,
        }

    @staticmethod
    def from_dict(value: dict[str, t.Any]) -> TimeoutDetail:
        """Rebuild a TimeoutDetail instance from a dictionary previously produced by to_dict."""
        parsed = datetime.datetime.strptime(value['deadline'], TimeoutDetail._DEADLINE_FORMAT)
        # strptime yields a naive datetime for this format, so it is explicitly marked as UTC.
        deadline = parsed.replace(tzinfo=datetime.timezone.utc)

        return TimeoutDetail(deadline=deadline, duration=value['duration'])

    @staticmethod
    def create(duration: int | float) -> TimeoutDetail | None:
        """Build a TimeoutDetail which expires `duration` minutes from now, or return None if the duration is zero."""
        if not duration:
            return None

        whole_minutes = int(duration)

        if duration == whole_minutes:
            duration = whole_minutes  # integral durations are stored as int

        # Both the current time and the offset are limited to whole seconds.
        started = datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0)
        deadline = started + datetime.timedelta(seconds=int(duration * 60))

        return TimeoutDetail(deadline=deadline, duration=duration)
def get_timeout() -> TimeoutDetail | None:
    """Load the currently configured timeout from TIMEOUT_PATH, or return None when that file does not exist."""
    try:
        stored = read_json_file(TIMEOUT_PATH)
        detail = TimeoutDetail.from_dict(stored)
    except FileNotFoundError:
        # a missing file results in no timeout being reported
        detail = None

    return detail
def configure_timeout(args: CommonConfig) -> None:
    """Set up timeout handling for the given configuration. Non-test configurations are ignored."""
    if not isinstance(args, TestConfig):
        return  # only tests are subject to the timeout

    configure_test_timeout(args)
def configure_test_timeout(args: TestConfig) -> None:
    """
    Configure the test timeout.

    Does nothing when get_timeout() reports no timeout.
    When the deadline has already passed, TestTimeout.write(args) is called and TimeoutExpiredError is raised immediately.
    Otherwise a SIGUSR1 handler is installed and a background thread is started which sends SIGUSR1 to the current
    process once the remaining time has elapsed. The handler then calls TestTimeout.write(args) and raises TimeoutExpiredError.
    """
    timeout = get_timeout()

    if not timeout:
        return

    # Captured once so the expiry check, the messages and the waiter all use the same value.
    timeout_remaining = timeout.remaining

    test_timeout = TestTimeout(timeout.duration)

    if timeout_remaining <= datetime.timedelta():
        test_timeout.write(args)

        raise TimeoutExpiredError(f'The {timeout.duration} minute test timeout expired {timeout_remaining * -1} ago at {timeout.deadline}.')

    display.info(f'The {timeout.duration} minute test timeout expires in {timeout_remaining} at {timeout.deadline}.', verbosity=1)

    def timeout_handler(_dummy1: t.Any, _dummy2: t.Any) -> None:
        """Runs when SIGUSR1 is received."""
        test_timeout.write(args)

        raise TimeoutExpiredError(f'Tests aborted after exceeding the {timeout.duration} minute time limit.')

    def timeout_waiter(timeout_seconds: float) -> None:
        """Background thread body which sends SIGUSR1 to the current process once the timeout elapses."""
        time.sleep(timeout_seconds)
        os.kill(os.getpid(), signal.SIGUSR1)

    # The handler is installed before the waiter starts, so the signal is never delivered without a handler in place.
    signal.signal(signal.SIGUSR1, timeout_handler)

    # timedelta.total_seconds() returns a float, which is what timeout_waiter is annotated to accept.
    instance = WrappedThread(functools.partial(timeout_waiter, timeout_remaining.total_seconds()))
    instance.daemon = True  # NOTE(review): presumably WrappedThread is a threading.Thread, making this a daemon thread - confirm
    instance.start()
|