1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
|
import copy
import warnings
from typing import Any, Dict, List, Optional, Tuple, Type
from xsdata.exceptions import ConverterWarning, ParserError
from xsdata.formats.bindings import T
from xsdata.formats.dataclass.context import XmlContext
from xsdata.formats.dataclass.models.elements import XmlVar
from xsdata.formats.dataclass.parsers.bases import NodeParser
from xsdata.formats.dataclass.parsers.config import ParserConfig
from xsdata.formats.dataclass.parsers.mixins import EventsHandler, XmlNode
from xsdata.formats.dataclass.parsers.utils import ParserUtils
from xsdata.utils.namespaces import target_uri
class UnionNode(XmlNode):
    """
    XmlNode for fields whose value may be one of several types, at least one
    of which is a dataclass.

    While the element is being read every child event is recorded. Once the
    element closes, each type listed in ``var.types`` is tried: model types
    by replaying the recorded events through a fresh parser, all other types
    by converting the element text. The candidates are scored and the one
    with the highest score is kept.

    :param var: Class field xml var instance
    :param attrs: Key-value attribute mapping
    :param ns_map: Namespace prefix-URI map
    :param position: The node position of objects cache
    :param config: Parser configuration
    :param context: Model context provider
    """

    __slots__ = (
        "var",
        "attrs",
        "ns_map",
        "position",
        "config",
        "context",
        "level",
        "events",
    )

    def __init__(
        self,
        var: XmlVar,
        attrs: Dict,
        ns_map: Dict,
        position: int,
        config: ParserConfig,
        context: XmlContext,
    ):
        # Collaborators used when the recorded events are replayed.
        self.config = config
        self.context = context

        # Description of the element this node was created for.
        self.var = var
        self.attrs = attrs
        self.ns_map = ns_map
        self.position = position

        # Recording state: current nesting depth below this element and the
        # (event, qname, payload, payload) tuples captured so far.
        self.level = 0
        self.events: List[Tuple[str, str, Any, Any]] = []

    def child(self, qname: str, attrs: Dict, ns_map: Dict, position: int) -> XmlNode:
        """
        Record the start of a nested element.

        The node returns itself, so nested elements keep being handled (and
        recorded) by this same instance.

        :param qname: Qualified name of the nested element
        :param attrs: Key-value attribute mapping of the nested element
        :param ns_map: Namespace prefix-URI map of the nested element
        :param position: Not used by this node
        :return: This node
        """
        self.level += 1
        # The attributes are deep-copied so the recorded event does not share
        # the caller's mapping.
        start_event = ("start", qname, copy.deepcopy(attrs), ns_map)
        self.events.append(start_event)
        return self

    def bind(
        self, qname: str, text: Optional[str], tail: Optional[str], objects: List
    ) -> bool:
        """
        Record the end of an element and, when it is this node's own element,
        pick the best matching object.

        :param qname: Qualified name of the element that just ended
        :param text: Text content of the element
        :param tail: Tail content of the element
        :param objects: List that receives ``(var.qname, obj)`` for the winner
        :return: ``False`` while a nested element is closing, ``True`` once
            the winning object has been appended to ``objects``
        :raises ParserError: If no candidate is both truthy and scored above
            ``-1.0``
        """
        self.events.append(("end", qname, text, tail))

        # A nested element closed: step one level up and keep recording.
        if self.level > 0:
            self.level -= 1
            return False

        # The node's own start event was never recorded by ``child``, so it
        # is put in front of the recording before anything is replayed.
        opening = ("start", qname, copy.deepcopy(self.attrs), self.ns_map)
        self.events.insert(0, opening)

        winner = None
        best_score = -1.0
        parent_namespace = target_uri(qname)
        for candidate_type in self.var.types:
            if not self.context.class_type.is_model(candidate_type):
                candidate = self.parse_value(text, [candidate_type])
            else:
                self.context.build(candidate_type, parent_ns=parent_namespace)
                candidate = self.parse_class(candidate_type)

            score = self.context.class_type.score_object(candidate)
            # Strict comparison: on equal scores the earlier type is kept.
            if score > best_score:
                best_score, winner = score, candidate

        if not winner:
            raise ParserError(f"Failed to parse union node: {self.var.qname}")

        objects.append((self.var.qname, winner))
        return True

    def parse_class(self, clazz: Type[T]) -> Optional[T]:
        """
        Replay the recorded events through a new NodeParser for ``clazz``.

        Converter warnings are promoted to errors for the duration of the
        attempt, and any exception makes the attempt yield ``None``.

        :param clazz: The model type to try
        :return: The parsed instance, or ``None`` if parsing failed
        """
        parsed = None
        try:
            with warnings.catch_warnings():
                warnings.filterwarnings("error", category=ConverterWarning)
                node_parser = NodeParser(
                    config=self.config, context=self.context, handler=EventsHandler
                )
                parsed = node_parser.parse(self.events, clazz)
        except Exception:
            # A failed candidate is not fatal here; it simply yields None.
            return None
        return parsed

    def parse_value(self, value: Any, types: List[Type]) -> Any:
        """
        Convert a simple value to one of the given types.

        Converter warnings are promoted to errors for the duration of the
        attempt, and any exception makes the attempt yield ``None``.

        :param value: The raw value to convert
        :param types: The types to try
        :return: The converted value, or ``None`` if conversion failed
        """
        converted = None
        try:
            with warnings.catch_warnings():
                warnings.filterwarnings("error", category=ConverterWarning)
                converted = ParserUtils.parse_value(
                    value=value, types=types, ns_map=self.ns_map
                )
        except Exception:
            # A failed candidate is not fatal here; it simply yields None.
            return None
        return converted
|