1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311
|
# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.
import gc
import inspect
import json
import os
import random
import sys
import time as time_module
from functools import wraps
from pathlib import Path
import pytest
from _pytest.monkeypatch import MonkeyPatch
from hypothesis import is_hypothesis_test, settings
from hypothesis._settings import is_in_ci
from hypothesis.errors import NonInteractiveExampleWarning
from hypothesis.internal.compat import add_note
from hypothesis.internal.conjecture import junkdrawer
from tests.common import TIME_INCREMENT
from tests.common.setup import run
from tests.common.utils import raises_warning
run()
# Skip collection of tests which require the Django test runner,
# or that don't work on the current version of Python.
collect_ignore_glob = ["django/*"]
# this checks up until py319
for minor_increment in range(10):
minor = 10 + minor_increment
if sys.version_info < (3, minor):
collect_ignore_glob.append(f"cover/*py3{minor}*.py")
if sys.version_info >= (3, 11):
collect_ignore_glob.append("cover/test_asyncio.py") # @asyncio.coroutine removed
in_shrinking_benchmark = False
def pytest_configure(config):
config.addinivalue_line("markers", "slow: pandas expects this marker to exist.")
config.addinivalue_line(
"markers",
"xp_min_version(api_version): run when greater or equal to api_version",
)
config.addinivalue_line("markers", "xf_crosshair: selection for xfailing symbolics")
if config.getoption("--hypothesis-benchmark-shrinks"):
# we'd like to support xdist here, but a session-scope fixture won't
# be enough: https://github.com/pytest-dev/pytest-xdist/issues/271.
# Need a lockfile or equivalent.
assert config.getoption(
"--hypothesis-benchmark-output"
), "must specify shrinking output file"
global in_shrinking_benchmark
in_shrinking_benchmark = True
def pytest_addoption(parser):
parser.addoption("--hypothesis-update-outputs", action="store_true")
parser.addoption("--hypothesis-benchmark-shrinks", type=str, choices=["new", "old"])
parser.addoption("--hypothesis-benchmark-output", type=str)
# New in pytest 6, so we add a shim on old versions to avoid missing-arg errors
arg = "--durations-min"
if arg not in sum((a._long_opts for g in parser._groups for a in g.options), []):
parser.addoption(arg, action="store", default=1.0)
@pytest.fixture(scope="function", params=["warns", "raises"])
def warns_or_raises(request):
"""This runs the test twice: first to check that a warning is emitted
and execution continues successfully despite the warning; then to check
that the raised warning is handled properly.
"""
if request.param == "raises":
return raises_warning
else:
return pytest.warns
# crosshair needs actual time for its path timeouts; load it before patching
try:
import hypothesis_crosshair_provider.crosshair_provider # noqa: F401
except ImportError:
pass
# monkeypatch is not thread-safe, so pytest-run-parallel will skip all our tests
# if we define this.
if settings._current_profile != "threading":
@pytest.fixture(scope="function", autouse=True)
def _consistently_increment_time(monkeypatch):
"""Rather than rely on real system time we monkey patch time.time so that
it passes at a consistent rate between calls.
The reason for this is that when these tests run in CI, their performance is
extremely variable and the VM the tests are on might go to sleep for a bit,
introducing arbitrary delays. This can cause a number of tests to fail
flakily.
Replacing time with a fake version under our control avoids this problem.
"""
frozen = False
current_time = time_module.time()
def time():
nonlocal current_time
if not frozen:
current_time += TIME_INCREMENT
return current_time
def sleep(naptime):
nonlocal current_time
current_time += naptime
def freeze():
nonlocal frozen
frozen = True
def _patch(name, fn):
monkeypatch.setattr(
time_module, name, wraps(getattr(time_module, name))(fn)
)
_patch("time", time)
_patch("monotonic", time)
_patch("perf_counter", time)
_patch("sleep", sleep)
monkeypatch.setattr(time_module, "freeze", freeze, raising=False)
# In the patched time regime, observing it causes it to increment. To avoid reintroducing
# non-determinism due to GC running at arbitrary times, we patch the GC observer
# to NOT increment time.
monkeypatch.setattr(junkdrawer, "_perf_counter", time)
if hasattr(gc, "callbacks"):
# ensure timer callback is added, then bracket it by freeze/unfreeze below
junkdrawer.gc_cumulative_time()
_was_frozen = False
def _freezer(*_):
nonlocal _was_frozen, frozen
_was_frozen = frozen
frozen = True
def _unfreezer(*_):
nonlocal _was_frozen, frozen
frozen = _was_frozen
gc.callbacks.insert(0, _freezer) # freeze before gc callback
gc.callbacks.append(_unfreezer) # unfreeze after
yield
assert gc.callbacks.pop(0) == _freezer
assert gc.callbacks.pop() == _unfreezer
else: # pragma: no cover # branch never taken in CPython
yield
random_states_after_tests = {}
independent_random = random.Random()
@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_call(item):
if item.config.getoption("--hypothesis-benchmark-shrinks"):
yield from _benchmark_shrinks(item)
# ideally benchmark shrinking would not be mutually exclusive with the
# other checks in this function, but it's cleaner to early-return here,
# and in practice they will error in normal tests before one runs a
# benchmark.
return
# This hookwrapper checks for PRNG state leaks from Hypothesis tests.
# See: https://github.com/HypothesisWorks/hypothesis/issues/1919
if (
not (hasattr(item, "obj") and is_hypothesis_test(item.obj))
# we disable this check on the threading job, due to races in the global
# state.
or settings._current_profile == "threading"
):
outcome = yield
elif "pytest_randomly" in sys.modules:
# See https://github.com/HypothesisWorks/hypothesis/issues/3041 - this
# branch exists to make it easier on external contributors, but should
# never run in our CI (because that would disable the check entirely).
assert not is_in_ci()
outcome = yield
else:
# We start by peturbing the state of the PRNG, because repeatedly
# leaking PRNG state resets state_after to the (previously leaked)
# state_before, and that just shows as "no use of random".
random.seed(independent_random.randrange(2**32))
before = random.getstate()
outcome = yield
after = random.getstate()
if before != after:
if after in random_states_after_tests:
raise Exception(
f"{item.nodeid!r} and {random_states_after_tests[after]!r} "
"both used the `random` module, and finished with the "
"same global `random.getstate()`; this is probably a nasty bug!"
)
random_states_after_tests[after] = item.nodeid
# Annotate usage of .example() with a hint about alternatives
if isinstance(getattr(outcome, "exception", None), NonInteractiveExampleWarning):
add_note(
outcome.exception,
"For hypothesis' own test suite, consider using one of the helper "
"methods in tests.common.debug instead.",
)
timer = time_module.process_time
def _worker_path(session: pytest.Session) -> Path:
return (
Path(session.config.getoption("--hypothesis-benchmark-output")).parent
# https://pytest-xdist.readthedocs.io/en/stable/how-to.html#envvar-PYTEST_XDIST_WORKER
/ f"shrinking_results_{os.environ['PYTEST_XDIST_WORKER']}.json"
)
def _benchmark_shrinks(item: pytest.Function) -> None:
from hypothesis.internal.conjecture.shrinker import Shrinker
# this isn't perfect, but it is cheap!
if "minimal(" not in inspect.getsource(item.function):
pytest.skip("(probably) does not call minimal()")
actual_shrink = Shrinker.shrink
shrink_calls = []
shrink_time = []
def shrink(self, *args, **kwargs):
nonlocal shrink_calls
nonlocal shrink_time
start_t = timer()
result = actual_shrink(self, *args, **kwargs)
shrink_calls.append(self.engine.call_count - self.initial_calls)
shrink_time.append(timer() - start_t)
return result
monkeypatch = MonkeyPatch()
monkeypatch.setattr(Shrinker, "shrink", shrink)
# pytest_runtest_call must yield at some point. This executes the test function
# n - 1 times, and then yields for the final execution.
for _ in range(5 - 1):
item.runtest()
yield
monkeypatch.undo()
# remove leading hypothesis-python/tests/...
nodeid = item.nodeid.rsplit("/", 1)[1]
results_p = _worker_path(item.session)
if not results_p.exists():
results_p.write_text(json.dumps({"calls": {}, "time": {}}))
data = json.loads(results_p.read_text())
data["calls"][nodeid] = shrink_calls
data["time"][nodeid] = shrink_time
results_p.write_text(json.dumps(data))
def pytest_sessionfinish(session, exitstatus):
if not (mode := session.config.getoption("--hypothesis-benchmark-shrinks")):
return
# only run on the controller process, not the workers
if hasattr(session.config, "workerinput"):
return
results = {"calls": {}, "time": {}}
output_p = Path(session.config.getoption("--hypothesis-benchmark-output"))
for p in output_p.parent.iterdir():
if p.name.startswith("shrinking_results_"):
worker_results = json.loads(p.read_text())
results["calls"] |= worker_results["calls"]
results["time"] |= worker_results["time"]
p.unlink()
results = {mode: results}
if not output_p.exists():
output_p.write_text(json.dumps(results))
else:
data = json.loads(output_p.read_text())
data[mode] = results[mode]
output_p.write_text(json.dumps(data))
|