1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398
|
import sys
from functools import lru_cache
from asdf.tagged import Tagged
from asdf.util import get_class_name, uri_match
from ._extension import ExtensionProxy
def _resolve_type(path):
"""
Convert a class path (like the string "asdf.AsdfFile") to a
class (``asdf.AsdfFile``) only if the module implementing the
class has already been imported.
Parameters
----------
path : str
Path/name of class (for example, "asdf.AsdfFile")
Returns
-------
typ : class or None
The class (if it's already been imported) or None
"""
if "." not in path:
# check if this path is a module
if path in sys.modules:
return sys.modules[path]
return None
# this type is part of a module
module_name, type_name = path.rsplit(".", maxsplit=1)
# if the module is not imported, don't index it
if module_name not in sys.modules:
return None
module = sys.modules[module_name]
if not hasattr(module, type_name):
# the imported module does not have this class, perhaps
# it is dynamically created so do not index it yet
return None
return getattr(module, type_name)
class ExtensionManager:
"""
Wraps a list of extensions and indexes their converters
by tag and by Python type.
Parameters
----------
extensions : iterable of asdf.extension.Extension
List of enabled extensions to manage. Extensions placed earlier
in the list take precedence.
"""
def __init__(self, extensions):
self._extensions = [ExtensionProxy.maybe_wrap(e) for e in extensions]
self._tag_defs_by_tag = {}
self._converters_by_tag = {}
# To optimize performance converters can be registered using either:
# - the class/type they convert
# - the name/path (string) of the class/type they convert
# This allows the registration to continue without importing
# every module for every extension (which would be needed to turn
# the class paths into proper classes). Using class paths can be
# complicated by packages that have private implementations of
# classes that are exposed at a different 'public' location.
# These private classes may change between minor versions
# and would break converters that are registered using the private
# class path. However, often libraries do not modify the module
# of the 'public' class (so inspecting the class path returns
# the private class path). One example of this in asdf is
# Converter (exposed as ``asdf.extension.Converter`` but with
# a class path of ``asdf.extension._converter.Converter``).
# To allow converters to be registered with the public location
# we will need to attempt to import the public class path
# and then register the private class path after the class is
# imported. We don't want to do this unnecessarily and since
# class instances do not contain the public class path
# we adopt a strategy of checking class paths and only
# registering those that have already been imported. This
# is ok because asdf will only use the converter type
# when attempting to serialize an object in memory (so the
# public class path will already be imported at the time
# the converter is needed).
# first we store the converters in the order they are discovered
# the key here can either be a class path (str) or class (type)
converters_by_type = {}
validators = set()
for extension in self._extensions:
for tag_def in extension.tags:
if tag_def.tag_uri not in self._tag_defs_by_tag:
self._tag_defs_by_tag[tag_def.tag_uri] = tag_def
for converter in extension.converters:
for tag in converter.tags:
if tag not in self._converters_by_tag:
self._converters_by_tag[tag] = converter
for typ in converter.types:
if typ not in converters_by_type:
converters_by_type[typ] = converter
validators.update(extension.validators)
self._converters_by_class_path = {}
self._converters_by_type = {}
for type_or_path, converter in converters_by_type.items():
if isinstance(type_or_path, str):
path = type_or_path
typ = _resolve_type(path)
if typ is None:
if path not in self._converters_by_class_path:
self._converters_by_class_path[path] = converter
continue
else:
typ = type_or_path
if typ not in self._converters_by_type:
self._converters_by_type[typ] = converter
self._validator_manager = _get_cached_validator_manager(tuple(validators))
@property
def extensions(self):
"""
Get the list of extensions.
Returns
-------
list of asdf.extension.ExtensionProxy
"""
return self._extensions
def handles_tag(self, tag):
"""
Return `True` if the specified tag is handled by a
converter.
Parameters
----------
tag : str
Tag URI.
Returns
-------
bool
"""
return tag in self._converters_by_tag
def handles_type(self, typ):
"""
Returns `True` if the specified Python type is handled
by a converter.
Parameters
----------
typ : type
Returns
-------
bool
"""
if typ in self._converters_by_type:
return True
self._index_converters()
return typ in self._converters_by_type
def handles_tag_definition(self, tag):
"""
Return `True` if the specified tag has a definition.
Parameters
----------
tag : str
Tag URI.
Returns
-------
bool
"""
return tag in self._tag_defs_by_tag
def get_tag_definition(self, tag):
"""
Get the tag definition for the specified tag.
Parameters
----------
tag : str
Tag URI.
Returns
-------
asdf.extension.TagDefinition
Raises
------
KeyError
Unrecognized tag URI.
"""
try:
return self._tag_defs_by_tag[tag]
except KeyError:
msg = f"No support available for YAML tag '{tag}'. You may need to install a missing extension."
raise KeyError(msg) from None
def get_converter_for_tag(self, tag):
"""
Get the converter for the specified tag.
Parameters
----------
tag : str
Tag URI.
Returns
-------
asdf.extension.Converter
Raises
------
KeyError
Unrecognized tag URI.
"""
try:
return self._converters_by_tag[tag]
except KeyError:
msg = f"No support available for YAML tag '{tag}'. You may need to install a missing extension."
raise KeyError(msg) from None
def get_converter_for_type(self, typ):
"""
Get the converter for the specified Python type.
Parameters
----------
typ : type
Returns
-------
asdf.extension.Converter
Raises
------
KeyError
Unrecognized type.
"""
if typ not in self._converters_by_type:
self._index_converters()
try:
return self._converters_by_type[typ]
except KeyError:
msg = (
f"No support available for Python type '{get_class_name(typ, instance=False)}'. "
"You may need to install or enable an extension."
)
raise KeyError(msg) from None
def _index_converters(self):
"""
Search _converters_by_class_path for paths (strings) that
refer to classes that are currently imported. For imported
classes, add them to _converters_by_class (if the class
doesn't already have a converter).
"""
# search class paths to find ones that are imported
for class_path in list(self._converters_by_class_path):
typ = _resolve_type(class_path)
if typ is None:
continue
if typ not in self._converters_by_type:
self._converters_by_type[typ] = self._converters_by_class_path[class_path]
del self._converters_by_class_path[class_path]
@property
def validator_manager(self):
return self._validator_manager
def get_cached_extension_manager(extensions):
"""
Get a previously created ExtensionManager for the specified
extensions, or create and cache one if necessary. Building
the manager is expensive, so it helps performance to reuse
it when possible.
Parameters
----------
extensions : list of asdf.extension.Extension
Returns
-------
asdf.extension.ExtensionManager
"""
from ._extension import ExtensionProxy
# The tuple makes the extensions hashable so that we
# can pass them to the lru_cache method. The ExtensionProxy
# overrides __hash__ to return the hashed object id of the wrapped
# extension, so this will method will only return the same
# ExtensionManager if the list contains identical extension
# instances in identical order.
extensions = tuple(ExtensionProxy.maybe_wrap(e) for e in extensions)
return _get_cached_extension_manager(extensions)
@lru_cache
def _get_cached_extension_manager(extensions):
return ExtensionManager(extensions)
class ValidatorManager:
"""
Wraps a list of custom validators and indexes them by schema property.
Parameters
----------
validators : iterable of asdf.extension.Validator
List of validators to manage.
"""
def __init__(self, validators):
self._validators = list(validators)
self._validators_by_schema_property = {}
for validator in self._validators:
if validator.schema_property not in self._validators_by_schema_property:
self._validators_by_schema_property[validator.schema_property] = set()
self._validators_by_schema_property[validator.schema_property].add(validator)
self._jsonschema_validators_by_schema_property = {}
for schema_property in self._validators_by_schema_property:
self._jsonschema_validators_by_schema_property[schema_property] = self._get_jsonschema_validator(
schema_property,
)
def validate(self, schema_property, schema_property_value, node, schema):
"""
Validate an ASDF tree node against custom validators for a schema property.
Parameters
----------
schema_property : str
Name of the schema property (identifies the validator(s) to use).
schema_property_value : object
Value of the schema property.
node : asdf.tagged.Tagged
The ASDF node to validate.
schema : dict
The schema object that contains the property that triggered
the validation.
Yields
------
asdf.exceptions.ValidationError
"""
if schema_property in self._validators_by_schema_property:
for validator in self._validators_by_schema_property[schema_property]:
if _validator_matches(validator, node):
yield from validator.validate(schema_property_value, node, schema)
def get_jsonschema_validators(self):
"""
Get a dictionary of validator methods suitable for use
with the jsonschema library.
Returns
-------
dict of str: callable
"""
return dict(self._jsonschema_validators_by_schema_property)
def _get_jsonschema_validator(self, schema_property):
def _validator(_, schema_property_value, node, schema):
return self.validate(schema_property, schema_property_value, node, schema)
return _validator
def _validator_matches(validator, node):
if any(t == "**" for t in validator.tags):
return True
if not isinstance(node, Tagged):
return False
return any(uri_match(t, node._tag) for t in validator.tags)
@lru_cache
def _get_cached_validator_manager(validators):
return ValidatorManager(validators)
|