1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258
|
from __future__ import annotations
import contextlib
import functools
import logging
from collections.abc import Mapping
from typing import Any
from typing import Callable
from typing import Generator
from typing import TYPE_CHECKING
from jsonschema import validators
from jsonschema.validators import Draft4Validator
from jsonschema.validators import RefResolver
if TYPE_CHECKING:
from jsonschema.exceptions import _Error
from jsonschema.validators import _Validator
from swagger_spec_validator import common
log = logging.getLogger(__name__)
default_handlers = {
"http": common.read_url,
"https": common.read_url,
"file": common.read_url,
}
def validate(
instance: object,
schema: Mapping[str, Any],
instance_cls: type[_Validator],
cls: type[_Validator] | None = None,
*args: Any,
**kwargs: Any,
) -> None:
"""This is a carbon-copy of :method:`jsonschema.validate` except that it
takes two validator classes instead of just one. In the jsonschema
implementation, `cls` is used to validate both the schema and the
instance. This changes the behavior to have a separate validator for
each of schema and instance. Schema should not be validated with the
custom validator returned by :method:`create_dereffing_validator` because
it follows $refs.
:param instance: the instance to validate
:param schema: the schema to validate with
:param instance_cls: Validator class to validate instance.
:param cls: Validator class to validate schema.
:raises:
:exc:`ValidationError` if the instance is invalid
:exc:`SchemaError` if the schema itself is invalid
"""
if cls is None:
cls = validators.validator_for(schema)
cls.check_schema(schema)
instance_cls(schema, *args, **kwargs).validate(instance)
def create_dereffing_validator(instance_resolver: RefResolver) -> type[_Validator]:
"""Create a customized Draft4Validator that follows $refs in the schema
being validated (the Swagger spec for a service). This is not to be
confused with $refs that are in the schema that describes the Swagger 2.0
specification.
:param instance_resolver: resolver for the swagger service's spec
:type instance_resolver: :class:`jsonschema.RefResolver`
:rtype: Its complicated. See jsonschema.validators.create()
"""
visited_refs: dict[str, str] = {}
validators_to_bound = {
"$ref",
"additionalProperties",
"allOf",
"anyOf",
"dependencies",
"maxProperties",
"minProperties",
"not",
"oneOf",
"patternProperties",
"properties",
"required",
"type",
}
bound_validators = {
k: functools.partial(
validator_wrapper,
instance_resolver=instance_resolver,
visited_refs=visited_refs,
default_validator_callable=v,
)
if k in validators_to_bound
else v
for k, v in Draft4Validator.VALIDATORS.items()
}
return validators.extend(Draft4Validator, bound_validators)
def validate_schema_value(
schema: Mapping[str, Any],
value: Any,
swagger_resolver: RefResolver | None = None,
) -> None:
# pass resolver to avoid to refetch schema files
if swagger_resolver is None:
swagger_resolver = RefResolver.from_schema(schema)
create_dereffing_validator(swagger_resolver)(
schema, resolver=swagger_resolver
).validate(value)
@contextlib.contextmanager
def visiting(visited_refs: dict[str, str], ref: str) -> Generator[None, None, None]:
"""Context manager that keeps track of $refs that we've seen during
validation.
:param visited_refs: dict of $refs
:param ref: string $ref value
"""
visited_refs[ref] = ref
try:
yield
finally:
del visited_refs[ref]
def validator_wrapper(
validator: type[_Validator],
schema_element: Any,
instance: dict[str, Any],
schema: Mapping[str, Any],
instance_resolver: RefResolver,
visited_refs: dict[str, str],
default_validator_callable: Callable,
) -> Generator[_Error, None, None]:
"""Generator function that parameterizes default_validator_callable.
:type validator: :class:`jsonschema.validators.Validator`
:param schema_element: The schema element that is passed in to each
specific validator callable aka the 2nd arg in each
jsonschema._validators.* callable.
:param instance: The fragment of the swagger service spec that is being
validated.
:param schema: The fragment of the swagger jsonschema spec that describes
is used for validation.
:param instance_resolver: Resolves refs in the swagger service spec
:param visited_refs: Keeps track of visisted refs during validation of
the swagger service spec.
:param default_validator_callable: jsonschema._validators.* callable
"""
yield from deref_and_validate(
validator,
schema_element,
instance,
schema,
instance_resolver,
visited_refs,
default_validator_callable,
)
def deref_and_validate(
validator: type[_Validator],
schema_element: Any,
instance: dict[str, Any],
schema: Mapping[str, Any],
instance_resolver: RefResolver,
visited_refs: dict[str, str],
default_validator_callable: Callable,
) -> Generator[_Error, None, None]:
"""Generator function that dereferences instance if it is a $ref before
passing it downstream for actual validation. When a cyclic ref is detected,
short-circuit and return.
:type validator: :class:`jsonschema.validators.Validator`
:param schema_element: The schema element that is passed in to each
specific validator callable aka the 2nd arg in each
jsonschema._validators.* callable.
:param instance: The fragment of the swagger service spec that is being
validated.
:param schema: The fragment of the swagger jsonschema spec that describes
is used for validation.
:param instance_resolver: Resolves refs in the swagger service spec
:param visited_refs: Keeps track of visisted refs during validation of
the swagger service spec.
:param default_validator_callable: jsonschema._validators.* callable
"""
if (
isinstance(instance, dict)
and "$ref" in instance
and isinstance(instance["$ref"], str)
):
ref = instance["$ref"]
# Annotate $ref dict with scope - used by custom validations
# We still need to attach the scope even if this is a cycle, as otherwise there are cases
# with specs split into multiple files where it can't be dereferenced properly
attach_scope(instance, instance_resolver)
if ref in visited_refs:
log.debug("Found cycle in %s", ref)
return
with visiting(visited_refs, ref):
with instance_resolver.resolving(ref) as target:
yield from default_validator_callable(
validator, schema_element, target, schema
)
else:
yield from default_validator_callable(
validator, schema_element, instance, schema
)
def attach_scope(ref_dict: dict[str, Any], instance_resolver: RefResolver) -> None:
"""Attach scope to each $ref we encounter so that the $ref can be
resolved by custom validations done outside the scope of jsonscema
validations.
:param ref_dict: dict with $ref key
:type instance_resolver: :class:`jsonschema.RefResolver`
"""
if "x-scope" in ref_dict:
log.debug("Ref %s already has scope attached", ref_dict["$ref"])
return
log.debug("Attaching x-scope to %s", ref_dict)
ref_dict["x-scope"] = list(instance_resolver._scopes_stack) # type: ignore
@contextlib.contextmanager
def in_scope(
resolver: RefResolver, ref_dict: dict[str, Any]
) -> Generator[None, None, None]:
"""Context manager to assume the given scope for the passed in resolver.
The resolver's original scope is restored when exiting the context manager.
:type resolver: :class:`jsonschema.RefResolver
:type ref_dict: dict
"""
if "x-scope" not in ref_dict:
yield
else:
saved_scope_stack = resolver._scopes_stack # type: ignore
try:
resolver._scopes_stack = ref_dict["x-scope"] # type: ignore
yield
finally:
resolver._scopes_stack = saved_scope_stack # type: ignore
|