import pytest
import bdb
from time import sleep
from logging import LogRecord
from traceback import format_exception
from typing import Any, Generator, Optional
from collections.abc import Iterable
from pytest_retry.configs import Defaults
from pytest_retry.server import ReportHandler, OfflineReporter, ReportServer, ClientReporter
from _pytest.terminal import TerminalReporter
from _pytest.logging import caplog_records_key
outcome_key = pytest.StashKey[str]()
attempts_key = pytest.StashKey[int]()
duration_key = pytest.StashKey[float]()
server_port_key = pytest.StashKey[int]()
stages = ("setup", "call", "teardown")
RETRY = 0
FAIL = 1
EXIT = 2
PASS = 3
class ConfigurationError(Exception):
pass
class ExceptionFilter:
"""
    Callable helper which returns a bool when invoked with an exception type, based
    on the filter type (expected or excluded) and whether that exception type
    appears in the configured collection
"""
def __init__(self, expected_exceptions: Iterable, excluded_exceptions: Iterable):
if expected_exceptions and excluded_exceptions:
raise ConfigurationError(
"filtered_exceptions and excluded_exceptions are exclusive and cannot "
"be defined simultaneously."
)
self.list_type = bool(expected_exceptions)
self.filter = expected_exceptions or excluded_exceptions or []
def __call__(self, exception_type: Optional[type[BaseException]]) -> bool:
try:
            return not self.filter or self.list_type == (exception_type in self.filter)
except TypeError:
            raise ConfigurationError(
                "Filtered or excluded exceptions must be passed as a collection. If using "
                "the flaky mark, the `only_on` and `exclude` args must be collections too."
            ) from None
def __bool__(self) -> bool:
return bool(self.filter)
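# A minimal sketch of the filter semantics above (illustrative only, not executed
# here): an expected (`only_on`) filter passes only the listed exception types,
# while an excluded (`exclude`) filter passes everything except them:
#
#     only_value_errors = ExceptionFilter([ValueError], [])
#     only_value_errors(ValueError)  # True -> eligible for retry
#     only_value_errors(KeyError)    # False -> not retried
#
#     all_but_value_errors = ExceptionFilter([], [ValueError])
#     all_but_value_errors(ValueError)  # False -> not retried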
class RetryManager:
"""
Stores statistics and reports for flaky tests and fixtures which have
failed at least once during the test session and need to be retried
"""
def __init__(self) -> None:
self.reporter: ReportHandler = OfflineReporter()
self.trace_limit: Optional[int] = 1
self.node_stats: dict[str, dict] = {}
self.messages = (
" failed on attempt {attempt}! Retrying!\n\t",
" failed after {attempt} attempts!\n\t",
" teardown failed on attempt {attempt}! Exiting immediately!\n\t",
" passed on attempt {attempt}!\n\t",
)
def log_attempt(
self, attempt: int, name: str, exc: Optional[pytest.ExceptionInfo], result: int
) -> None:
message = self.messages[result].format(attempt=attempt)
formatted_trace = ""
if exc:
err = (exc.type, exc.value, exc.tb)
            formatted_trace = (
                "".join(format_exception(*err, limit=self.trace_limit))
                .replace("\n", "\n\t")
                .rstrip()
            )
self.reporter.record_attempt([f"\t{name}", message, formatted_trace, "\n\n"])
def build_retry_report(self, terminal_reporter: TerminalReporter) -> None:
contents = self.reporter.stream.getvalue()
if not contents:
return
terminal_reporter.write("\n")
terminal_reporter.section(
"the following tests were retried", sep="=", bold=True, yellow=True
)
terminal_reporter.write(contents)
terminal_reporter.section("end of test retry report", sep="=", bold=True, yellow=True)
terminal_reporter.write("\n")
def record_node_stats(self, report: pytest.TestReport) -> None:
self.node_stats[report.nodeid]["outcomes"][report.when].append(report.outcome)
self.node_stats[report.nodeid]["durations"][report.when].append(report.duration)
def simple_outcome(self, item: pytest.Item) -> str:
"""
        Return "failed" if the setup, teardown, or final call outcome is "failed";
        return "skipped" if the test was skipped during setup, otherwise "passed"
"""
test_outcomes = self.node_stats[item.nodeid]["outcomes"]
for outcome in ("skipped", "failed"):
if outcome in test_outcomes["setup"]:
return outcome
if not test_outcomes["call"] or test_outcomes["call"][-1] == "failed":
return "failed"
        # NOTE: this could probably be simplified to fall through on test_outcomes["teardown"]
if "failed" in test_outcomes["teardown"]:
return "failed"
return "passed"
def simple_duration(self, item: pytest.Item) -> float:
"""
        Return the total test duration: the sum of the latest setup, call, and teardown durations
"""
return sum(self.node_stats[item.nodeid]["durations"][stage][-1] for stage in stages)
def sum_attempts(self, item: pytest.Item) -> int:
return len(self.node_stats[item.nodeid]["outcomes"]["call"])
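# Illustrative node_stats entry for a test that failed once, then passed on retry
# (assumed values; each stage list gains one entry per recorded report):
#
#     {"outcomes": {"setup": ["passed"], "call": ["failed", "passed"], "teardown": ["passed"]},
#      "durations": {"setup": [...], "call": [...], "teardown": [...]}}
#
# simple_outcome() reports "passed" here, since the final "call" outcome passed.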
retry_manager = RetryManager()
def has_interactive_exception(call: pytest.CallInfo) -> bool:
if call.excinfo is None:
return False
if isinstance(call.excinfo.value, bdb.BdbQuit):
# Special control flow exception.
return False
return True
def should_handle_retry(call: pytest.CallInfo) -> bool:
if call.excinfo is None:
return False
# if teardown stage, don't retry
# may handle fixture setup retries in v2 if requested. For now, this is fine.
if call.when in {"setup", "teardown"}:
return False
# if test was skipped, don't retry
if call.excinfo.typename == "Skipped":
return False
return True
@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_protocol(item: pytest.Item) -> Optional[object]:
retry_manager.node_stats[item.nodeid] = {
"outcomes": {k: [] for k in stages},
"durations": {k: [0.0] for k in stages},
}
yield
item.stash[outcome_key] = retry_manager.simple_outcome(item)
item.stash[duration_key] = retry_manager.simple_duration(item) # always overwrite, for now
item.stash[attempts_key] = retry_manager.sum_attempts(item)
@pytest.hookimpl(hookwrapper=True, tryfirst=True)
def pytest_runtest_makereport(
item: pytest.Item, call: pytest.CallInfo
) -> Generator[None, pytest.TestReport, None]:
outcome = yield
original_report: pytest.TestReport = outcome.get_result()
retry_manager.record_node_stats(original_report)
# Set dynamic outcome for each stage until runtest protocol has completed
item.stash[outcome_key] = original_report.outcome
if not should_handle_retry(call):
return
# xfail tests don't raise a Skipped exception if they fail, but are still marked as skipped
if original_report.skipped is True:
return
flake_mark = item.get_closest_marker("flaky")
if flake_mark is None:
return
condition = flake_mark.kwargs.get("condition")
if condition is False:
return
exception_filter = ExceptionFilter(
flake_mark.kwargs.get("only_on", []),
flake_mark.kwargs.get("exclude", []),
) or ExceptionFilter(Defaults.FILTERED_EXCEPTIONS, Defaults.EXCLUDED_EXCEPTIONS)
if not exception_filter(call.excinfo.type): # type: ignore
return
retries = flake_mark.kwargs.get("retries", Defaults.RETRIES)
delay = flake_mark.kwargs.get("delay", Defaults.RETRY_DELAY)
cumulative_timing = flake_mark.kwargs.get("cumulative_timing", Defaults.CUMULATIVE_TIMING)
attempts = 1
hook = item.ihook
while True:
# Default teardowns are already excluded, so this must be the `call` stage
# Try preliminary teardown using a fake class to ensure every local fixture (i.e.
# excluding session) is torn down. Yes, including module and class fixtures
t_call = pytest.CallInfo.from_call(
lambda: hook.pytest_runtest_teardown(
item=item,
nextitem=pytest.Class.from_parent(item.session, name="Fakeboi"),
),
when="teardown",
)
# If teardown fails, break. Flaky teardowns are unacceptable and should raise immediately
if t_call.excinfo:
item.stash[outcome_key] = "failed"
retry_manager.log_attempt(
attempt=attempts, name=item.name, exc=t_call.excinfo, result=EXIT
)
# Prevents a KeyError when an error during retry teardown causes a redundant teardown
empty: dict[str, list[LogRecord]] = {}
item.stash[caplog_records_key] = empty
break
# If teardown passes, send report that the test is being retried
if attempts == 1:
original_report.outcome = "retried" # type: ignore
hook.pytest_runtest_logreport(report=original_report)
original_report.outcome = "failed"
retry_manager.log_attempt(attempt=attempts, name=item.name, exc=call.excinfo, result=RETRY)
sleep(delay)
# Calling _initrequest() is required to reset fixtures for a retry. Make public pls?
item._initrequest() # type: ignore[attr-defined]
pytest.CallInfo.from_call(lambda: hook.pytest_runtest_setup(item=item), when="setup")
call = pytest.CallInfo.from_call(lambda: hook.pytest_runtest_call(item=item), when="call")
retry_report = pytest.TestReport.from_item_and_call(item, call)
retry_manager.record_node_stats(retry_report)
# Do the exception interaction step
# (may not bother to support this since this is designed for automated runs, not debugging)
if has_interactive_exception(call):
hook.pytest_exception_interact(node=item, call=call, report=retry_report)
attempts += 1
should_keep_retrying = (
not retry_report.passed
and attempts <= retries
and exception_filter(call.excinfo.type) # type: ignore
)
if not should_keep_retrying:
original_report.outcome = retry_report.outcome
original_report.longrepr = retry_report.longrepr
if cumulative_timing is False:
original_report.duration = retry_report.duration
else:
original_report.duration = sum(
retry_manager.node_stats[original_report.nodeid]["durations"]["call"]
)
retry_manager.log_attempt(
attempt=attempts,
name=item.name,
exc=call.excinfo,
result=FAIL if retry_report.failed else PASS,
)
break
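# The retry loop above, in outline: tear down all non-session fixtures, report the
# first failure as "retried", sleep for `delay`, rebuild the fixture request, re-run
# setup and call, then repeat until the test passes, retries are exhausted, the
# exception filter rejects the error, or a teardown fails.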
def pytest_terminal_summary(terminalreporter: TerminalReporter) -> None:
retry_manager.build_retry_report(terminalreporter)
def pytest_report_teststatus(
report: pytest.TestReport,
) -> Optional[tuple[str, str, tuple[str, dict]]]:
if report.outcome == "retried":
return "retried", "R", ("RETRY", {"yellow": True})
return None
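# Illustrative terminal rendering of the status above (test name is assumed): a
# retried test prints "R" in the progress line, or "RETRY" in verbose mode:
#
#     test_example.py::test_unstable RETRY
#     test_example.py::test_unstable PASSED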
class XdistHook:
@staticmethod
def pytest_configure_node(node: Any) -> None: # Xdist WorkerController instance
# Tells each worker node which port was randomly assigned to the retry server
node.workerinput["server_port"] = node.config.stash[server_port_key]
def pytest_configure(config: pytest.Config) -> None:
config.addinivalue_line(
"markers",
"flaky(retries=1, delay=0, only_on=..., exclude=..., condition=...): indicate a flaky "
"test which will be retried the number of times specified with an (optional) specified "
"delay between each attempt. Collections of one or more exceptions can be passed so "
"that the test is retried only on those exceptions, or excluding those exceptions. "
"Any statement which returns a bool can be used as a condition",
)
verbosity = config.getoption("verbose")
if verbosity:
        # set the trace limit according to the verbosity count, unlimited at 5 or higher
retry_manager.trace_limit = verbosity if verbosity < 5 else None
Defaults.configure(config)
Defaults.add("FILTERED_EXCEPTIONS", config.hook.pytest_set_filtered_exceptions() or [])
Defaults.add("EXCLUDED_EXCEPTIONS", config.hook.pytest_set_excluded_exceptions() or [])
if config.pluginmanager.has_plugin("xdist") and config.getoption("numprocesses", False):
config.pluginmanager.register(XdistHook())
retry_manager.reporter = ReportServer()
config.stash[server_port_key] = retry_manager.reporter.initialize_server()
elif hasattr(config, "workerinput"):
# pytest-xdist doesn't use the config stash, so have to ignore a type error here
retry_manager.reporter = ClientReporter(config.workerinput["server_port"]) # type: ignore
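# Example usage of the marker registered above (a sketch; all kwargs are optional
# and the test/exception names here are assumed):
#
#     @pytest.mark.flaky(retries=2, delay=1, only_on=[ConnectionError])
#     def test_unstable_service():
#         ...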
RETRIES_HELP_TEXT = "number of times to retry failed tests. Defaults to 0."
DELAY_HELP_TEXT = "configure a delay (in seconds) between retries."
TIMING_HELP_TEXT = "if True, retry durations will be included in the overall reported test duration."
def pytest_addoption(parser: pytest.Parser) -> None:
group = parser.getgroup(
"pytest-retry", "retry flaky tests to compensate for intermittent failures"
)
group.addoption(
"--retries",
action="store",
dest="retries",
type=int,
help=RETRIES_HELP_TEXT,
)
group.addoption(
"--retry-delay",
action="store",
dest="retry_delay",
type=float,
help=DELAY_HELP_TEXT,
)
group.addoption(
"--cumulative-timing",
action="store",
dest="cumulative_timing",
type=bool,
help=TIMING_HELP_TEXT,
)
parser.addini("retries", RETRIES_HELP_TEXT, default=0, type="string")
parser.addini("retry_delay", DELAY_HELP_TEXT, default=0, type="string")
parser.addini("cumulative_timing", TIMING_HELP_TEXT, default=False, type="bool")
def pytest_addhooks(pluginmanager: pytest.PytestPluginManager) -> None:
"""This example assumes the hooks are grouped in the 'sample_hook' module."""
from pytest_retry import hooks
pluginmanager.add_hookspecs(hooks)
def pytest_collection_modifyitems(config: pytest.Config, items: list[pytest.Item]) -> None:
if not (config.getoption("--retries") or config.getini("retries")):
return
flaky = pytest.mark.flaky(retries=Defaults.RETRIES)
for item in items:
if "flaky" not in item.keywords:
item.add_marker(flaky)
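# Sketch of the effect of a global retry setting (assuming no explicit marks):
# running `pytest --retries 2` makes every collected test behave as if it were
# decorated with @pytest.mark.flaky(retries=2), while tests that already carry
# a flaky mark keep their own settings.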