File: feed_manager.py

package info (click to toggle)
python-aio-geojson-client 0.20-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 268 kB
  • sloc: python: 1,368; makefile: 4
file content (168 lines) | stat: -rw-r--r-- 7,200 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
"""Base class for the feed manager. This allows managing feeds and their entries throughout their life-cycle."""
from __future__ import annotations

import logging
from collections.abc import Awaitable, Callable
from datetime import datetime

from .consts import T_FEED_ENTRY, T_FILTER_DEFINITION, UPDATE_OK, UPDATE_OK_NO_DATA
from .feed import GeoJsonFeed
from .feed_entry import FeedEntry
from .status_update import StatusUpdate

_LOGGER = logging.getLogger(__name__)


class FeedManagerBase:
    """Generic Feed manager."""

    def __init__(
        self,
        feed: GeoJsonFeed,
        generate_async_callback: Callable[[str], Awaitable[None]],
        update_async_callback: Callable[[str], Awaitable[None]],
        remove_async_callback: Callable[[str], Awaitable[None]],
        status_async_callback: Callable[[StatusUpdate], Awaitable[None]] = None,
    ):
        """Initialise feed manager."""
        self._feed = feed
        self.feed_entries = {}
        self._managed_external_ids = set()
        self._last_update = None
        self._last_update_successful = None
        self._generate_async_callback = generate_async_callback
        self._update_async_callback = update_async_callback
        self._remove_async_callback = remove_async_callback
        self._status_async_callback = status_async_callback

    def __repr__(self):
        """Return string representation of this feed."""
        return f"<{self.__class__.__name__}(feed={self._feed})>"

    async def _update_internal(
        self, status: str, feed_entries: list[T_FEED_ENTRY] | None
    ):
        """Update the feed and then update connected entities."""
        # Record current time of update.
        self._last_update = datetime.now()
        count_created = 0
        count_updated = 0
        count_removed = 0
        await self._store_feed_entries(status, feed_entries)
        if status == UPDATE_OK:
            _LOGGER.debug("Data retrieved %s", feed_entries)
            # Record current time of update.
            self._last_update_successful = self._last_update
            # For entity management the external ids from the feed are used.
            feed_external_ids = {entry.external_id for entry in feed_entries}
            count_removed = await self._update_feed_remove_entries(feed_external_ids)
            count_updated = await self._update_feed_update_entries(feed_external_ids)
            count_created = await self._update_feed_create_entries(feed_external_ids)
        elif status == UPDATE_OK_NO_DATA:
            _LOGGER.debug("Update successful, but no data received from %s", self._feed)
            # Record current time of update.
            self._last_update_successful = self._last_update
        else:
            _LOGGER.warning(
                "Update not successful, no data received from %s", self._feed
            )
            # Remove all entities.
            count_removed = await self._update_feed_remove_entries(set())
        # Send status update to subscriber.
        await self._status_update(status, count_created, count_updated, count_removed)

    async def update(self):
        """Update the feed and then update connected entities."""
        status, feed_entries = await self._feed.update()
        await self._update_internal(status, feed_entries)

    async def update_override(self, filter_overrides: T_FILTER_DEFINITION = None):
        """Update the feed and then update connected entities."""
        status, feed_entries = await self._feed.update_override(
            filter_overrides=filter_overrides
        )
        await self._update_internal(status, feed_entries)

    async def _store_feed_entries(
        self, status: str, feed_entries: list[FeedEntry] | None
    ):
        """Keep a copy of all feed entries for future lookups."""
        if feed_entries or status == UPDATE_OK_NO_DATA:
            if status == UPDATE_OK:
                self.feed_entries = {entry.external_id: entry for entry in feed_entries}
        else:
            self.feed_entries.clear()

    async def _update_feed_create_entries(self, feed_external_ids: set[str]) -> int:
        """Create entities after feed update."""
        create_external_ids = feed_external_ids.difference(self._managed_external_ids)
        count_created = len(create_external_ids)
        await self._generate_new_entities(create_external_ids)
        return count_created

    async def _update_feed_update_entries(self, feed_external_ids: set[str]) -> int:
        """Update entities after feed update."""
        update_external_ids = self._managed_external_ids.intersection(feed_external_ids)
        count_updated = len(update_external_ids)
        await self._update_entities(update_external_ids)
        return count_updated

    async def _update_feed_remove_entries(self, feed_external_ids: set[str]) -> int:
        """Remove entities after feed update."""
        remove_external_ids = self._managed_external_ids.difference(feed_external_ids)
        count_removed = len(remove_external_ids)
        await self._remove_entities(remove_external_ids)
        return count_removed

    async def _generate_new_entities(self, external_ids: set[str]):
        """Generate new entities for events using callback."""
        for external_id in external_ids:
            await self._generate_async_callback(external_id)
            _LOGGER.debug("New entity added %s", external_id)
            self._managed_external_ids.add(external_id)

    async def _update_entities(self, external_ids: set[str]):
        """Update entities using callback."""
        for external_id in external_ids:
            _LOGGER.debug("Existing entity found %s", external_id)
            await self._update_async_callback(external_id)

    async def _remove_entities(self, external_ids: set[str]):
        """Remove entities using callback."""
        for external_id in external_ids:
            _LOGGER.debug("Entity not current anymore %s", external_id)
            self._managed_external_ids.remove(external_id)
            await self._remove_async_callback(external_id)

    async def _status_update(
        self, status: str, count_created: int, count_updated: int, count_removed: int
    ):
        """Provide status update."""
        if self._status_async_callback:
            await self._status_async_callback(
                StatusUpdate(
                    status,
                    self.last_update,
                    self.last_update_successful,
                    self.last_timestamp,
                    len(self.feed_entries),
                    count_created,
                    count_updated,
                    count_removed,
                )
            )

    @property
    def last_timestamp(self) -> datetime | None:
        """Return the last timestamp extracted from this feed."""
        return self._feed.last_timestamp

    @property
    def last_update(self) -> datetime | None:
        """Return the last update of this feed."""
        return self._last_update

    @property
    def last_update_successful(self) -> datetime | None:
        """Return the last successful update of this feed."""
        return self._last_update_successful