1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
|
import logging
from functools import cached_property
from functools import partial
from typing import Any
from typing import Iterator
from typing import Optional
from jsonschema.exceptions import FormatError
from jsonschema.protocols import Validator
from jsonschema_path import SchemaPath
from openapi_core.validation.schemas.datatypes import FormatValidator
from openapi_core.validation.schemas.exceptions import InvalidSchemaValue
from openapi_core.validation.schemas.exceptions import ValidateError
log = logging.getLogger(__name__)
class SchemaValidator:
def __init__(
self,
schema: SchemaPath,
validator: Validator,
):
self.schema = schema
self.validator = validator
def __contains__(self, schema_format: str) -> bool:
return schema_format in self.validator.format_checker.checkers
def validate(self, value: Any) -> None:
errors_iter = self.validator.iter_errors(value)
errors = tuple(errors_iter)
if errors:
schema_type = self.schema.getkey("type", "any")
raise InvalidSchemaValue(value, schema_type, schema_errors=errors)
def evolve(self, schema: SchemaPath) -> "SchemaValidator":
cls = self.__class__
with schema.resolve() as resolved:
validator = self.validator.evolve(
schema=resolved.contents, _resolver=resolved.resolver
)
return cls(schema, validator)
def type_validator(
self, value: Any, type_override: Optional[str] = None
) -> bool:
callable = self.get_type_validator_callable(
type_override=type_override
)
return callable(value)
def format_validator(self, value: Any) -> bool:
try:
self.format_validator_callable(value)
except FormatError:
return False
else:
return True
def get_type_validator_callable(
self, type_override: Optional[str] = None
) -> FormatValidator:
schema_type = type_override or self.schema.getkey("type")
if schema_type in self.validator.TYPE_CHECKER._type_checkers:
return partial(
self.validator.TYPE_CHECKER.is_type, type=schema_type
)
return lambda x: True
@cached_property
def format_validator_callable(self) -> FormatValidator:
schema_format = self.schema.getkey("format")
if schema_format in self.validator.format_checker.checkers:
return partial(
self.validator.format_checker.check, format=schema_format
)
return lambda x: True
def get_primitive_type(self, value: Any) -> Optional[str]:
schema_types = self.schema.getkey("type")
if isinstance(schema_types, str):
return schema_types
if schema_types is None:
schema_types = sorted(self.validator.TYPE_CHECKER._type_checkers)
assert isinstance(schema_types, list)
for schema_type in schema_types:
result = self.type_validator(value, type_override=schema_type)
if not result:
continue
result = self.format_validator(value)
if not result:
continue
assert isinstance(schema_type, (str, type(None)))
return schema_type
# OpenAPI 3.0: None is not a primitive type so None value will not find any type
return None
def iter_valid_schemas(self, value: Any) -> Iterator[SchemaPath]:
yield self.schema
one_of_schema = self.get_one_of_schema(value)
if one_of_schema is not None:
yield one_of_schema
yield from self.iter_any_of_schemas(value)
yield from self.iter_all_of_schemas(value)
def get_one_of_schema(
self,
value: Any,
) -> Optional[SchemaPath]:
if "oneOf" not in self.schema:
return None
one_of_schemas = self.schema / "oneOf"
for subschema in one_of_schemas:
validator = self.evolve(subschema)
try:
validator.validate(value)
except ValidateError:
continue
else:
return subschema
log.warning("valid oneOf schema not found")
return None
def iter_any_of_schemas(
self,
value: Any,
) -> Iterator[SchemaPath]:
if "anyOf" not in self.schema:
return
any_of_schemas = self.schema / "anyOf"
for subschema in any_of_schemas:
validator = self.evolve(subschema)
try:
validator.validate(value)
except ValidateError:
continue
else:
yield subschema
def iter_all_of_schemas(
self,
value: Any,
) -> Iterator[SchemaPath]:
if "allOf" not in self.schema:
return
all_of_schemas = self.schema / "allOf"
for subschema in all_of_schemas:
if "type" not in subschema:
continue
validator = self.evolve(subschema)
try:
validator.validate(value)
except ValidateError:
log.warning("invalid allOf schema found")
else:
yield subschema
|