File: native.py

package info (click to toggle)
python-xsdata 24.1-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 2,936 kB
  • sloc: python: 29,257; xml: 404; makefile: 27; sh: 6
file content (123 lines) | stat: -rw-r--r-- 4,127 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import functools
from typing import Any, Dict, Iterable, Iterator, Optional, Tuple
from urllib.parse import urljoin
from xml.etree import ElementInclude as xinclude
from xml.etree import ElementTree as etree

from xsdata.exceptions import XmlHandlerError
from xsdata.formats.dataclass.parsers.mixins import XmlHandler
from xsdata.models.enums import EventType
from xsdata.utils import namespaces

# Event types requested from ``etree.iterparse`` when a document is streamed.
EVENTS = (EventType.START, EventType.END, EventType.START_NS)


class XmlEventHandler(XmlHandler):
    """
    Xml handler driven by the :mod:`xml.etree.ElementTree` event api.

    :param parser: The parser instance to feed with events
    :param clazz: The target binding model, auto located if omitted.
    """

    __slots__ = ()

    def parse(self, source: Any) -> Any:
        """
        Build the event stream for the given source and process it.

        Three kinds of input are supported:

        - An ``Element`` or ``ElementTree``: the in-memory structure is
          walked directly with :func:`iterwalk`.
        - Anything else while the config ``process_xinclude`` flag is
          enabled: the whole document is parsed first, the xinclude
          directives are expanded and the resulting tree is walked.
        - Anything else otherwise: the source is streamed through
          :func:`xml.etree.ElementTree.iterparse`, requesting only the
          start, end and start-ns events.

        :param source: A tree, an element or anything the etree parse
            functions accept
        :return: The result of :meth:`process_context`
        """
        if isinstance(source, etree.ElementTree):
            # A tree is only a thin wrapper, continue with its root element.
            source = source.getroot()

        if isinstance(source, etree.Element):
            return self.process_context(iterwalk(source, {}))

        if not self.parser.config.process_xinclude:
            return self.process_context(etree.iterparse(source, EVENTS))  # nosec

        # XInclude needs the complete tree in memory before it can be expanded.
        root = etree.parse(source).getroot()  # nosec
        loader = functools.partial(
            xinclude_loader,
            base_url=get_base_url(self.parser.config.base_url, source),
        )
        xinclude.include(root, loader=loader)

        return self.process_context(iterwalk(root, {}))

    def process_context(self, context: Iterable) -> Any:
        """
        Consume the event stream and forward every event to the parser.

        Namespace declarations are collected until the next start event,
        which receives them merged with the parent namespaces. Every
        element is cleared right after its end event has been forwarded.

        :param context: An iterable of ``(event, payload)`` pairs
        :return: The value of the last entry in the objects list, or
            ``None`` if the list is empty
        :raises XmlHandlerError: If the event type is not supported
        """
        pending_ns: Dict = {}
        for event, payload in context:
            if event == EventType.START:
                self.parser.start(
                    self.clazz,
                    self.queue,
                    self.objects,
                    payload.tag,
                    payload.attrib,
                    self.merge_parent_namespaces(pending_ns),
                )
                # The declarations belong to this element only, start over.
                pending_ns = {}
                continue

            if event == EventType.END:
                self.parser.end(
                    self.queue,
                    self.objects,
                    payload.tag,
                    payload.text,
                    payload.tail,
                )
                # The element has been handed over, drop its content.
                payload.clear()
                continue

            if event == EventType.START_NS:
                prefix, uri = payload
                # An empty prefix stands for the default namespace.
                pending_ns[prefix or None] = uri
                continue

            raise XmlHandlerError(f"Unhandled event: `{event}`.")

        if not self.objects:
            return None

        return self.objects[-1][1]


def iterwalk(element: etree.Element, ns_map: Dict) -> Iterator[Tuple[str, Any]]:
    """
    Recursively traverse an element tree and produce parser events.

    For every element a start-ns event is emitted first, if the element
    tag is qualified, followed by the start event, the events of all the
    children and finally the end event.

    The ElementTree doesn't keep the prefixes of the original document,
    so they are loaded through the shared ``ns_map`` instead.

    :param element: The element to walk
    :param ns_map: The prefix-URI map shared by the whole walk
    :return: An iterator of ``(event, payload)`` pairs
    """
    namespace = namespaces.target_uri(element.tag)
    if namespace is not None:
        yield (
            EventType.START_NS,
            (namespaces.load_prefix(namespace, ns_map), namespace),
        )

    yield EventType.START, element

    for node in element:
        yield from iterwalk(node, ns_map)

    yield EventType.END, element


def get_base_url(base_url: Optional[str], source: Any) -> Optional[str]:
    """
    Pick the base url to resolve relative references against.

    A non-empty ``base_url`` always wins; otherwise the source itself is
    used, but only if it is a string.

    :param base_url: The explicitly configured base url, may be empty
    :param source: The source that is being parsed
    :return: The base url, or ``None`` if neither value is usable
    """
    if not base_url:
        if isinstance(source, str):
            return source

        return None

    return base_url


def xinclude_loader(
    href: str,
    parse: str,
    encoding: Optional[str] = None,
    base_url: Optional[str] = None,
) -> Any:
    """
    Load an xinclude resource after resolving it against a base url.

    Custom loader for xinclude to support the base_url argument that
    doesn't exist for python < 3.9.

    :param href: The resource reference
    :param parse: The parse mode, forwarded to the default loader
    :param encoding: The optional text encoding, forwarded to the default
        loader
    :param base_url: The optional base url the reference is joined with
    :return: Whatever the default xinclude loader returns
    """
    base = base_url if base_url else ""
    location = urljoin(base, href)

    return xinclude.default_loader(location, parse, encoding)