1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
|
"""Base class for the feed manager.
This allows managing feeds and their entries throughout their life-cycle.
"""
from __future__ import annotations
from datetime import datetime
import logging
from typing import Callable
from . import GeoRssFeed
from .consts import UPDATE_OK, UPDATE_OK_NO_DATA
# Module-level logger used by the feed manager for its debug and warning output.
_LOGGER = logging.getLogger(__name__)
class FeedManagerBase:
    """Generic feed manager.

    Wraps a feed object and, on every call to :meth:`update`, compares the
    external ids found in the feed with the external ids this manager is
    already tracking. The three callbacks supplied at construction time are
    invoked with the external id of each entry that disappeared from,
    is still present in, or newly appeared in the feed.
    """

    def __init__(
        self,
        feed: GeoRssFeed,
        generate_callback: Callable[[str], None],
        update_callback: Callable[[str], None],
        remove_callback: Callable[[str], None],
    ) -> None:
        """Initialise feed manager.

        :param feed: feed object whose ``update()`` returns a
            ``(status, entries)`` tuple.
        :param generate_callback: called with the external id of each entry
            that is new in the feed.
        :param update_callback: called with the external id of each entry
            that is already managed and still present in the feed.
        :param remove_callback: called with the external id of each managed
            entry that is no longer present in the feed.
        """
        self._feed: GeoRssFeed = feed
        # Latest feed entries keyed by external id, for lookups by entities.
        self.feed_entries: dict = {}
        # External ids for which the generate callback has been invoked and
        # the remove callback has not yet been invoked.
        self._managed_external_ids: set = set()
        self._last_update: datetime | None = None
        self._generate_callback: Callable[[str], None] = generate_callback
        self._update_callback: Callable[[str], None] = update_callback
        self._remove_callback: Callable[[str], None] = remove_callback

    def __repr__(self) -> str:
        """Return string representation of this feed manager."""
        return f"<{self.__class__.__name__}(feed={self._feed})>"

    def update(self) -> None:
        """Update the feed and then update connected entities.

        Depending on the status returned by the feed:

        * ``UPDATE_OK``: store the entries, record the update time, then
          invoke the remove, update and generate callbacks (in that order)
          for the respective groups of external ids.
        * ``UPDATE_OK_NO_DATA``: record the update time only; stored entries
          and managed ids are left untouched.
        * anything else: log a warning, invoke the remove callback for every
          managed id and drop all stored entries.
        """
        status, feed_entries = self._feed.update()
        if status == UPDATE_OK:
            _LOGGER.debug("Data retrieved %s", feed_entries)
            # Keep a copy of all feed entries for future lookups by entities.
            self.feed_entries = {entry.external_id: entry for entry in feed_entries}
            # Record current time of update.
            self._last_update = datetime.now()
            # For entity management the external ids from the feed are used.
            feed_external_ids = set(self.feed_entries)
            # Managed but no longer in the feed.
            remove_external_ids = self._managed_external_ids.difference(
                feed_external_ids
            )
            self._remove_entities(remove_external_ids)
            # Managed and still in the feed.
            update_external_ids = self._managed_external_ids.intersection(
                feed_external_ids
            )
            self._update_entities(update_external_ids)
            # In the feed but not yet managed.
            create_external_ids = feed_external_ids.difference(
                self._managed_external_ids
            )
            self._generate_new_entities(create_external_ids)
        elif status == UPDATE_OK_NO_DATA:
            _LOGGER.debug("Update successful, but no data received from %s", self._feed)
            # The update itself succeeded, so it counts as the last
            # successful update even though there is nothing to process.
            self._last_update = datetime.now()
        else:
            _LOGGER.warning(
                "Update not successful, no data received from %s", self._feed
            )
            # Remove all entities.
            self._remove_entities(self._managed_external_ids.copy())
            # Remove all feed entries and managed external ids.
            self.feed_entries.clear()
            self._managed_external_ids.clear()

    def _generate_new_entities(self, external_ids) -> None:
        """Generate new entities for events.

        Invokes the generate callback for each external id and then starts
        managing that id.
        """
        for external_id in external_ids:
            self._generate_callback(external_id)
            _LOGGER.debug("New entity added %s", external_id)
            self._managed_external_ids.add(external_id)

    def _update_entities(self, external_ids) -> None:
        """Update entities by invoking the update callback for each id."""
        for external_id in external_ids:
            _LOGGER.debug("Existing entity found %s", external_id)
            self._update_callback(external_id)

    def _remove_entities(self, external_ids) -> None:
        """Remove entities.

        Stops managing each external id and then invokes the remove callback
        for it.
        """
        # Iterate over a snapshot: the managed set is mutated inside the loop,
        # which would fail if the caller handed in that very set.
        for external_id in tuple(external_ids):
            _LOGGER.debug("Entity not current anymore %s", external_id)
            self._managed_external_ids.remove(external_id)
            self._remove_callback(external_id)

    @property
    def last_timestamp(self) -> datetime | None:
        """Return the last timestamp extracted from this feed."""
        return self._feed.last_timestamp

    @property
    def last_update(self) -> datetime | None:
        """Return the last successful update of this feed."""
        return self._last_update
|