| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 
 | from __future__ import annotations as _annotations
import functools
import gc
import importlib.util
import json
import os
import re
import sys
from dataclasses import dataclass
from pathlib import Path
from time import sleep, time
from typing import Any, Callable, Literal
import hypothesis
import pytest
from pydantic_core import ArgsKwargs, CoreSchema, SchemaValidator, ValidationError
from pydantic_core.core_schema import CoreConfig, ExtraBehavior
# Names exported by this module on `from <module> import *`.
__all__ = 'Err', 'PyAndJson', 'assert_gc', 'is_free_threaded', 'plain_repr', 'infinite_generator'
# Hypothesis profiles: 'fast' caps each test at 2 examples, 'slow' at 1,000.
hypothesis.settings.register_profile('fast', max_examples=2)
hypothesis.settings.register_profile('slow', max_examples=1_000)
# The active profile is taken from the HYPOTHESIS_PROFILE environment variable,
# falling back to 'fast' when it is not set.
hypothesis.settings.load_profile(os.getenv('HYPOTHESIS_PROFILE', 'fast'))
# True only when the interpreter exposes `sys._is_gil_enabled` and it reports
# that the GIL is disabled; interpreters without that attribute yield False.
is_free_threaded = hasattr(sys, '_is_gil_enabled') and not sys._is_gil_enabled()
def plain_repr(obj):
    """Return ``repr(obj)`` in a compact, whitespace-free form.

    A comma (plus any following whitespace) that sits directly before a
    closing ``)`` or ``}`` is dropped first, then every run of whitespace is
    removed.
    """
    without_trailing_commas = re.sub(r',\s*([)}])', r'\1', repr(obj))
    return re.sub(r'\s+', '', without_trailing_commas)
@dataclass
class Err:
    """A message with optional associated errors and a compact ``repr``."""

    message: str
    errors: Any | None = None

    def __repr__(self):
        # Falsy `errors` (None, an empty list, ...) are left out of the repr.
        if not self.errors:
            return f'Err({self.message!r})'
        return f'Err({self.message!r}, errors={self.errors!r})'
def json_default(obj):
    """``default`` hook for ``json.dumps`` that skips the test on ``ArgsKwargs``.

    Raises:
        TypeError: for any object that is not an ``ArgsKwargs`` instance, with
            the same message shape ``json`` uses for unserializable values.
    """
    if not isinstance(obj, ArgsKwargs):
        raise TypeError(f'Object of type {type(obj).__name__} is not JSON serializable')
    # ArgsKwargs has no JSON form, so the running test is skipped instead.
    raise pytest.skip('JSON skipping ArgsKwargs')
class PyAndJsonValidator:
    """Wrap a ``SchemaValidator`` so one test body can run in Python or JSON mode.

    ``validator_type`` selects the mode used by ``validate_test`` and
    ``isinstance_test``: ``'json'`` serializes the input with ``json.dumps``
    and validates the resulting string, ``'python'`` validates the object
    directly. Any other value trips an assertion in those two methods.
    """

    def __init__(
        self,
        schema: CoreSchema,
        config: CoreConfig | None = None,
        *,
        validator_type: Literal['json', 'python'] | None = None,
    ):
        self.validator = SchemaValidator(schema, config)
        self.validator_type = validator_type

    def validate_python(self, py_input, strict: bool | None = None, context: Any = None):
        """Validate ``py_input`` as a Python object, regardless of the mode."""
        return self.validator.validate_python(py_input, strict=strict, context=context)

    def validate_json(self, json_str: str, strict: bool | None = None, context: Any = None):
        """Validate ``json_str`` as JSON, regardless of the mode."""
        return self.validator.validate_json(json_str, strict=strict, context=context)

    def validate_test(
        self, py_input, strict: bool | None = None, context: Any = None, extra: ExtraBehavior | None = None
    ):
        """Validate ``py_input`` using the mode chosen at construction time."""
        if self.validator_type != 'json':
            assert self.validator_type == 'python', self.validator_type
            return self.validator.validate_python(py_input, strict=strict, context=context, extra=extra)

        # JSON mode: round-trip the Python value through a JSON string first.
        json_input = json.dumps(py_input, default=json_default)
        return self.validator.validate_json(json_input, strict=strict, extra=extra, context=context)

    def isinstance_test(self, py_input, strict: bool | None = None, context: Any = None):
        """Report whether ``py_input`` is valid in the chosen mode.

        In JSON mode the result is ``True`` when validation succeeds and
        ``False`` when it raises ``ValidationError``; in Python mode the
        result of ``isinstance_python`` is returned as-is.
        """
        if self.validator_type != 'json':
            assert self.validator_type == 'python', self.validator_type
            return self.validator.isinstance_python(py_input, strict=strict, context=context)

        try:
            self.validator.validate_json(json.dumps(py_input), strict=strict, context=context)
        except ValidationError:
            return False
        return True
# Alias for "the PyAndJsonValidator class itself (or a subclass)", as opposed to
# an instance; used as the return annotation of the `py_and_json` fixture.
PyAndJson = type[PyAndJsonValidator]
@pytest.fixture(params=['python', 'json'])
def py_and_json(request) -> PyAndJson:
    """Return a ``PyAndJsonValidator`` subclass bound to the current mode.

    The fixture is parametrized over ``'python'`` and ``'json'``; the returned
    class has ``validator_type`` pre-filled with that parameter, so tests
    construct it with just a schema (and optional config).
    """
    mode = request.param

    class ChosenPyAndJsonValidator(PyAndJsonValidator):
        # Pre-bind the keyword-only `validator_type` argument of the parent initializer.
        __init__ = functools.partialmethod(PyAndJsonValidator.__init__, validator_type=mode)

    return ChosenPyAndJsonValidator
class StrictModeType:
    """Describe how strict mode is switched on for a parametrized test.

    Attributes are set from the two flags:
      * ``schema`` is stored unchanged on ``self.schema``.
      * ``validator_args`` is ``{'strict': True}`` when ``extra`` is truthy,
        otherwise an empty dict.

    NOTE(review): presumably ``schema`` means "set strict in the schema" and
    ``extra`` means "pass strict as a validation argument" — confirm against
    the tests that consume this.
    """

    def __init__(self, schema: bool, extra: bool):
        # At least one of the two mechanisms has to be selected.
        assert schema or extra
        self.schema = schema
        if extra:
            self.validator_args = {'strict': True}
        else:
            self.validator_args = {}
@pytest.fixture(
    params=[
        pytest.param(StrictModeType(schema=True, extra=False), id='strict-schema'),
        pytest.param(StrictModeType(schema=False, extra=True), id='strict-extra'),
        pytest.param(StrictModeType(schema=True, extra=True), id='strict-both'),
    ],
)
def strict_mode_type(request) -> StrictModeType:
    """Yield each supported ``StrictModeType`` combination in turn.

    The three cases carry the test ids ``strict-schema``, ``strict-extra``
    and ``strict-both``.
    """
    return request.param
@pytest.fixture
def tmp_work_path(tmp_path: Path):
    """
    Run the test with pytest's ``tmp_path`` as the current working directory.

    The directory that was current before the test is restored on teardown.
    """
    original_dir = Path.cwd()
    os.chdir(tmp_path)
    yield tmp_path
    # Teardown: return to wherever the process was before the test started.
    os.chdir(original_dir)
@pytest.fixture
def import_execute(request, tmp_work_path: Path):
    """Provide a helper that writes source code to a file and executes it as a module.

    The returned callable takes the source text and an optional keyword-only
    ``custom_module_name``. The file is named after ``custom_module_name`` or,
    when that is not given, after the requesting test node. The module is
    loaded under the name ``'__main__'`` and returned after execution. If
    execution raises ``KeyboardInterrupt``, ``'KeyboardInterrupt'`` is printed
    and the helper returns ``None``.
    """

    def _import_execute(source: str, *, custom_module_name: str | None = None):
        module_name = custom_module_name or request.node.name
        module_path = tmp_work_path / f'{module_name}.py'
        # Python decodes source files as UTF-8 by default, so write UTF-8
        # explicitly rather than relying on the platform's locale encoding.
        module_path.write_text(source, encoding='utf-8')
        spec = importlib.util.spec_from_file_location('__main__', str(module_path))
        module = importlib.util.module_from_spec(spec)
        try:
            spec.loader.exec_module(module)
        except KeyboardInterrupt:
            print('KeyboardInterrupt')
        else:
            return module

    return _import_execute
@pytest.fixture
def pydantic_version():
    """Return the installed pydantic version as ``'<major>.<minor>'``.

    Falls back to ``'latest'`` when pydantic cannot be imported.
    """
    try:
        import pydantic

        # Keep only the first two dot-separated components (major and minor).
        major_minor = pydantic.__version__.split('.')[:2]
        return '.'.join(major_minor)
    except ImportError:
        return 'latest'
def infinite_generator():
    """Yield 0, 1, 2, ... without ever terminating."""
    current = -1
    while True:
        current += 1
        yield current
def assert_gc(test: Callable[[], bool], timeout: float = 10) -> None:
    """Helper to retry garbage collection until the test passes or timeout is
    reached.

    This is useful on free-threading where the GC collect call finishes before
    all cleanup is done.

    ``test`` is always evaluated at least once (even with ``timeout <= 0``) and
    is re-evaluated after the final collection, so a condition that becomes
    true on the last attempt is not reported as a timeout.

    Raises:
        AssertionError: if ``test`` still returns a falsy value once
            ``timeout`` seconds have elapsed.
    """
    # Imported locally so the module's import block does not need to change;
    # a monotonic clock keeps the timeout immune to wall-clock adjustments.
    from time import monotonic

    deadline = monotonic() + timeout
    while True:
        if test():
            return
        if monotonic() >= deadline:
            raise AssertionError('Timeout waiting for GC')
        gc.collect()
        # Give pending cleanup a moment to finish before checking again.
        sleep(0.1)
 |