File: conftest.py

Package: python-hypothesis 6.138.0-1
# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import gc
import inspect
import json
import os
import random
import sys
import time as time_module
from collections.abc import Iterator
from functools import wraps
from pathlib import Path

import pytest
from _pytest.monkeypatch import MonkeyPatch

from hypothesis import is_hypothesis_test, settings
from hypothesis._settings import is_in_ci
from hypothesis.errors import NonInteractiveExampleWarning
from hypothesis.internal.compat import add_note
from hypothesis.internal.conjecture import junkdrawer

from tests.common import TIME_INCREMENT
from tests.common.setup import run
from tests.common.utils import raises_warning

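# One-time global setup for the whole test suite (see tests.common.setup).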
run()

# Skip collection of tests which require the Django test runner,
# or that don't work on the current version of Python.
collect_ignore_glob = ["django/*"]
# Ignore tests gated on a Python version newer than the one running
# (covers version-specific files up to Python 3.19).
for minor in range(10, 20):
    if sys.version_info < (3, minor):
        collect_ignore_glob.append(f"cover/*py3{minor}*.py")
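# e.g. on CPython 3.12 this ignores cover/*py313*.py through cover/*py319*.py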

if sys.version_info >= (3, 11):
    collect_ignore_glob.append("cover/test_asyncio.py")  # @asyncio.coroutine removed


in_shrinking_benchmark = False


def pytest_configure(config):
    config.addinivalue_line("markers", "slow: pandas expects this marker to exist.")
    config.addinivalue_line(
        "markers",
        "xp_min_version(api_version): run when greater or equal to api_version",
    )
    config.addinivalue_line("markers", "xf_crosshair: selection for xfailing symbolics")

    if config.getoption("--hypothesis-benchmark-shrinks"):
        # we'd like to support xdist here, but a session-scope fixture won't
        # be enough: https://github.com/pytest-dev/pytest-xdist/issues/271.
        # Need a lockfile or equivalent.

        assert config.getoption(
            "--hypothesis-benchmark-output"
        ), "must specify shrinking output file"

        global in_shrinking_benchmark
        in_shrinking_benchmark = True


def pytest_addoption(parser):
    parser.addoption("--hypothesis-update-outputs", action="store_true")
    parser.addoption("--hypothesis-benchmark-shrinks", type=str, choices=["new", "old"])
    parser.addoption("--hypothesis-benchmark-output", type=str)

    # New in pytest 6, so we add a shim on old versions to avoid missing-arg errors
    arg = "--durations-min"
    if arg not in sum((a._long_opts for g in parser._groups for a in g.options), []):
        parser.addoption(arg, action="store", default=1.0)


@pytest.fixture(scope="function", params=["warns", "raises"])
def warns_or_raises(request):
    """This runs the test twice: first to check that a warning is emitted
    and execution continues successfully despite the warning; then to check
    that the raised warning is handled properly.
    """
    if request.param == "raises":
        return raises_warning
    else:
        return pytest.warns
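
# Hypothetical usage sketch for the fixture above (names illustrative, not
# from this file):
#
#     def test_emits_warning(warns_or_raises):
#         with warns_or_raises(SomeWarning):
#             code_that_warns()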


# crosshair needs actual time for its path timeouts; load it before patching
try:
    import hypothesis_crosshair_provider.crosshair_provider  # noqa: F401
except ImportError:
    pass


# monkeypatch is not thread-safe, so pytest-run-parallel would skip all our
# tests if we defined this fixture; hence we only define it outside the
# threading profile.
if settings._current_profile != "threading":

    @pytest.fixture(scope="function", autouse=True)
    def _consistently_increment_time(monkeypatch):
        """Rather than rely on real system time we monkey patch time.time so that
        it passes at a consistent rate between calls.

        The reason for this is that when these tests run in CI, their performance is
        extremely variable and the VM the tests are on might go to sleep for a bit,
        introducing arbitrary delays. This can cause a number of tests to fail
        flakily.

        Replacing time with a fake version under our control avoids this problem.
        """
        frozen = False

        current_time = time_module.time()

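        # Each observation of the fake clock advances it by TIME_INCREMENT, so
        # "elapsed" time depends only on how many times the clock is read, not
        # on how long anything really took.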
        def time():
            nonlocal current_time
            if not frozen:
                current_time += TIME_INCREMENT
            return current_time

        def sleep(naptime):
            nonlocal current_time
            current_time += naptime

        def freeze():
            nonlocal frozen
            frozen = True

        def _patch(name, fn):
            monkeypatch.setattr(
                time_module, name, wraps(getattr(time_module, name))(fn)
            )

        _patch("time", time)
        _patch("monotonic", time)
        _patch("perf_counter", time)
        _patch("sleep", sleep)
        monkeypatch.setattr(time_module, "freeze", freeze, raising=False)

        # In the patched time regime, observing it causes it to increment. To avoid reintroducing
        # non-determinism due to GC running at arbitrary times, we patch the GC observer
        # to NOT increment time.

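        # junkdrawer holds its own reference to perf_counter (seemingly bound at
        # import time), so patching time.perf_counter above does not reach it
        # and it must be patched separately.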
        monkeypatch.setattr(junkdrawer, "_perf_counter", time)

        if hasattr(gc, "callbacks"):
            # ensure timer callback is added, then bracket it by freeze/unfreeze below
            junkdrawer.gc_cumulative_time()

            _was_frozen = False

            def _freezer(*_):
                nonlocal _was_frozen, frozen
                _was_frozen = frozen
                frozen = True

            def _unfreezer(*_):
                nonlocal _was_frozen, frozen
                frozen = _was_frozen

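            # gc invokes each callback for both the "start" and "stop" phases of
            # a collection; with _freezer first and _unfreezer last, junkdrawer's
            # timing callback in between always reads a frozen clock.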
            gc.callbacks.insert(0, _freezer)  # freeze before gc callback
            gc.callbacks.append(_unfreezer)  # unfreeze after

            yield

            assert gc.callbacks.pop(0) == _freezer
            assert gc.callbacks.pop() == _unfreezer
        else:  # pragma: no cover # branch never taken in CPython
            yield


random_states_after_tests = {}
independent_random = random.Random()


@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_call(item):
    if item.config.getoption("--hypothesis-benchmark-shrinks"):
        yield from _benchmark_shrinks(item)
        # ideally benchmark shrinking would not be mutually exclusive with the
        # other checks in this function, but it's cleaner to early-return here;
        # in practice, anything those checks would catch will already have
        # errored during a normal (non-benchmark) test run.
        return
    # This hookwrapper checks for PRNG state leaks from Hypothesis tests.
    # See: https://github.com/HypothesisWorks/hypothesis/issues/1919
    if (
        not (hasattr(item, "obj") and is_hypothesis_test(item.obj))
        # we disable this check on the threading job, due to races in the global
        # state.
        or settings._current_profile == "threading"
    ):
        outcome = yield
    elif "pytest_randomly" in sys.modules:
        # See https://github.com/HypothesisWorks/hypothesis/issues/3041 - this
        # branch exists to make it easier on external contributors, but should
        # never run in our CI (because that would disable the check entirely).
        assert not is_in_ci()
        outcome = yield
    else:
        # We start by perturbing the state of the PRNG, because repeatedly
        # leaking PRNG state resets state_after to the (previously leaked)
        # state_before, and that just shows as "no use of random".
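        # Concretely: if successive tests each leak by seeding the global PRNG
        # identically, each test's before- and after-states coincide, and the
        # leak would be invisible without the reseed below.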
        random.seed(independent_random.randrange(2**32))
        before = random.getstate()
        outcome = yield
        after = random.getstate()
        if before != after:
            if after in random_states_after_tests:
                raise Exception(
                    f"{item.nodeid!r} and {random_states_after_tests[after]!r} "
                    "both used the `random` module, and finished with the "
                    "same global `random.getstate()`; this is probably a nasty bug!"
                )
            random_states_after_tests[after] = item.nodeid

    # Annotate usage of .example() with a hint about alternatives
    if isinstance(getattr(outcome, "exception", None), NonInteractiveExampleWarning):
        add_note(
            outcome.exception,
            "For hypothesis' own test suite, consider using one of the helper "
            "methods in tests.common.debug instead.",
        )


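# process_time is not among the clocks faked by the fixture above, so the
# shrinking benchmark below measures real CPU time.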
timer = time_module.process_time


def _worker_path(session: pytest.Session) -> Path:
    return (
        Path(session.config.getoption("--hypothesis-benchmark-output")).parent
        # https://pytest-xdist.readthedocs.io/en/stable/how-to.html#envvar-PYTEST_XDIST_WORKER
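        # e.g. shrinking_results_gw0.json when running under xdist worker "gw0"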
        / f"shrinking_results_{os.environ['PYTEST_XDIST_WORKER']}.json"
    )


def _benchmark_shrinks(item: pytest.Function) -> Iterator[None]:
    from hypothesis.internal.conjecture.shrinker import Shrinker

    # this isn't perfect, but it is cheap!
    if "minimal(" not in inspect.getsource(item.function):
        pytest.skip("(probably) does not call minimal()")

    actual_shrink = Shrinker.shrink
    shrink_calls = []
    shrink_time = []

    def shrink(self, *args, **kwargs):
        # shrink_calls and shrink_time are only mutated, never rebound, so no
        # nonlocal declarations are needed here.
        start_t = timer()
        result = actual_shrink(self, *args, **kwargs)
        shrink_calls.append(self.engine.call_count - self.initial_calls)
        shrink_time.append(timer() - start_t)
        return result

    monkeypatch = MonkeyPatch()
    monkeypatch.setattr(Shrinker, "shrink", shrink)

    # pytest_runtest_call must yield exactly once. We run the test function four
    # times here, then yield so that pytest performs the fifth and final run.
    for _ in range(5 - 1):
        item.runtest()
    yield

    monkeypatch.undo()

    # keep only the trailing "file.py::test_name" part of the nodeid
    nodeid = item.nodeid.rsplit("/", 1)[1]

    results_p = _worker_path(item.session)
    if not results_p.exists():
        results_p.write_text(json.dumps({"calls": {}, "time": {}}))

    data = json.loads(results_p.read_text())
    data["calls"][nodeid] = shrink_calls
    data["time"][nodeid] = shrink_time
    results_p.write_text(json.dumps(data))
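    # The worker file ends up shaped like (node id illustrative):
    #   {"calls": {"test_x.py::test_y": [13, 12, ...]},
    #    "time": {"test_x.py::test_y": [0.81, 0.79, ...]}}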


def pytest_sessionfinish(session, exitstatus):
    if not (mode := session.config.getoption("--hypothesis-benchmark-shrinks")):
        return
    # only run on the controller process, not the workers
    if hasattr(session.config, "workerinput"):
        return

    results = {"calls": {}, "time": {}}
    output_p = Path(session.config.getoption("--hypothesis-benchmark-output"))
    for p in output_p.parent.iterdir():
        if p.name.startswith("shrinking_results_"):
            worker_results = json.loads(p.read_text())
            results["calls"] |= worker_results["calls"]
            results["time"] |= worker_results["time"]
            p.unlink()

    results = {mode: results}
    if not output_p.exists():
        output_p.write_text(json.dumps(results))
    else:
        data = json.loads(output_p.read_text())
        data[mode] = results[mode]
        output_p.write_text(json.dumps(data))
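
    # After running once with --hypothesis-benchmark-shrinks=old and once with
    # =new, the merged output is keyed by mode:
    #   {"old": {"calls": {...}, "time": {...}},
    #    "new": {"calls": {...}, "time": {...}}}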