1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314
|
import json
import warnings
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Type, Union
from xsdata.exceptions import ConverterWarning, ParserError
from xsdata.formats.bindings import AbstractParser, T
from xsdata.formats.converter import converter
from xsdata.formats.dataclass.context import XmlContext
from xsdata.formats.dataclass.models.elements import XmlMeta, XmlVar
from xsdata.formats.dataclass.parsers.config import ParserConfig
from xsdata.formats.dataclass.parsers.utils import ParserUtils
from xsdata.formats.dataclass.typing import get_args, get_origin
from xsdata.utils import collections
from xsdata.utils.constants import EMPTY_MAP
@dataclass
class JsonParser(AbstractParser):
    """
    Json parser for dataclasses.

    :param config: Parser configuration
    :param context: Model context provider
    :param load_factory: Replace the default json.load call with another
        implementation
    """

    config: ParserConfig = field(default_factory=ParserConfig)
    context: XmlContext = field(default_factory=XmlContext)
    load_factory: Callable = field(default=json.load)

    def parse(self, source: Any, clazz: Optional[Type[T]] = None) -> T:
        """
        Parse the input stream or filename and return the resulting object
        tree.

        :param source: An object with a ``read`` attribute, or anything
            accepted by ``open``
        :param clazz: The target model, or ``List[model]`` for array
            documents; when ``None`` the type is detected from the document
        :return: The bound instance, or a list of instances when the
            document is an array
        :raises ParserError: If the clazz/document combination is invalid or
            a converter warning is escalated to an error while binding
        """
        data = self.load_json(source)
        tp = self.verify_type(clazz, data)

        with warnings.catch_warnings():
            # Escalate converter warnings only when the config asks for it;
            # the filter is restored when the context manager exits.
            if self.config.fail_on_converter_warnings:
                warnings.filterwarnings("error", category=ConverterWarning)

            try:
                if not isinstance(data, list):
                    return self.bind_dataclass(data, tp)

                return [self.bind_dataclass(obj, tp) for obj in data]  # type: ignore
            except ConverterWarning as e:
                raise ParserError(e) from e

    def load_json(self, source: Any) -> Union[Dict, List]:
        """
        Load the raw json document with the configured load factory.

        :param source: An object with a ``read`` attribute is passed to the
            load factory as is, anything else is opened in binary mode first
        :return: Whatever the load factory returns for the document
        """
        if not hasattr(source, "read"):
            with open(source, "rb") as fp:
                return self.load_factory(fp)

        return self.load_factory(source)

    def verify_type(self, clazz: Optional[Type[T]], data: Union[Dict, List]) -> Type[T]:
        """
        Validate the requested type against the shape of the document.

        :param clazz: The requested model, ``List[model]`` or ``None``
        :param data: The loaded json document
        :return: The model class to bind, unwrapped from ``List[...]`` when
            applicable, or the detected type when ``clazz`` is ``None``
        :raises ParserError: If ``clazz`` is an unsupported generic, or a
            list type is requested for an object document and vice versa
        """
        if clazz is None:
            return self.detect_type(data)

        try:
            origin = get_origin(clazz)
            list_type = False
            if origin is list:
                list_type = True
                args = get_args(clazz)

                # Only List[<model>] with exactly one argument is supported
                if len(args) != 1 or not self.context.class_type.is_model(args[0]):
                    raise TypeError()

                clazz = args[0]
            elif origin is not None:
                raise TypeError()
        except TypeError as e:
            raise ParserError(f"Invalid clazz argument: {clazz}") from e

        if list_type != isinstance(data, list):
            if list_type:
                raise ParserError("Document is object, expected array")

            raise ParserError("Document is array, expected object")

        return clazz  # type: ignore

    def detect_type(self, data: Union[Dict, List]) -> Type[T]:
        """
        Look up the model class from the document's property names.

        For array documents only the first item's keys are inspected.

        :param data: The loaded json document
        :return: The type the context resolved for the set of keys
        :raises ParserError: If the document is empty or no type matches
        """
        if not data:
            raise ParserError("Document is empty, can not detect type")

        keys = data[0].keys() if isinstance(data, list) else data.keys()
        clazz: Optional[Type[T]] = self.context.find_type_by_fields(set(keys))

        if clazz:
            return clazz

        raise ParserError(f"Unable to locate model with properties({list(keys)})")

    def bind_dataclass(self, data: Dict, clazz: Type[T]) -> T:
        """
        Recursively build the given model from the input dict data.

        :param data: The property name to raw value mapping
        :param clazz: The model class to instantiate
        :return: The instance produced by the configured class factory
        :raises ParserError: On unknown properties, when the config is set
            to fail on them, or if the class factory raises a ``TypeError``
        """
        # A dict with exactly the derived keys is a derived element wrapper
        if set(data.keys()) == self.context.class_type.derived_keys:
            return self.bind_derived_dataclass(data, clazz)

        meta = self.context.build(clazz)
        xml_vars = meta.get_all_vars()

        params = {}
        for key, value in data.items():
            is_array = collections.is_array(value)
            var = self.find_var(xml_vars, key, is_array)

            if var is None and self.config.fail_on_unknown_properties:
                raise ParserError(f"Unknown property {clazz.__qualname__}.{key}")

            # Unknown properties and non-init fields are silently skipped
            if var and var.init:
                params[var.name] = self.bind_value(meta, var, value)

        try:
            return self.config.class_factory(clazz, params)
        except TypeError as e:
            raise ParserError(e) from e

    def bind_derived_dataclass(self, data: Dict, clazz: Type[T]) -> Any:
        """
        Bind a derived element wrapper document.

        :param data: A mapping with the ``qname``, ``type`` and ``value`` keys
        :param clazz: The expected model; when it is the generic derived
            element class the real model is resolved from the ``type`` value
        :return: A derived element instance wrapping the bound value
        :raises ParserError: If the real model can not be resolved
        """
        qname = data["qname"]
        xsi_type = data["type"]
        params = data["value"]

        generic = self.context.class_type.derived_element

        if clazz is generic:
            real_clazz: Optional[Type[T]] = None
            if xsi_type:
                real_clazz = self.context.find_type(xsi_type)

            if real_clazz is None:
                raise ParserError(
                    f"Unable to locate derived model "
                    f"with properties({list(params.keys())})"
                )

            value = self.bind_dataclass(params, real_clazz)
        else:
            value = self.bind_dataclass(params, clazz)

        return generic(qname=qname, type=xsi_type, value=value)

    def bind_best_dataclass(self, data: Dict, classes: Iterable[Type[T]]) -> T:
        """
        Attempt to bind the given data to one of the possible models, if more
        than one is successful return the object with the highest score.

        :param data: The property name to raw value mapping
        :param classes: The candidate classes; non-model entries are skipped
        :return: The best scoring bound instance
        :raises ParserError: If no candidate could be bound
        """
        # Materialize once: the candidates are iterated for binding and again
        # for the error message, which would be empty for one-shot iterables.
        candidates = tuple(classes)

        obj = None
        keys = set(data.keys())
        max_score = -1.0
        for clazz in candidates:
            if not self.context.class_type.is_model(clazz):
                continue

            if self.context.local_names_match(keys, clazz):
                candidate = self.bind_optional_dataclass(data, clazz)
                score = self.context.class_type.score_object(candidate)
                if score > max_score:
                    max_score = score
                    obj = candidate

        # Compare against None explicitly, a bound model may be falsy
        if obj is not None:
            return obj

        raise ParserError(
            f"Failed to bind object with properties({list(data.keys())}) "
            f"to any of the {[cls.__qualname__ for cls in candidates]}"
        )

    def bind_optional_dataclass(self, data: Dict, clazz: Type[T]) -> Optional[T]:
        """
        Recursively build the given model from the input dict data but fail
        on any converter warnings.

        This is a deliberate best-effort attempt used to probe candidates.

        :param data: The property name to raw value mapping
        :param clazz: The model class to try
        :return: The bound instance or ``None`` if binding raised any
            exception, including converter warnings escalated to errors
        """
        try:
            with warnings.catch_warnings():
                warnings.filterwarnings("error", category=ConverterWarning)
                return self.bind_dataclass(data, clazz)
        except Exception:
            return None

    def bind_value(
        self, meta: XmlMeta, var: XmlVar, value: Any, recursive: bool = False
    ) -> Any:
        """
        Main entry point for binding values.

        :param meta: The metadata of the model that owns the field
        :param var: The field descriptor the value is bound to
        :param value: The raw json value
        :param recursive: Set when binding the items of a repeating
            element, so nested lists are not expanded a second time
        :return: The bound value
        """
        # xs:anyAttributes get it out of the way, it's the mapping exception!
        if var.is_attributes:
            return dict(value)

        # Repeating element, recursively bind the values
        if not recursive and var.list_element and isinstance(value, list):
            assert var.factory is not None
            return var.factory(self.bind_value(meta, var, val, True) for val in value)

        # If not dict this is a text or tokens value.
        if not isinstance(value, dict):
            return self.bind_text(meta, var, value)

        keys = value.keys()
        if keys == self.context.class_type.any_keys:
            # Bind data to AnyElement dataclass
            return self.bind_dataclass(value, self.context.class_type.any_element)

        if keys == self.context.class_type.derived_keys:
            # Bind data to the derived element wrapper
            return self.bind_derived_value(meta, var, value)

        # Bind data to a user defined dataclass
        return self.bind_complex_type(meta, var, value)

    def bind_text(self, meta: XmlMeta, var: XmlVar, value: Any) -> Any:
        """
        Bind text/tokens value entrypoint.

        :param meta: The metadata of the model that owns the field
        :param var: The field descriptor the value is bound to
        :param value: The raw, non dict, json value
        :return: The value as is for any type/wildcard fields and for
            ``None`` on compound fields without a matching choice,
            otherwise the value parsed according to the field types
        :raises ParserError: If a compound field has no choice that matches
            a non ``None`` value
        """
        if var.is_elements:
            # Compound field we need to match the value to one of the choice elements
            check_subclass = self.context.class_type.is_model(value)
            choice = var.find_value_choice(value, check_subclass)
            if choice:
                return self.bind_text(meta, choice, value)

            if value is None:
                return value

            raise ParserError(
                f"Failed to bind '{value}' "
                f"to {meta.clazz.__qualname__}.{var.name} field"
            )

        if var.any_type or var.is_wildcard:
            # field can support any object return the value as it is
            return value

        # Serialize first so the value can be parsed by the field types
        value = converter.serialize(value)

        # Convert value according to the field types
        return ParserUtils.parse_value(
            value=value,
            types=var.types,
            default=var.default,
            ns_map=EMPTY_MAP,
            tokens_factory=var.tokens_factory,
            format=var.format,
        )

    def bind_complex_type(self, meta: XmlMeta, var: XmlVar, data: Dict) -> Any:
        """
        Bind data to a user defined dataclass.

        :param meta: The metadata of the model that owns the field
        :param var: The field descriptor the data is bound to
        :param data: The property name to raw value mapping
        :return: The bound instance, picked among several candidates when
            the field allows more than one class
        """
        if var.is_clazz_union:
            # Union of dataclasses
            return self.bind_best_dataclass(data, var.types)

        if var.elements:
            # Compound field with multiple choices
            return self.bind_best_dataclass(data, var.element_types)

        if var.any_type or var.is_wildcard:
            # xs:anyType element, check all meta classes
            return self.bind_best_dataclass(data, meta.element_types)

        assert var.clazz is not None

        subclasses = set(self.context.get_subclasses(var.clazz))
        if subclasses:
            # field annotation is an abstract/base type
            subclasses.add(var.clazz)
            return self.bind_best_dataclass(data, subclasses)

        return self.bind_dataclass(data, var.clazz)

    def bind_derived_value(self, meta: XmlMeta, var: XmlVar, data: Dict) -> Any:
        """
        Bind derived element entry point.

        :param meta: The metadata of the model that owns the field
        :param var: The field descriptor the data is bound to
        :param data: A mapping with the ``qname``, ``type`` and ``value`` keys
        :return: A derived element instance wrapping the bound value
        :raises ParserError: If the compound choice for the qname or the
            class for the xsi:type can not be located
        """
        qname = data["qname"]
        xsi_type = data["type"]
        params = data["value"]

        if var.elements:
            # Compound field, retry with the choice that matches the qname
            choice = var.find_choice(qname)
            if choice is None:
                raise ParserError(
                    f"Unable to locate compound element"
                    f" {meta.clazz.__qualname__}.{var.name}[{qname}]"
                )
            return self.bind_derived_value(meta, choice, data)

        if not isinstance(params, dict):
            value = self.bind_text(meta, var, params)
        elif xsi_type:
            # An explicit xsi:type wins over the field's declared class
            clazz: Optional[Type] = self.context.find_type(xsi_type)

            if clazz is None:
                raise ParserError(f"Unable to locate xsi:type `{xsi_type}`")

            value = self.bind_dataclass(params, clazz)
        elif var.clazz:
            value = self.bind_complex_type(meta, var, params)
        else:
            value = self.bind_best_dataclass(params, meta.element_types)

        generic = self.context.class_type.derived_element
        return generic(qname=qname, value=value, type=xsi_type)

    @classmethod
    def find_var(
        cls, xml_vars: Sequence[XmlVar], local_name: str, is_list: bool = False
    ) -> Optional[XmlVar]:
        """
        Find the first field descriptor that fits the given property.

        A descriptor fits when its local name equals the property name and
        either its list/tokens nature agrees with ``is_list`` or it has no
        ``clazz``.

        :param xml_vars: The field descriptors to search, in order
        :param local_name: The json property name
        :param is_list: Whether the raw value is an array
        :return: The first fitting descriptor or ``None``
        """
        for var in xml_vars:
            if var.local_name == local_name:
                var_is_list = var.list_element or var.tokens
                if is_list == var_is_list or var.clazz is None:
                    return var

        return None
@dataclass
class DictConverter(JsonParser):
    """Json parser variant that binds already loaded dictionaries."""

    def convert(self, data: Dict, clazz: Type[T]) -> T:
        """
        Build the given model from the input dict data.

        :param data: The property name to raw value mapping
        :param clazz: The model class to instantiate
        :return: The result of binding the data to the model class
        """
        instance = self.bind_dataclass(data, clazz)
        return instance
|