1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261
|
import sys
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Type, Union
from urllib.parse import urljoin
from xsdata.formats.bindings import T
from xsdata.formats.dataclass.parsers.bases import Parsed
from xsdata.formats.dataclass.parsers.mixins import XmlNode
from xsdata.formats.dataclass.parsers.xml import UserXmlParser
from xsdata.models import xsd
from xsdata.models.enums import FormType, Mode, Namespace
from xsdata.models.mixins import ElementBase
OPEN_CONTENT_ELEMENT = Union[xsd.ComplexType, xsd.Restriction, xsd.Extension]
@dataclass
class SchemaParser(UserXmlParser):
"""
A simple parser to convert an xsd schema to an easy to handle data
structure based on dataclasses.
The parser is as a dummy as possible, but it will try to normalize
certain things like apply parent properties to children.
:param location:
:param element_form:
:param attribute_form:
:param target_namespace:
:param default_attributes:
:param default_open_content:
"""
index: int = field(default_factory=int)
indices: List[int] = field(default_factory=list)
location: Optional[str] = field(default=None)
element_form: Optional[FormType] = field(init=False, default=None)
attribute_form: Optional[FormType] = field(init=False, default=None)
target_namespace: Optional[str] = field(default=None)
default_attributes: Optional[str] = field(default=None)
default_open_content: Optional[xsd.DefaultOpenContent] = field(default=None)
def start(
self,
clazz: Optional[Type[T]],
queue: List[XmlNode],
objects: List[Parsed],
qname: str,
attrs: Dict,
ns_map: Dict,
):
self.index += 1
self.indices.append(self.index)
super().start(clazz, queue, objects, qname, attrs, ns_map)
def end(
self,
queue: List[XmlNode],
objects: List[Parsed],
qname: str,
text: Optional[str],
tail: Optional[str],
) -> Any:
"""Override parent method to set element index and namespaces map."""
item = queue[-1]
super().end(queue, objects, qname, text, tail)
obj = objects[-1][1]
self.set_index(obj, self.indices.pop())
self.set_namespace_map(obj, getattr(item, "ns_map", None))
return obj
def start_schema(self, attrs: Dict):
"""Collect the schema's default form for attributes and elements for
later usage."""
self.element_form = attrs.get("elementFormDefault", None)
self.attribute_form = attrs.get("attributeFormDefault", None)
self.default_attributes = attrs.get("defaultAttributes", None)
def end_schema(self, obj: T):
"""Normalize various properties for the schema and it's children."""
if isinstance(obj, xsd.Schema):
self.set_schema_forms(obj)
self.set_schema_namespaces(obj)
self.add_default_imports(obj)
self.resolve_schemas_locations(obj)
self.reset_element_occurs(obj)
def end_attribute(self, obj: T):
"""Assign the schema's default form for attributes if the given
attribute form is None."""
if isinstance(obj, xsd.Attribute) and obj.form is None and self.attribute_form:
obj.form = FormType(self.attribute_form)
def end_complex_type(self, obj: T):
"""
Post parsing processor to apply default open content and attributes if
applicable.
Default open content doesn't apply if the current complex type
has one of complex content, simple content or has its own open
content.
"""
if not isinstance(obj, xsd.ComplexType):
return
if obj.default_attributes_apply and self.default_attributes:
attribute_group = xsd.AttributeGroup(ref=self.default_attributes)
obj.attribute_groups.insert(0, attribute_group)
if (
obj.simple_content
or obj.complex_content
or obj.open_content
or not self.default_open_content
):
return
if self.default_open_content.applies_to_empty or self.has_elements(obj):
obj.open_content = self.default_open_content
def end_default_open_content(self, obj: T):
"""Set the instance default open content to be used later as a property
for all extensions and restrictions."""
if isinstance(obj, xsd.DefaultOpenContent):
if obj.any and obj.mode == Mode.SUFFIX:
obj.any.index = sys.maxsize
self.default_open_content = obj
def end_element(self, obj: T):
"""Assign the schema's default form for elements if the given element
form is None."""
if isinstance(obj, xsd.Element) and obj.form is None and self.element_form:
obj.form = FormType(self.element_form)
def end_extension(self, obj: T):
"""Set the open content if any to the given extension."""
if isinstance(obj, xsd.Extension) and not obj.open_content:
obj.open_content = self.default_open_content
@classmethod
def end_open_content(cls, obj: T):
"""Adjust the index to trick later processors into putting attributes
derived from this open content last in classes."""
if isinstance(obj, xsd.OpenContent) and obj.any and obj.mode == Mode.SUFFIX:
obj.any.index = sys.maxsize
def end_restriction(self, obj: T):
"""Set the open content if any to the given restriction."""
if isinstance(obj, xsd.Restriction) and not obj.open_content:
obj.open_content = self.default_open_content
def set_schema_forms(self, obj: xsd.Schema):
"""
Set the default form type for elements and attributes.
Global elements and attributes are by default qualified.
"""
if self.element_form:
obj.element_form_default = FormType(self.element_form)
if self.attribute_form:
obj.attribute_form_default = FormType(self.attribute_form)
for child_element in obj.elements:
child_element.form = FormType.QUALIFIED
for child_attribute in obj.attributes:
child_attribute.form = FormType.QUALIFIED
def set_schema_namespaces(self, obj: xsd.Schema):
"""Set the given schema's target namespace and add the default
namespaces if the are missing xsi, xlink, xml, xs."""
obj.target_namespace = obj.target_namespace or self.target_namespace
def resolve_schemas_locations(self, obj: xsd.Schema):
"""Resolve the locations of the schema overrides, redefines, includes
and imports relatively to the schema location."""
if not self.location:
return
obj.location = self.location
for over in obj.overrides:
over.location = self.resolve_path(over.schema_location)
for red in obj.redefines:
red.location = self.resolve_path(red.schema_location)
for inc in obj.includes:
inc.location = self.resolve_path(inc.schema_location)
for imp in obj.imports:
imp.location = self.resolve_local_path(imp.schema_location, imp.namespace)
def resolve_path(self, location: Optional[str]) -> Optional[str]:
"""Resolve the given location string relatively the schema location
path."""
return urljoin(self.location, location) if self.location and location else None
def resolve_local_path(
self, location: Optional[str], namespace: Optional[str]
) -> Optional[str]:
"""Resolve the given namespace to one of the local standard schemas or
fallback to the external file path."""
common_ns = Namespace.get_enum(namespace)
local_path = common_ns.location if common_ns else None
if local_path and (not location or location.find("w3.org/") > 0):
return local_path
return self.resolve_path(location)
@classmethod
def has_elements(cls, obj: ElementBase) -> bool:
accepted_types = (xsd.Element, xsd.Any, xsd.Group)
return any(
isinstance(child, accepted_types) or cls.has_elements(child)
for child in obj.children()
)
@classmethod
def set_namespace_map(cls, obj: Any, ns_map: Optional[Dict]):
"""Add common namespaces like xml, xsi, xlink if they are missing."""
if hasattr(obj, "ns_map"):
if ns_map:
obj.ns_map.update(
{prefix: uri for prefix, uri in ns_map.items() if uri}
)
ns_list = obj.ns_map.values()
obj.ns_map.update(
{
ns.prefix: ns.uri
for ns in Namespace.common()
if ns.uri not in ns_list
}
)
@classmethod
def set_index(cls, obj: Any, index: int):
if hasattr(obj, "index"):
obj.index = index
@classmethod
def add_default_imports(cls, obj: xsd.Schema):
"""Add missing imports to the standard schemas if the namespace is
declared and."""
imp_namespaces = [imp.namespace for imp in obj.imports]
xsi_ns = Namespace.XSI.uri
if xsi_ns in obj.ns_map.values() and xsi_ns not in imp_namespaces:
obj.imports.insert(0, xsd.Import(namespace=xsi_ns))
@classmethod
def reset_element_occurs(cls, obj: xsd.Schema):
for element in obj.elements:
element.min_occurs = None
element.max_occurs = None
|