File: trampolinescheduler.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (117 lines) | stat: -rw-r--r-- 3,788 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import logging
from typing import Optional, TypeVar

from reactivex import abc, typing
from reactivex.abc.disposable import DisposableBase
from reactivex.abc.scheduler import ScheduledAction
from reactivex.internal.constants import DELTA_ZERO

from .scheduleditem import ScheduledItem
from .scheduler import Scheduler
from .trampoline import Trampoline

# Type variable for the optional state value passed along with a scheduled action.
_TState = TypeVar("_TState")
# Logger registered under the name "Rx"; the scheduler below uses it to warn
# when work is scheduled with a due time in the future.
log = logging.getLogger("Rx")


class TrampolineScheduler(Scheduler):
    """Scheduler that queues units of work on a trampoline.

    Avoid scheduling timeouts with the *TrampolineScheduler*: the thread is
    blocked while it waits for the due time.

    Every instance owns a separate trampoline (and therefore a separate
    queue), and work may be scheduled on it from different threads. Note
    that the first thread to call a *schedule* method while the trampoline
    is idle stays occupied until the queue has been emptied.
    """

    def __init__(self) -> None:
        # One trampoline per scheduler instance.
        self._tramp = Trampoline()

    def get_trampoline(self) -> Trampoline:
        """Return the trampoline this scheduler submits its work to.

        The other methods of this class always go through this accessor
        instead of reading ``_tramp`` directly.
        """
        return self._tramp

    def schedule(
        self, action: abc.ScheduledAction[_TState], state: Optional[_TState] = None
    ) -> abc.DisposableBase:
        """Schedule an action with the current time as its due time.

        Args:
            action: Action to be executed.
            state: [Optional] state to be given to the action function.

        Returns:
            The disposable object used to cancel the scheduled action
            (best effort).
        """
        current_time = self.now
        return self.schedule_absolute(current_time, action, state=state)

    def schedule_relative(
        self,
        duetime: typing.RelativeTime,
        action: abc.ScheduledAction[_TState],
        state: Optional[_TState] = None,
    ) -> abc.DisposableBase:
        """Schedule an action to run once *duetime* has elapsed.

        Args:
            duetime: Relative time after which to execute the action.
            action: Action to be executed.
            state: [Optional] state to be given to the action function.

        Returns:
            The disposable object used to cancel the scheduled action
            (best effort).
        """
        requested = self.to_timedelta(duetime)
        # Anything that is not strictly positive collapses to a zero delay.
        delay = requested if requested > DELTA_ZERO else DELTA_ZERO
        return self.schedule_absolute(self.now + delay, action, state=state)

    def schedule_absolute(
        self,
        duetime: typing.AbsoluteTime,
        action: abc.ScheduledAction[_TState],
        state: Optional[_TState] = None,
    ) -> abc.DisposableBase:
        """Schedule an action to run at the absolute time *duetime*.

        A warning is logged when the due time lies in the future, because
        such work is considered blocking on this scheduler.

        Args:
            duetime: Absolute time at which to execute the action.
            action: Action to be executed.
            state: [Optional] state to be given to the action function.

        Returns:
            The disposable object used to cancel the scheduled action
            (best effort).
        """
        due = self.to_datetime(duetime)
        if due > self.now:
            log.warning("Do not schedule blocking work!")

        scheduled = ScheduledItem(self, state, action, due)
        trampoline = self.get_trampoline()
        trampoline.run(scheduled)
        return scheduled.disposable

    def schedule_required(self) -> bool:
        """Report whether the caller has to go through a schedule method.

        Returns True while the trampoline is not active (idle), meaning
        the caller must call a schedule method; returns False while the
        trampoline is active.
        """
        trampoline = self.get_trampoline()
        return trampoline.idle()

    def ensure_trampoline(
        self, action: ScheduledAction[_TState]
    ) -> Optional[DisposableBase]:
        """Helper for testing the TrampolineScheduler.

        When scheduling is required the action is scheduled; otherwise it
        is invoked directly with this scheduler and a state of None.
        """
        if not self.schedule_required():
            # Trampoline already active: call the action in place.
            return action(self, None)

        return self.schedule(action)