File: gdata.py

package info (click to toggle)
python-xmlschema 4.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 5,208 kB
  • sloc: python: 39,174; xml: 1,282; makefile: 36
file content (178 lines) | stat: -rw-r--r-- 7,665 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
#
# Copyright (c), 2016-2024, SISSA (International School for Advanced Studies).
# All rights reserved.
# This file is distributed under the terms of the MIT License.
# See the file 'LICENSE' in the root directory of the present
# distribution, or http://opensource.org/licenses/MIT.
#
# @author Mikhail Razgovorov <1338833@gmail.com>
#
from collections.abc import Container, MutableMapping, MutableSequence
from typing import TYPE_CHECKING, Any, Optional, Type, Union

from xmlschema.exceptions import XMLSchemaTypeError
from xmlschema.aliases import NsmapType, BaseXsdType
from xmlschema.names import XSD_ANY_TYPE
from xmlschema.utils.qnames import local_name

from .base import ElementData, stackable, XMLSchemaConverter

if TYPE_CHECKING:
    from xmlschema.validators import XsdElement


class GDataConverter(XMLSchemaConverter):
    """
    XML Schema based converter class for GData protocol convention.

    ref: https://developers.google.com/gdata/docs/json

    :param namespaces: Map from namespace prefixes to URI.
    :param dict_class: dictionary class to use for decoded data. Default is `dict`.
    :param list_class: list class to use for decoded data. Default is `list`.
    :param kwargs: Additional keyword arguments to pass to base converter and \
    namespace mapper classes.
    """
    __slots__ = ()

    def __init__(self, namespaces: Optional[NsmapType] = None,
                 dict_class: Optional[Type[dict[str, Any]]] = None,
                 list_class: Optional[Type[list[Any]]] = None,
                 **kwargs: Any) -> None:
        """
        Initialize the converter with the fixed GData notation settings.

        :param namespaces: Map from namespace prefixes to URI.
        :param dict_class: dictionary class to use for decoded data.
        :param list_class: list class to use for decoded data.
        :param kwargs: Additional keyword arguments forwarded to the base \
        converter; `attr_prefix`, `text_key` and `cdata_prefix` are always \
        overwritten.
        """
        # These three settings are imposed by the convention, so any value
        # supplied by the caller for them is replaced.
        kwargs['attr_prefix'] = ''
        kwargs['text_key'] = '$t'
        kwargs['cdata_prefix'] = '$'
        super().__init__(namespaces, dict_class, list_class, **kwargs)

    @property
    def lossy(self) -> bool:
        """Always `True`: decoding with this convention may lose information."""
        # Attributes are stored without a prefix (attr_prefix=''), so they share
        # the same keys space of child elements.
        return True  # a child element can override an attribute in the same namespace

    def map_qname(self, qname: str) -> str:
        """
        Map a name using the base converter and then write the result in
        the GData form, where a '$' replaces the ':' of a prefixed name.

        :param qname: the name to map, passed unchanged to the base method.
        :return: the mapped name; names starting with '{' or without a ':' \
        are returned as produced by the base method.
        """
        mapped = super().map_qname(qname)
        has_prefix = ':' in mapped and not mapped.startswith('{')
        return mapped.replace(':', '$') if has_prefix else mapped

    def unmap_qname(self, qname: str,
                    name_table: Optional[Container[Optional[str]]] = None,
                    xmlns: Optional[list[tuple[str, str]]] = None) -> str:
        """
        Convert a GData-style name back to the ':' notation and unmap it
        using the base converter.

        :param qname: the name to unmap. A '$' is replaced by ':' unless the \
        name starts with '$' (e.g. the '$t' text key or a '$<digits>' key).
        :param name_table: forwarded unchanged to the base method.
        :param xmlns: forwarded unchanged to the base method.
        :return: the result of the base `unmap_qname`.
        """
        # A leading '$' marks text/cdata keys, which are not prefixed names.
        is_prefixed = not qname.startswith('$') and '$' in qname
        colon_name = qname.replace('$', ':') if is_prefixed else qname
        return super().unmap_qname(colon_name, name_table, xmlns)

    def get_xmlns_from_data(self, obj: Any) -> Optional[list[tuple[str, str]]]:
        """
        Collect the namespace declarations stored in a decoded mapping.

        :param obj: the decoded data to inspect.
        :return: `None` if namespaces are not in use or `obj` is not a mapping, \
        otherwise a list of (prefix, value) couples built from the 'xmlns' key \
        (empty prefix) and from the 'xmlns$<prefix>' keys. The list is empty \
        if no such key is present.
        """
        if not (self._use_namespaces and isinstance(obj, MutableMapping)):
            return None

        # key[6:] drops the leading 'xmlns$' and leaves only the prefix.
        return [
            ('' if key == 'xmlns' else key[6:], uri)
            for key, uri in obj.items()
            if key == 'xmlns' or key.startswith('xmlns$')
        ]

    @stackable
    def element_decode(self, data: ElementData, xsd_element: 'XsdElement',
                       xsd_type: Optional[BaseXsdType] = None, level: int = 0) -> Any:
        """
        Build the GData-style decoded representation of an element.

        :param data: the element data (tag, text, content, attributes, xmlns).
        :param xsd_element: the XSD element associated with the data.
        :param xsd_type: optional XSD type to use instead of the type of \
        `xsd_element`.
        :param level: nesting level, forwarded to `get_effective_xmlns`.
        :return: a single-key dictionary that maps the mapped tag to a \
        dictionary containing mapped attributes, namespace declarations \
        ('xmlns' and 'xmlns$<prefix>' keys), the text ('$t' key) or the \
        decoded children.
        """
        xsd_type = xsd_type or xsd_element.type

        tag = self.map_qname(data.tag)
        decoded = self.dict(pair for pair in self.map_attributes(data.attributes))

        declarations = self.get_effective_xmlns(data.xmlns, level, xsd_element)
        if self._use_namespaces and declarations:
            decoded.update(
                ('xmlns' if not prefix else f'xmlns${prefix}', uri)
                for prefix, uri in declarations
            )

        model_group = xsd_type.model_group
        if model_group is None or not data.content:
            # No children to decode: only the text, if any, is kept.
            if data.text is not None:
                decoded['$t'] = data.text
            return self.dict([(tag, decoded)])

        single_group = model_group.is_single()
        for name, value, xsd_child in self.map_content(data.content):
            if name.startswith('$') and name[1:].isdigit():
                # '$<digits>' keys are stored as they are.
                decoded[name] = value
                continue

            assert isinstance(value, MutableMapping) and xsd_child is not None
            value = value[name]

            if name not in decoded:
                # First occurrence: a bare value only if it cannot be repeated.
                if xsd_type.name == XSD_ANY_TYPE or \
                        (single_group and xsd_child.is_single()):
                    decoded[name] = value
                else:
                    decoded[name] = self.list([value])
                continue

            previous = decoded[name]
            if isinstance(previous, MutableSequence) and previous and (
                    isinstance(previous[0], MutableSequence)
                    or not isinstance(value, MutableSequence)):
                # Already a list of occurrences: add the new one.
                previous.append(value)
            else:
                # Otherwise wrap the stored value and the new one in a list.
                decoded[name] = self.list([previous, value])

        return self.dict([(tag, decoded)])

    @stackable
    def element_encode(self, obj: Any, xsd_element: 'XsdElement', level: int = 0) -> ElementData:
        """
        Convert GData-style decoded data to an `ElementData` for encoding.

        :param obj: the data to encode, a mapping. If it has a single key \
        (not '$t') with a mapping value and the key matches `xsd_element`, \
        the inner mapping is used as the element's data.
        :param xsd_element: the XSD element used to choose the tag and to \
        resolve attribute and child names.
        :param level: nesting level, forwarded to `set_context`.
        :return: an `ElementData` with tag, text, content, attributes and the \
        namespace declarations returned by `set_context`.
        :raises XMLSchemaTypeError: if `obj` is not a mapping.
        """
        if not isinstance(obj, MutableMapping):
            raise XMLSchemaTypeError(f"A dictionary expected, got {type(obj)} instead.")
        elif len(obj) != 1 or '$t' in obj:
            tag = xsd_element.name
        else:
            key, value = next(iter(obj.items()))
            if not isinstance(value, MutableMapping):
                tag = xsd_element.name
            else:
                # A single mapping value could be the element wrapped by its tag.
                tag = self.unmap_qname(key, xmlns=self.get_xmlns_from_data(value))
                if xsd_element.is_matching(tag):
                    obj = value
                elif not self.namespaces and local_name(tag) == xsd_element.local_name:
                    obj = value
                else:
                    tag = xsd_element.name

        text = None
        content: list[tuple[Union[str, int], Any]] = []
        attributes = {}

        xmlns = self.set_context(obj, level)
        for name, value in obj.items():
            if name == '$t':
                text = value
            elif name.startswith('$') and name[1:].isdigit():
                # startswith() is used, as in element_decode, because indexing
                # name[0] raises an IndexError on an empty key.
                content.append((int(name[1:]), value))
            elif not isinstance(value, (MutableMapping, MutableSequence)):
                if name == 'xmlns' or name.startswith('xmlns$'):
                    continue  # an xmlns declaration
                ns_name = self.unmap_qname(name, xsd_element.attributes)
                attributes[ns_name] = value
            elif not isinstance(value, MutableSequence) or not value:
                # A mapping or an empty sequence: a single child item.
                ns_name = self.unmap_qname(name, xmlns=self.get_xmlns_from_data(value))
                content.append((ns_name, value))
            elif isinstance(value[0], (MutableMapping, MutableSequence)):
                # A sequence of structured items: one child for each item.
                ns_name = self.unmap_qname(name, xmlns=self.get_xmlns_from_data(value[0]))
                for item in value:
                    content.append((ns_name, item))
            else:
                # A non-empty sequence of simple values.
                ns_name = self.unmap_qname(name)
                xsd_child = xsd_element.match_child(ns_name)
                if xsd_child is not None:
                    if xsd_child.type and xsd_child.type.is_list():
                        content.append((ns_name, value))
                    else:
                        content.extend((ns_name, item) for item in value)
                    continue

                # Fallback tentative to an attribute if no element match
                attr_name = self.unmap_qname(name, xsd_element.attributes)
                if attr_name in xsd_element.attributes:
                    attributes[attr_name] = value
                else:
                    content.append((ns_name, value))

        return ElementData(tag, text, content, attributes, xmlns)