File: feed_manager.py

package info (click to toggle)
python-georss-client 0.18-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 320 kB
  • sloc: python: 1,582; xml: 430; makefile: 8; sh: 5
file content (104 lines) | stat: -rw-r--r-- 4,020 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
"""Base class for the feed manager.

This allows managing feeds and their entries throughout their life-cycle.
"""

from __future__ import annotations

from datetime import datetime
import logging
from typing import Callable

from . import GeoRssFeed
from .consts import UPDATE_OK, UPDATE_OK_NO_DATA

# Module-level logger, named after this module via ``__name__``.
_LOGGER = logging.getLogger(__name__)


class FeedManagerBase:
    """Generic Feed manager.

    Wraps a feed and keeps a set of "managed" external ids in step with the
    entries the feed returns. Whenever an id appears, is seen again, or
    disappears, the matching callback supplied at construction is invoked
    with that external id.
    """

    def __init__(
        self,
        feed: GeoRssFeed,
        generate_callback: Callable[[str], None],
        update_callback: Callable[[str], None],
        remove_callback: Callable[[str], None],
    ):
        """Set up the manager around a feed and three callbacks.

        Each callback is called with a single external id:
        ``generate_callback`` for ids that are not managed yet,
        ``update_callback`` for managed ids still present in the feed, and
        ``remove_callback`` for managed ids that are no longer current.
        """
        # Collaborators handed in by the caller.
        self._feed: GeoRssFeed = feed
        self._generate_callback: Callable[[str], None] = generate_callback
        self._update_callback: Callable[[str], None] = update_callback
        self._remove_callback: Callable[[str], None] = remove_callback

        # Mutable state, filled in by ``update``.
        self.feed_entries: dict = {}
        self._managed_external_ids: set = set()
        self._last_update: datetime | None = None

    def __repr__(self):
        """Return the class name together with the wrapped feed."""
        return f"<{self.__class__.__name__}(feed={self._feed})>"

    def update(self):
        """Fetch the feed and bring the managed entities in line with it.

        The outcome depends on the status returned by the feed:

        * ``UPDATE_OK``: the entries are indexed by external id, the time
          of the update is recorded, and the remove, update and generate
          callbacks are triggered (in that order) for the affected ids.
        * ``UPDATE_OK_NO_DATA``: only a debug message is logged; all state
          is left as it was.
        * anything else: a warning is logged, every managed entity is
          removed and the stored entries are cleared.
        """
        status, entries = self._feed.update()

        if status == UPDATE_OK:
            _LOGGER.debug("Data retrieved %s", entries)
            # Index the entries by external id so that entities can look
            # them up later; the attribute is only replaced once the whole
            # index has been built.
            entries_by_id = {}
            for entry in entries:
                entries_by_id[entry.external_id] = entry
            self.feed_entries = entries_by_id
            # Remember when this update happened.
            self._last_update = datetime.now()

            # Entity management is driven purely by the external ids.
            current_ids = set(self.feed_entries)
            # Each id set is computed immediately before it is used, so it
            # reflects the managed ids as they are at that moment.
            stale_ids = self._managed_external_ids - current_ids
            self._remove_entities(stale_ids)
            known_ids = self._managed_external_ids & current_ids
            self._update_entities(known_ids)
            new_ids = current_ids - self._managed_external_ids
            self._generate_new_entities(new_ids)
            return

        if status == UPDATE_OK_NO_DATA:
            _LOGGER.debug("Update successful, but no data received from %s", self._feed)
            return

        _LOGGER.warning(
            "Update not successful, no data received from %s", self._feed
        )
        # Drop every entity. A copy is iterated because the removal helper
        # mutates the managed set while it walks the ids.
        self._remove_entities(self._managed_external_ids.copy())
        # Finally forget all stored entries and managed ids.
        self.feed_entries.clear()
        self._managed_external_ids.clear()

    def _generate_new_entities(self, external_ids):
        """Announce each given id as new and start managing it."""
        for new_id in external_ids:
            # The id only becomes managed after its callback has returned.
            self._generate_callback(new_id)
            _LOGGER.debug("New entity added %s", new_id)
            self._managed_external_ids.add(new_id)

    def _update_entities(self, external_ids):
        """Invoke the update callback for each given id."""
        for known_id in external_ids:
            _LOGGER.debug("Existing entity found %s", known_id)
            self._update_callback(known_id)

    def _remove_entities(self, external_ids):
        """Stop managing each given id and invoke the remove callback."""
        for stale_id in external_ids:
            _LOGGER.debug("Entity not current anymore %s", stale_id)
            # The id leaves the managed set before the callback is told.
            self._managed_external_ids.remove(stale_id)
            self._remove_callback(stale_id)

    @property
    def last_timestamp(self) -> datetime | None:
        """Return the ``last_timestamp`` value reported by the feed."""
        return self._feed.last_timestamp

    @property
    def last_update(self) -> datetime | None:
        """Return the time recorded by the most recent ``UPDATE_OK`` update.

        ``None`` until such an update has happened.
        """
        return self._last_update