File: columnar.py

package info (click to toggle)
python-xmlschema 4.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 5,208 kB
  • sloc: python: 39,174; xml: 1,282; makefile: 36
file content (174 lines) | stat: -rw-r--r-- 7,436 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
#
# Copyright (c), 2016-2021, SISSA (International School for Advanced Studies).
# All rights reserved.
# This file is distributed under the terms of the MIT License.
# See the file 'LICENSE' in the root directory of the present
# distribution, or http://opensource.org/licenses/MIT.
#
# @author Davide Brunato <brunato@sissa.it>
#
from collections.abc import MutableMapping, MutableSequence
from typing import TYPE_CHECKING, Any, Optional, Type

from xmlschema.exceptions import XMLSchemaTypeError, XMLSchemaValueError
from xmlschema.aliases import NsmapType, BaseXsdType
from xmlschema.resources import XMLResource

from .base import ElementData, XMLSchemaConverter

if TYPE_CHECKING:
    from xmlschema.validators import XsdElement


class ColumnarConverter(XMLSchemaConverter):
    """
    XML Schema based converter class for columnar formats.

    The converter is always built with ``text_key=None`` and
    ``cdata_prefix=None``. Decoded attributes are stored under keys made of
    the element local name, the attribute prefix and the mapped attribute name.

    :param namespaces: map from namespace prefixes to URI.
    :param dict_class: dictionary class to use for decoded data. Default is `dict`.
    :param list_class: list class to use for decoded data. Default is `list`.
    :param attr_prefix: used as separator string for renaming the decoded attributes. \
    Can be the empty string (the default) or a single/double underscore.
    """
    __slots__ = ()

    def __init__(self, namespaces: Optional[NsmapType] = None,
                 dict_class: Optional[Type[dict[str, Any]]] = None,
                 list_class: Optional[Type[list[Any]]] = None,
                 attr_prefix: Optional[str] = '',
                 **kwargs: Any) -> None:
        # Text and cdata keys are forced off, whatever the caller passed.
        kwargs['text_key'] = None
        kwargs['cdata_prefix'] = None
        super().__init__(namespaces, dict_class, list_class,
                         attr_prefix=attr_prefix, **kwargs)

    @property
    def xmlns_processing_default(self) -> str:
        """Return 'stacked' if the source is an XMLResource, 'none' otherwise."""
        if isinstance(self.source, XMLResource):
            return 'stacked'
        return 'none'

    @property
    def lossy(self) -> bool:
        """Always true for this converter."""
        return True  # cdata parts are lost

    @property
    def loss_xmlns(self) -> bool:
        """Always true for this converter."""
        return True

    def __setattr__(self, name: str, value: Any) -> None:
        """
        Set an attribute, applying a dedicated check to 'attr_prefix'.

        :raises XMLSchemaTypeError: if 'attr_prefix' is set to a non-string value.
        :raises XMLSchemaValueError: if 'attr_prefix' is a string other than \
        the empty string or a single/double underscore.
        """
        if name == 'attr_prefix':
            if not isinstance(value, str):
                msg = "%(name)r must be a <class 'str'> instance, not %(type)r"
                raise XMLSchemaTypeError(msg % {'name': name, 'type': type(value)})
            if value not in ('', '_', '__'):
                msg = '%r can be the empty string or a single/double underscore'
                raise XMLSchemaValueError(msg % name)

            # The lookup starts after XMLSchemaConverter in the MRO, so the
            # parent's own __setattr__ is skipped for this attribute
            # (presumably to avoid its different validation -- confirm in base).
            super(XMLSchemaConverter, self).__setattr__(name, value)
        else:
            super().__setattr__(name, value)

    def element_decode(self, data: ElementData, xsd_element: 'XsdElement',
                       xsd_type: Optional[BaseXsdType] = None, level: int = 0) -> Any:
        """
        Build the decoded columnar structure for an element.

        :param data: the element data (attributes, text and content are read).
        :param xsd_element: the XSD element used for naming and child lookup.
        :param xsd_type: optional type that overrides the type of the XSD element.
        :param level: nesting level, at level 0 the result is wrapped into a \
        dictionary keyed by the element local name.
        :return: a dictionary, or a list when the only kind of child is a \
        repeatable simple-typed element without attributes.
        """
        result_dict: Any

        xsd_type = xsd_type or xsd_element.type
        tag = xsd_element.local_name

        if data.attributes:
            key_prefix = tag + self.attr_prefix if self.attr_prefix else tag
            result_dict = self.dict(
                (key_prefix + self.map_qname(k), v) for k, v in data.attributes
            )
        else:
            result_dict = self.dict()

        if xsd_type.simple_type is not None:
            result_dict[tag] = data.text

        if data.content:
            for name, value, xsd_child in self.map_content(data.content):
                if not value:
                    continue

                if xsd_child.local_name:
                    name = xsd_child.local_name
                else:
                    # Drop the leading '{namespace}' part from the name.
                    name = name[2 + len(xsd_child.namespace):]

                occurs_once = xsd_child.is_single()
                simple_child = (
                    xsd_child.type is not None
                    and xsd_child.type.simple_type is not None
                )

                if occurs_once:
                    if simple_child:
                        # Merge the child's columns directly into this level.
                        for key in value:
                            result_dict[key] = value[key]
                    else:
                        result_dict[name] = value
                    continue

                if not simple_child or xsd_child.attributes:
                    # Repeatable child kept as a list of decoded items.
                    try:
                        result_dict[name].append(value)
                    except (KeyError, AttributeError):
                        result_dict[name] = self.list([value])
                    continue

                # Repeatable simple-typed child without attributes: only the
                # values are collected.
                if len(xsd_element.findall('*')) == 1:
                    # A single item from findall('*'): the result itself
                    # becomes the list of values (a dict has no append()).
                    try:
                        result_dict.append(list(value.values())[0])
                    except AttributeError:
                        result_dict = self.list(value.values())
                else:
                    try:
                        result_dict[name].append(list(value.values())[0])
                    except (KeyError, AttributeError):
                        result_dict[name] = self.list(value.values())

        if level != 0:
            return result_dict
        return self.dict([(tag, result_dict)])

    def element_encode(self, obj: Any, xsd_element: 'XsdElement', level: int = 0) -> ElementData:
        """
        Build the ElementData instance for encoding a columnar object.

        :param obj: the object to encode, at level 0 the value stored under \
        the element local name is used instead, when it can be extracted.
        :param xsd_element: the XSD element used for naming and child lookup.
        :param level: nesting level of the element.
        :return: an ElementData instance with tag, text, content and attributes.
        """
        tag = xsd_element.local_name
        if level == 0:
            # Unwrap the top-level mapping if possible, otherwise use obj as is.
            try:
                obj = obj[tag]
            except (KeyError, AttributeError, TypeError):
                pass

        if not isinstance(obj, MutableMapping):
            element_type = xsd_element.type
            as_text = element_type.simple_type is not None or (
                element_type.mixed and not isinstance(obj, MutableSequence)
            )
            if as_text:
                return ElementData(xsd_element.name, obj, None, {}, None)
            return ElementData(xsd_element.name, None, obj, {}, None)

        text = None
        content: list[tuple[Optional[str], MutableSequence[Any]]] = []
        attributes = {}
        pfx = tag + self.attr_prefix if self.attr_prefix else tag

        for key, value in obj.items():
            if key == tag:
                text = value
                continue

            if key.startswith(pfx) and len(key) > len(pfx):
                # Keys longer than the prefix are treated as attributes.
                attr_name = key[len(pfx):]
                attributes[self.unmap_qname(attr_name, xsd_element.attributes)] = value
                continue

            ns_name = self.unmap_qname(key)
            if not isinstance(value, MutableSequence) or not value:
                content.append((ns_name, value))
                continue

            if not isinstance(value[0], (MutableMapping, MutableSequence)):
                # A sequence of plain values is kept whole only when the
                # matching child has a list type.
                xsd_child = xsd_element.match_child(ns_name)
                if xsd_child is not None and xsd_child.type and xsd_child.type.is_list():
                    content.append((ns_name, value))
                    continue

            # Otherwise each item becomes a separate child.
            content.extend((ns_name, item) for item in value)

        return ElementData(xsd_element.name, text, content, attributes, None)