File: __init__.py

package info (click to toggle)
python-aio-georss-client 0.12-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 376 kB
  • sloc: python: 2,656; xml: 513; makefile: 4
file content (135 lines) | stat: -rw-r--r-- 4,421 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
"""XML Parser."""
from __future__ import annotations

import logging
from datetime import datetime

import dateparser
import xmltodict

from ..consts import (
    XML_TAG_CHANNEL,
    XML_TAG_DC_DATE,
    XML_TAG_FEED,
    XML_TAG_GDACS_BBOX,
    XML_TAG_GEO_LAT,
    XML_TAG_GEO_LONG,
    XML_TAG_GEORSS_POINT,
    XML_TAG_GEORSS_POLYGON,
    XML_TAG_GML_POS,
    XML_TAG_GML_POS_LIST,
    XML_TAG_HEIGHT,
    XML_TAG_LAST_BUILD_DATE,
    XML_TAG_PUB_DATE,
    XML_TAG_PUBLISHED,
    XML_TAG_RSS,
    XML_TAG_TTL,
    XML_TAG_UPDATED,
    XML_TAG_WIDTH,
)
from .feed import Feed

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Namespace URI -> prefix mapping that XmlParser.parse hands to
# xmltodict.parse(namespaces=...). Both W3C geo URIs share the "geo" prefix.
# NOTE(review): a None prefix presumably makes xmltodict drop the namespace
# from Atom tag names entirely - confirm against the xmltodict documentation.
DEFAULT_NAMESPACES = {
    "http://www.w3.org/2005/Atom": None,
    "http://purl.org/dc/elements/1.1/": "dc",
    "http://www.georss.org/georss": "georss",
    "http://www.w3.org/2003/01/geo/wgs84_pos#": "geo",
    "http://www.w3.org/2003/01/geo/": "geo",
    "http://www.opengis.net/gml": "gml",
    "http://www.gdacs.org": "gdacs",
}
# Tags whose non-empty values XmlParser.postprocessor passes through
# dateparser.parse.
KEYS_DATE = [
    XML_TAG_DC_DATE,
    XML_TAG_LAST_BUILD_DATE,
    XML_TAG_PUB_DATE,
    XML_TAG_PUBLISHED,
    XML_TAG_UPDATED,
]
# Tags whose non-empty values XmlParser.postprocessor converts with float().
KEYS_FLOAT = [XML_TAG_GEO_LAT, XML_TAG_GEO_LONG]
# Tags whose non-empty values are white-space separated numbers;
# XmlParser.postprocessor converts them into a tuple of floats.
KEYS_FLOAT_LIST = [
    XML_TAG_GEORSS_POLYGON,
    XML_TAG_GML_POS_LIST,
    XML_TAG_GML_POS,
    XML_TAG_GEORSS_POINT,
    XML_TAG_GDACS_BBOX,
]
# Tags whose non-empty values XmlParser.postprocessor converts with int().
KEYS_INT = [XML_TAG_HEIGHT, XML_TAG_TTL, XML_TAG_WIDTH]


class XmlParser:
    """Built-in XML parser.

    Parses XML with ``xmltodict``, converting selected tag values to native
    types on the way, and wraps the content of a document whose root is
    ``XML_TAG_RSS`` or ``XML_TAG_FEED`` in a ``Feed``.
    """

    def __init__(self, additional_namespaces: dict | None = None):
        """Initialise the XML parser.

        ``additional_namespaces`` is an optional mapping of namespace URI to
        prefix that is merged over ``DEFAULT_NAMESPACES`` for this parser.
        """
        # Work on a copy: updating the module-level dict in place would leak
        # one parser's extra namespaces into every other parser instance.
        self._namespaces = dict(DEFAULT_NAMESPACES)
        if additional_namespaces:
            self._namespaces.update(additional_namespaces)

    @staticmethod
    def postprocessor(
        path: list[str], key: str, value: str
    ) -> tuple[str, str | float | int | datetime | tuple | None]:
        """Conduct type conversion for selected keys.

        Used as the ``postprocessor`` callback of ``xmltodict.parse``.
        Empty values and keys outside the ``KEYS_*`` lists are returned
        unchanged. If a conversion fails, a warning is logged and the
        original value is returned so that the rest of the document can
        still be parsed.
        """
        if not value:
            # Nothing to convert.
            return key, value
        try:
            if key in KEYS_DATE:
                return key, dateparser.parse(value)
            if key in KEYS_FLOAT:
                return key, float(value)
            if key in KEYS_FLOAT_LIST:
                # Return tuple of coordinates to make this conversion
                # compatible with parsing multiple tags of the same type and
                # combining all values into a single array.
                # If we just returned an array here, coordinates would be mixed
                # up like: [lat1, lon1, [lat2, lon2], [lat3, lon3]]
                return key, tuple(XmlParser._process_coordinates(value))
            if key in KEYS_INT:
                return key, int(value)
        except (ValueError, TypeError, AttributeError) as error:
            # AttributeError covers values without ``split()``, i.e. anything
            # that is not plain text (presumably a nested mapping when the
            # element carries attributes or children).
            _LOGGER.warning("Unable to process (%s/%s): %s", key, value, error)
        return key, value

    @staticmethod
    def _process_coordinates(value: str) -> list[float]:
        """Turn white-space separated list of numbers into list of floats.

        Raises ``ValueError`` if any item is not a valid float literal.
        """
        return [float(item) for item in value.split()]

    def parse(self, xml: str) -> Feed | None:
        """Parse the provided xml.

        Returns a ``Feed`` for documents whose root is ``XML_TAG_RSS`` or
        ``XML_TAG_FEED``, and ``None`` for empty input, any other root, or an
        RSS root without a channel.
        """
        if not xml:
            return None
        parsed_dict = xmltodict.parse(
            xml,
            process_namespaces=True,
            namespaces=self._namespaces,
            postprocessor=XmlParser.postprocessor,
        )
        if XML_TAG_RSS in parsed_dict:
            return XmlParser._create_feed_from_rss(parsed_dict)
        if XML_TAG_FEED in parsed_dict:
            return XmlParser._create_feed_from_feed(parsed_dict)
        return None

    @staticmethod
    def _create_feed_from_rss(parsed_dict: dict) -> Feed | None:
        """Create feed from provided RSS data.

        Returns ``None`` and logs a warning if the RSS root does not contain
        a channel.
        """
        rss = parsed_dict.get(XML_TAG_RSS)
        # The root's content is only a mapping if it has child elements; an
        # empty or text-only root must not be probed with ``in``/``get``.
        if isinstance(rss, dict) and XML_TAG_CHANNEL in rss:
            return Feed(rss[XML_TAG_CHANNEL])
        _LOGGER.warning(
            "Invalid structure: %s not followed by %s", XML_TAG_RSS, XML_TAG_CHANNEL
        )
        return None

    @staticmethod
    def _create_feed_from_feed(parsed_dict: dict) -> Feed:
        """Create feed from provided Feed data."""
        return Feed(parsed_dict.get(XML_TAG_FEED))