"""XML Parser."""
from __future__ import annotations
import logging
from datetime import datetime
import dateparser
import xmltodict
from ..consts import (
XML_TAG_CHANNEL,
XML_TAG_DC_DATE,
XML_TAG_FEED,
XML_TAG_GDACS_BBOX,
XML_TAG_GEO_LAT,
XML_TAG_GEO_LONG,
XML_TAG_GEORSS_POINT,
XML_TAG_GEORSS_POLYGON,
XML_TAG_GML_POS,
XML_TAG_GML_POS_LIST,
XML_TAG_HEIGHT,
XML_TAG_LAST_BUILD_DATE,
XML_TAG_PUB_DATE,
XML_TAG_PUBLISHED,
XML_TAG_RSS,
XML_TAG_TTL,
XML_TAG_UPDATED,
XML_TAG_WIDTH,
)
from .feed import Feed
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Namespace URI -> prefix mapping handed to xmltodict as ``namespaces``.
# A prefix of ``None`` means tags from that namespace carry no prefix.
DEFAULT_NAMESPACES: dict[str, str | None] = {
    # Atom is the un-prefixed namespace.
    "http://www.w3.org/2005/Atom": None,
    # Dublin Core.
    "http://purl.org/dc/elements/1.1/": "dc",
    # GeoRSS.
    "http://www.georss.org/georss": "georss",
    # Two URIs are both collapsed onto the same "geo" prefix.
    "http://www.w3.org/2003/01/geo/wgs84_pos#": "geo",
    "http://www.w3.org/2003/01/geo/": "geo",
    # GML.
    "http://www.opengis.net/gml": "gml",
    # GDACS.
    "http://www.gdacs.org": "gdacs",
}
# Tags whose value XmlParser.postprocessor passes to dateparser.parse.
KEYS_DATE = [
    XML_TAG_DC_DATE,
    XML_TAG_LAST_BUILD_DATE,
    XML_TAG_PUB_DATE,
    XML_TAG_PUBLISHED,
    XML_TAG_UPDATED,
]

# Tags whose value is converted with float().
KEYS_FLOAT = [
    XML_TAG_GEO_LAT,
    XML_TAG_GEO_LONG,
]

# Tags whose value is a white-space separated run of numbers that is
# converted into a tuple of floats.
KEYS_FLOAT_LIST = [
    XML_TAG_GEORSS_POLYGON,
    XML_TAG_GML_POS_LIST,
    XML_TAG_GML_POS,
    XML_TAG_GEORSS_POINT,
    XML_TAG_GDACS_BBOX,
]

# Tags whose value is converted with int().
KEYS_INT = [
    XML_TAG_HEIGHT,
    XML_TAG_TTL,
    XML_TAG_WIDTH,
]
class XmlParser:
    """Built-in XML parser.

    Parses RSS and Atom documents with ``xmltodict``, converting the values
    of selected tags (dates, floats, integers, coordinate lists) to native
    Python types on the way, and wraps the result in a ``Feed``.
    """

    def __init__(self, additional_namespaces: dict | None = None):
        """Initialise the XML parser.

        :param additional_namespaces: optional mapping of namespace URI to
            prefix, merged over ``DEFAULT_NAMESPACES`` for this parser only.
        """
        # Work on a copy: updating DEFAULT_NAMESPACES in place would leak one
        # parser's additional namespaces into every other parser instance.
        self._namespaces = dict(DEFAULT_NAMESPACES)
        if additional_namespaces:
            self._namespaces.update(additional_namespaces)

    @staticmethod
    def postprocessor(
        path: list[str], key: str, value: str
    ) -> tuple[str, str | float | int | datetime | tuple]:
        """Conduct type conversion for selected keys.

        Used as the ``postprocessor`` callback of ``xmltodict.parse``.

        :param path: part of the callback signature; not used here.
        :param key: tag name (including namespace prefix, if any).
        :param value: value parsed for that tag.
        :return: ``(key, converted_value)``; the value is returned unchanged
            when the key is not one of the known keys, when the value is
            empty, or when the conversion fails (a warning is logged).
        """
        try:
            if key in KEYS_DATE and value:
                return key, dateparser.parse(value)
            if key in KEYS_FLOAT and value:
                return key, float(value)
            if key in KEYS_FLOAT_LIST and value:
                point_coordinates = XmlParser._process_coordinates(value)
                # Return tuple of coordinates to make this conversion
                # compatible with parsing multiple tags of the same type and
                # combining all values into a single array.
                # If we just returned an array here, coordinates would be mixed
                # up like: [lat1, lon1, [lat2, lon2], [lat3, lon3]]
                return key, tuple(point_coordinates)
            if key in KEYS_INT and value:
                return key, int(value)
        except (ValueError, TypeError, AttributeError) as error:
            # AttributeError covers non-string values (e.g. an element that
            # carries XML attributes arrives as a mapping without .split());
            # keep the raw value instead of aborting the whole parse.
            _LOGGER.warning("Unable to process (%s/%s): %s", key, value, error)
        return key, value

    @staticmethod
    def _process_coordinates(value: str) -> list[float]:
        """Turn white-space separated list of numbers into list of floats.

        :raises ValueError: if any token is not a valid float literal.
        """
        return [float(token) for token in value.split()]

    def parse(self, xml: str) -> Feed | None:
        """Parse the provided xml.

        :param xml: XML document as a string.
        :return: a ``Feed`` when the document has an RSS or Atom root,
            otherwise ``None`` (also for empty input).
        """
        if not xml:
            return None
        parsed_dict = xmltodict.parse(
            xml,
            process_namespaces=True,
            namespaces=self._namespaces,
            postprocessor=XmlParser.postprocessor,
        )
        if XML_TAG_RSS in parsed_dict:
            return XmlParser._create_feed_from_rss(parsed_dict)
        if XML_TAG_FEED in parsed_dict:
            return XmlParser._create_feed_from_feed(parsed_dict)
        return None

    @staticmethod
    def _create_feed_from_rss(parsed_dict: dict) -> Feed | None:
        """Create feed from provided RSS data.

        :return: a ``Feed`` built from the channel element, or ``None`` (with
            a logged warning) when the RSS root has no channel.
        """
        rss = parsed_dict.get(XML_TAG_RSS)
        # An empty <rss/> element is parsed to None and a text-only one to a
        # plain string; neither can contain a channel, so check the type
        # before looking the channel up.
        if isinstance(rss, dict) and XML_TAG_CHANNEL in rss:
            return Feed(rss[XML_TAG_CHANNEL])
        _LOGGER.warning(
            "Invalid structure: %s not followed by %s", XML_TAG_RSS, XML_TAG_CHANNEL
        )
        return None

    @staticmethod
    def _create_feed_from_feed(parsed_dict: dict) -> Feed:
        """Create feed from provided Feed data."""
        feed_data = parsed_dict.get(XML_TAG_FEED)
        return Feed(feed_data)
|