1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
|
"""Base class for the feed manager. This allows managing feeds and their entries throughout their life-cycle."""
from __future__ import annotations
import logging
from collections.abc import Awaitable, Callable
from datetime import datetime
from .consts import T_FEED_ENTRY, T_FILTER_DEFINITION, UPDATE_OK, UPDATE_OK_NO_DATA
from .feed import GeoJsonFeed
from .feed_entry import FeedEntry
from .status_update import StatusUpdate
_LOGGER = logging.getLogger(__name__)
class FeedManagerBase:
"""Generic Feed manager."""
def __init__(
self,
feed: GeoJsonFeed,
generate_async_callback: Callable[[str], Awaitable[None]],
update_async_callback: Callable[[str], Awaitable[None]],
remove_async_callback: Callable[[str], Awaitable[None]],
status_async_callback: Callable[[StatusUpdate], Awaitable[None]] = None,
):
"""Initialise feed manager."""
self._feed = feed
self.feed_entries = {}
self._managed_external_ids = set()
self._last_update = None
self._last_update_successful = None
self._generate_async_callback = generate_async_callback
self._update_async_callback = update_async_callback
self._remove_async_callback = remove_async_callback
self._status_async_callback = status_async_callback
def __repr__(self):
"""Return string representation of this feed."""
return f"<{self.__class__.__name__}(feed={self._feed})>"
async def _update_internal(
self, status: str, feed_entries: list[T_FEED_ENTRY] | None
):
"""Update the feed and then update connected entities."""
# Record current time of update.
self._last_update = datetime.now()
count_created = 0
count_updated = 0
count_removed = 0
await self._store_feed_entries(status, feed_entries)
if status == UPDATE_OK:
_LOGGER.debug("Data retrieved %s", feed_entries)
# Record current time of update.
self._last_update_successful = self._last_update
# For entity management the external ids from the feed are used.
feed_external_ids = {entry.external_id for entry in feed_entries}
count_removed = await self._update_feed_remove_entries(feed_external_ids)
count_updated = await self._update_feed_update_entries(feed_external_ids)
count_created = await self._update_feed_create_entries(feed_external_ids)
elif status == UPDATE_OK_NO_DATA:
_LOGGER.debug("Update successful, but no data received from %s", self._feed)
# Record current time of update.
self._last_update_successful = self._last_update
else:
_LOGGER.warning(
"Update not successful, no data received from %s", self._feed
)
# Remove all entities.
count_removed = await self._update_feed_remove_entries(set())
# Send status update to subscriber.
await self._status_update(status, count_created, count_updated, count_removed)
async def update(self):
"""Update the feed and then update connected entities."""
status, feed_entries = await self._feed.update()
await self._update_internal(status, feed_entries)
async def update_override(self, filter_overrides: T_FILTER_DEFINITION = None):
"""Update the feed and then update connected entities."""
status, feed_entries = await self._feed.update_override(
filter_overrides=filter_overrides
)
await self._update_internal(status, feed_entries)
async def _store_feed_entries(
self, status: str, feed_entries: list[FeedEntry] | None
):
"""Keep a copy of all feed entries for future lookups."""
if feed_entries or status == UPDATE_OK_NO_DATA:
if status == UPDATE_OK:
self.feed_entries = {entry.external_id: entry for entry in feed_entries}
else:
self.feed_entries.clear()
async def _update_feed_create_entries(self, feed_external_ids: set[str]) -> int:
"""Create entities after feed update."""
create_external_ids = feed_external_ids.difference(self._managed_external_ids)
count_created = len(create_external_ids)
await self._generate_new_entities(create_external_ids)
return count_created
async def _update_feed_update_entries(self, feed_external_ids: set[str]) -> int:
"""Update entities after feed update."""
update_external_ids = self._managed_external_ids.intersection(feed_external_ids)
count_updated = len(update_external_ids)
await self._update_entities(update_external_ids)
return count_updated
async def _update_feed_remove_entries(self, feed_external_ids: set[str]) -> int:
"""Remove entities after feed update."""
remove_external_ids = self._managed_external_ids.difference(feed_external_ids)
count_removed = len(remove_external_ids)
await self._remove_entities(remove_external_ids)
return count_removed
async def _generate_new_entities(self, external_ids: set[str]):
"""Generate new entities for events using callback."""
for external_id in external_ids:
await self._generate_async_callback(external_id)
_LOGGER.debug("New entity added %s", external_id)
self._managed_external_ids.add(external_id)
async def _update_entities(self, external_ids: set[str]):
"""Update entities using callback."""
for external_id in external_ids:
_LOGGER.debug("Existing entity found %s", external_id)
await self._update_async_callback(external_id)
async def _remove_entities(self, external_ids: set[str]):
"""Remove entities using callback."""
for external_id in external_ids:
_LOGGER.debug("Entity not current anymore %s", external_id)
self._managed_external_ids.remove(external_id)
await self._remove_async_callback(external_id)
async def _status_update(
self, status: str, count_created: int, count_updated: int, count_removed: int
):
"""Provide status update."""
if self._status_async_callback:
await self._status_async_callback(
StatusUpdate(
status,
self.last_update,
self.last_update_successful,
self.last_timestamp,
len(self.feed_entries),
count_created,
count_updated,
count_removed,
)
)
@property
def last_timestamp(self) -> datetime | None:
"""Return the last timestamp extracted from this feed."""
return self._feed.last_timestamp
@property
def last_update(self) -> datetime | None:
"""Return the last update of this feed."""
return self._last_update
@property
def last_update_successful(self) -> datetime | None:
"""Return the last successful update of this feed."""
return self._last_update_successful
|