File: feed.py

package info (click to toggle)
python-aio-geojson-generic-client 0.4-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 180 kB
  • sloc: python: 332; makefile: 4
file content (59 lines) | stat: -rw-r--r-- 1,780 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
"""Generic GeoJSON feed."""
from __future__ import annotations

import logging
from datetime import datetime

from aio_geojson_client.feed import GeoJsonFeed
from aiohttp import ClientSession
from geojson import FeatureCollection

from .feed_entry import GenericFeedEntry

# Module-level logger, named after this module via ``__name__``.
_LOGGER = logging.getLogger(__name__)


class GenericFeed(GeoJsonFeed[GenericFeedEntry]):
    """Generic GeoJSON feed.

    Specialises ``GeoJsonFeed`` for ``GenericFeedEntry`` items. The methods
    prefixed with an underscore are overrides that the base class is
    presumably calling while processing a feed (the base class is not part
    of this module -- confirm against ``aio_geojson_client``).
    """

    def __init__(
        self,
        websession: ClientSession,
        home_coordinates: tuple[float, float],
        url: str,
        filter_radius: float | None = None,
    ) -> None:
        """Initialise this service.

        All arguments are forwarded unchanged to the base class.

        Args:
            websession: aiohttp client session handed to the base class.
            home_coordinates: Pair of floats describing the home location
                (presumably latitude/longitude -- confirm with base class).
            url: Location of the GeoJSON feed.
            filter_radius: Optional radius filter; ``None`` (the default)
                passes no radius on to the base class.
        """
        super().__init__(websession, home_coordinates, url, filter_radius=filter_radius)

    def __repr__(self) -> str:
        """Return string representation of this feed."""
        return (
            f"<{self.__class__.__name__}("
            f"home={self._home_coordinates}, "
            f"url={self._url}, "
            f"radius={self._filter_radius})>"
        )

    def _new_entry(
        self, home_coordinates: tuple[float, float], feature, global_data: dict
    ) -> GenericFeedEntry:
        """Generate a new entry.

        ``global_data`` is accepted but not used: the entry is built only
        from the home coordinates and the feature.
        """
        return GenericFeedEntry(home_coordinates, feature)

    def _extract_last_timestamp(
        self, feed_entries: list[GenericFeedEntry]
    ) -> datetime | None:
        """Determine latest (newest) entry from the filtered feed.

        Returns the greatest truthy ``publication_date`` among the entries,
        or ``None`` when there are no entries or none of them carries a
        publication date.
        """
        if not feed_entries:
            return None
        # Entries without a publication date yield a falsy value and are
        # skipped; ``max`` avoids sorting the whole list just to pick one.
        publication_dates = filter(
            None, (entry.publication_date for entry in feed_entries)
        )
        return max(publication_dates, default=None)

    def _extract_from_feed(self, feed: FeatureCollection) -> dict | None:
        """Extract global metadata from feed.

        This feed extracts no global metadata, so the result is always
        ``None`` regardless of the feed content.
        """
        return None