#
# Copyright (c), 2025, SISSA (International School for Advanced Studies).
# All rights reserved.
# This file is distributed under the terms of the MIT License.
# See the file 'LICENSE' in the root directory of the present
# distribution, or http://opensource.org/licenses/MIT.
#
# @author Davide Brunato <brunato@sissa.it>
#
from collections.abc import Callable, Iterator, Sequence
from functools import partial
from typing import Any, Optional
from xml.etree import ElementTree
from xmlschema.aliases import AncestorsType, IOType, IterParseType, ElementType, NsmapType
from xmlschema.exceptions import XMLResourceParseError
from xmlschema.xpath import ElementPathSelector
# Signature of a filter callback: it is called with (root, element, ancestors)
# and returns a boolean.
FilterFunctionType = Callable[[ElementType, ElementType, AncestorsType], bool]
# Signature of a cleanup callback: it is called with (root, element, ancestors)
# and returns nothing.
ClearFunctionType = Callable[[ElementType, ElementType, AncestorsType], None]
###
# Default filter and clear functions
def no_filter(r: ElementType, e: ElementType, a: AncestorsType) -> bool:
    """
    Default filter function: accepts every element.

    The three arguments (root, element, ancestors) are ignored.
    """
    return True
def no_cleanup(root: ElementType, elem: ElementType, ancestors: AncestorsType) -> None:
    """
    Default clear function: leaves the element and its ancestors untouched.
    """
def clear_elem(root: ElementType, elem: ElementType, ancestors: AncestorsType) -> None:
    """
    Clear function that resets only the element itself, calling its `clear()`
    method. The root and the ancestors are not used.
    """
    elem.clear()
###
# Iterparse generator function
def filtered_iterparse(fp: IOType,
                       events: Optional[Sequence[str]] = None,
                       filter_fn: Optional[FilterFunctionType] = None,
                       clear_fn: Optional[ClearFunctionType] = None,
                       ancestors: Optional[list[ElementType]] = None,
                       depth: int = 1) -> Iterator[tuple[str, Any]]:
    """
    An event-based parser for filtering XML elements during parsing.

    Wraps `ElementTree.iterparse` and keeps track of the nesting level of the
    elements. Elements found at level *depth* are submitted to *filter_fn*:
    when it returns a false value the element becomes the current "stop node",
    the 'start' events of the element and of its descendants are not yielded
    and *clear_fn* is called when the element is closed.

    :param fp: the source object passed to `ElementTree.iterparse`.
    :param events: the events to report. If not provided 'start-ns', 'end-ns', \
    'start' and 'end' are used. If 'start' or 'end' is missing both are appended, \
    because they are needed for tracking the nesting level.
    :param filter_fn: a callable invoked as `filter_fn(root, node, ancestors)` \
    on the 'start' event of each element at level *depth*. Defaults to `no_filter`.
    :param clear_fn: a callable invoked as `clear_fn(root, node, ancestors)` on \
    the 'end' event of a filtered-out element. Defaults to `no_cleanup`.
    :param ancestors: an optional list that is updated in place with the open \
    elements whose level is lower than *depth*.
    :param depth: the nesting level at which the filter is applied, with the \
    root element at level 0.
    :return: yields couples (event, node).
    :raises XMLResourceParseError: if a `SyntaxError` is raised while parsing.
    """
    if events is None:
        events = 'start-ns', 'end-ns', 'start', 'end'
    elif 'start' not in events or 'end' not in events:
        # Both events are required for the level bookkeeping done below.
        events = tuple(events) + ('start', 'end')

    if filter_fn is None:
        filter_fn = no_filter
    if clear_fn is None:
        clear_fn = no_cleanup

    level = 0  # number of elements currently open
    stop_node: Any = None  # the filtered-out element being skipped, if any
    root: Any = None  # set on the 'start' event of the level 0 element
    node: Any

    try:
        for event, node in ElementTree.iterparse(fp, events):
            # NOTE(review): 'end' events only drive the bookkeeping and are
            # not yielded to the caller -- confirm that this is intended.
            if event == 'end':
                level -= 1
                if level < depth:
                    # Closing an element that was pushed on 'start'.
                    if ancestors is not None:
                        ancestors.pop()
                elif level == depth and stop_node is node:
                    # The filtered-out element is complete: resume yielding
                    # and let the caller's hook dispose of it.
                    stop_node = None
                    clear_fn(root, node, ancestors)
            elif event == 'start':
                if level < depth:
                    if not level:
                        root = node
                    if ancestors is not None:
                        ancestors.append(node)
                elif level == depth and not filter_fn(root, node, ancestors):
                    # Rejected by the filter: skip it and all its descendants.
                    stop_node = node
                    level += 1
                    continue
                level += 1
                if stop_node is None:
                    yield event, node
            else:
                # Other events (e.g. 'start-ns', 'end-ns') are always yielded.
                yield event, node
    except SyntaxError as err:
        raise XMLResourceParseError("invalid XML syntax: {}".format(err)) from err
def iterfind_parser(path: str,
                    namespaces: Optional[NsmapType] = None,
                    ancestors: AncestorsType = None) -> IterParseType:
    """
    Returns a `filtered_iterparse` partial configured with an
    `ElementPathSelector` built from a path.

    :param path: the path expression passed to `ElementPathSelector`.
    :param namespaces: an optional namespace map passed to the selector.
    :param ancestors: an optional list of ancestors, forwarded to \
    `filtered_iterparse` and used for detaching discarded elements from \
    their parent.
    :return: a partial of `filtered_iterparse` with `filter_fn`, `clear_fn`, \
    `ancestors` and `depth` already bound.
    """
    selector = ElementPathSelector(path, namespaces)

    def clear_fn(root: ElementType, node: ElementType, ancestors: AncestorsType) -> None:
        # Empty the discarded element, then detach it from the innermost
        # ancestor when an ancestors list is available.
        node.clear()
        if ancestors is None:
            return
        parent = ancestors[-1]
        if node in parent:
            parent.remove(node)

    def filter_fn(root: ElementType, node: ElementType, ancestors: AncestorsType) -> bool:
        # Keep the element if the selector matches everything or if the
        # element is among the ones selected starting from the root.
        return selector.select_all or node in selector.iter_select(root)

    options = {
        'filter_fn': filter_fn,
        'clear_fn': clear_fn,
        'ancestors': ancestors,
        'depth': selector.depth,
    }
    return partial(filtered_iterparse, **options)
|