File: subscriber.py

package info (click to toggle)
python-yalexs 9.2.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,120 kB
  • sloc: python: 7,916; makefile: 3; sh: 2
file content (89 lines) | stat: -rw-r--r-- 3,120 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
"""yalexs subscribers."""

from __future__ import annotations

import asyncio
from abc import ABC, abstractmethod
from collections import defaultdict
from collections.abc import Callable
from datetime import timedelta
from functools import partial
from typing import Any

from ..backports.tasks import create_eager_task


class SubscriberMixin(ABC):
    """Base implementation for a subscriber.

    Keeps a registry of per-device callbacks. The first subscription arms an
    event-loop timer that calls ``_async_refresh`` every ``update_interval``;
    the timer is cancelled again once the last callback is removed or
    ``async_stop`` is called.
    """

    def __init__(self, update_interval: timedelta) -> None:
        """Initialize a subscriber.

        Must be called while an event loop is running, because the running
        loop is captured here and used for all later scheduling.

        Args:
            update_interval: Time between two scheduled refreshes.
        """
        super().__init__()
        self._update_interval_seconds = update_interval.total_seconds()
        # device_id -> callbacks to invoke when that device is updated.
        self._subscriptions: defaultdict[str, set[Callable[[], None]]] = defaultdict(
            set
        )
        # Handle of the pending refresh timer, or None when no timer is armed.
        self._unsub_interval: asyncio.TimerHandle | None = None
        self._loop = asyncio.get_running_loop()
        # Most recently started refresh task; None until the first scheduled
        # refresh has fired.
        self._refresh_task: asyncio.Task | None = None

    def async_subscribe_device_id(
        self, device_id: str, update_callback: Callable[[], None]
    ) -> Callable[[], None]:
        """Add a callback subscriber.

        Returns a callable that can be used to unsubscribe.
        """
        if not self._subscriptions:
            # First subscriber overall: start the periodic refresh.
            self._async_setup_listeners()
        self._subscriptions[device_id].add(update_callback)
        return partial(self.async_unsubscribe_device_id, device_id, update_callback)

    @abstractmethod
    async def _async_refresh(self) -> None:
        """Refresh data."""

    def _async_scheduled_refresh(self) -> None:
        """Call the refresh method."""
        # Re-arm the timer before starting the refresh so the next run is
        # scheduled regardless of how the refresh itself turns out.
        self._unsub_interval = self._loop.call_later(
            self._update_interval_seconds,
            self._async_scheduled_refresh,
        )
        self._refresh_task = create_eager_task(
            self._async_refresh(), loop=self._loop, name=f"{self} schedule refresh"
        )

    def _async_cancel_update_interval(self) -> None:
        """Cancel the scheduled update."""
        if self._unsub_interval is not None:
            self._unsub_interval.cancel()
            self._unsub_interval = None

    def _async_setup_listeners(self) -> None:
        """Create interval and stop listeners."""
        # Drop any previously armed timer so only one is ever pending.
        self._async_cancel_update_interval()
        self._unsub_interval = self._loop.call_later(
            self._update_interval_seconds,
            self._async_scheduled_refresh,
        )

    def async_stop(self, *args: Any) -> None:
        """Cleanup on shutdown.

        Cancels the last started refresh task (if any) and the pending timer.
        Positional arguments are accepted and ignored.
        """
        # No refresh task exists until the first scheduled refresh has fired,
        # so stopping early must not assume one is present.
        if self._refresh_task is not None:
            self._refresh_task.cancel()
            self._refresh_task = None
        self._async_cancel_update_interval()

    def async_unsubscribe_device_id(
        self, device_id: str, update_callback: Callable[[], None]
    ) -> None:
        """Remove a callback subscriber.

        Removing a callback that is not (or no longer) registered is a no-op,
        so the callable returned by ``async_subscribe_device_id`` can safely
        be invoked more than once.
        """
        # Use .get() so an unknown device_id does not make the defaultdict
        # insert an empty set; such a stale key would keep the registry
        # truthy and stop later subscriptions from re-arming the timer.
        callbacks = self._subscriptions.get(device_id)
        if callbacks is None:
            return
        callbacks.discard(update_callback)
        if not callbacks:
            del self._subscriptions[device_id]
        if not self._subscriptions:
            # Last subscriber is gone: stop the periodic refresh.
            self._async_cancel_update_interval()

    def async_signal_device_id_update(self, device_id: str) -> None:
        """Call the callbacks for a device_id."""
        # Iterate over a snapshot: a callback may unsubscribe while being
        # dispatched, which mutates the underlying set.
        for update_callback in tuple(self._subscriptions.get(device_id, ())):
            update_callback()