File: eventloopscheduler.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (218 lines) | stat: -rw-r--r-- 7,421 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
import logging
import threading
from collections import deque
from typing import Deque, Optional, TypeVar

from reactivex import abc, typing
from reactivex.disposable import Disposable
from reactivex.internal.concurrency import default_thread_factory
from reactivex.internal.constants import DELTA_ZERO
from reactivex.internal.exceptions import DisposedException
from reactivex.internal.priorityqueue import PriorityQueue

from .periodicscheduler import PeriodicScheduler
from .scheduleditem import ScheduledItem

# Module-level logger registered under the name "Rx"; the event loop below
# uses it to emit debug messages about how long it is about to sleep.
log = logging.getLogger("Rx")

# Type variable for the optional state value that is passed along with a
# scheduled action.
_TState = TypeVar("_TState")


class EventLoopScheduler(PeriodicScheduler, abc.DisposableBase):
    """Creates an object that schedules units of work on a designated thread.

    Pending work lives in two containers that are only touched while holding
    ``self._condition``:

    * ``_ready_list`` - items that were already due when they were scheduled.
    * ``_queue`` - items scheduled for a later point in time; the loop treats
      the head of this priority queue as the next item to become due.

    The loop thread is created lazily by the first scheduling call and runs
    :meth:`run`. It sleeps on the condition and is woken up by scheduling
    calls and by :meth:`dispose`.
    """

    def __init__(
        self,
        thread_factory: Optional[typing.StartableFactory] = None,
        exit_if_empty: bool = False,
    ) -> None:
        """Initializes the scheduler without starting a thread yet.

        Args:
            thread_factory: [Optional] Factory that is called with the
                loop's ``run`` method and returns an object exposing
                ``start()``. Falls back to ``default_thread_factory``.
            exit_if_empty: When True, the loop thread terminates as soon
                as there is no pending work left; a later scheduling call
                starts a fresh thread.
        """
        super().__init__()
        self._is_disposed = False

        self._thread_factory: typing.StartableFactory = (
            thread_factory or default_thread_factory
        )
        # The loop thread; None while no loop is running.
        self._thread: Optional[typing.Startable] = None
        # Gate protecting _thread, _queue, _ready_list and _is_disposed.
        self._condition = threading.Condition(threading.Lock())
        self._queue: PriorityQueue[ScheduledItem] = PriorityQueue()
        self._ready_list: Deque[ScheduledItem] = deque()

        self._exit_if_empty = exit_if_empty

    def schedule(
        self, action: typing.ScheduledAction[_TState], state: Optional[_TState] = None
    ) -> abc.DisposableBase:
        """Schedules an action to be executed.

        Args:
            action: Action to be executed.
            state: [Optional] state to be given to the action function.

        Returns:
            The disposable object used to cancel the scheduled action
            (best effort).

        Raises:
            DisposedException: If the scheduler has been disposed.
        """

        return self.schedule_absolute(self.now, action, state=state)

    def schedule_relative(
        self,
        duetime: typing.RelativeTime,
        action: typing.ScheduledAction[_TState],
        state: Optional[_TState] = None,
    ) -> abc.DisposableBase:
        """Schedules an action to be executed after duetime.

        Args:
            duetime: Relative time after which to execute the action.
                Values below ``DELTA_ZERO`` are clamped to ``DELTA_ZERO``.
            action: Action to be executed.
            state: [Optional] state to be given to the action function.

        Returns:
            The disposable object used to cancel the scheduled action
            (best effort).

        Raises:
            DisposedException: If the scheduler has been disposed.
        """

        duetime = max(DELTA_ZERO, self.to_timedelta(duetime))
        return self.schedule_absolute(self.now + duetime, action, state)

    def schedule_absolute(
        self,
        duetime: typing.AbsoluteTime,
        action: typing.ScheduledAction[_TState],
        state: Optional[_TState] = None,
    ) -> abc.DisposableBase:
        """Schedules an action to be executed at duetime.

        Args:
            duetime: Absolute time at which to execute the action.
            action: Action to be executed.
            state: [Optional] state to be given to the action function.

        Returns:
            The disposable object used to cancel the scheduled action
            (best effort).

        Raises:
            DisposedException: If the scheduler has been disposed.
        """

        # Cheap unlocked check so a disposed scheduler fails fast.
        if self._is_disposed:
            raise DisposedException()

        dt = self.to_datetime(duetime)
        si: ScheduledItem = ScheduledItem(self, state, action, dt)

        with self._condition:
            # Re-check under the gate: dispose() may have run on another
            # thread since the unlocked check. Without this, the item would
            # be queued on a dead scheduler and _ensure_thread() could even
            # spawn a new thread after disposal.
            if self._is_disposed:
                raise DisposedException()

            if dt <= self.now:
                self._ready_list.append(si)
            else:
                self._queue.enqueue(si)
            self._condition.notify()  # signal that a new item is available
            self._ensure_thread()

        return Disposable(si.cancel)

    def schedule_periodic(
        self,
        period: typing.RelativeTime,
        action: typing.ScheduledPeriodicAction[_TState],
        state: Optional[_TState] = None,
    ) -> abc.DisposableBase:
        """Schedules a periodic piece of work.

        Args:
            period: Period in seconds or timedelta for running the
                work periodically.
            action: Action to be executed.
            state: [Optional] Initial state passed to the action upon
                the first iteration.

        Returns:
            The disposable object used to cancel the scheduled
            recurring action (best effort).

        Raises:
            DisposedException: If the scheduler has been disposed.
        """

        if self._is_disposed:
            raise DisposedException()

        return super().schedule_periodic(period, action, state=state)

    def _has_thread(self) -> bool:
        """Checks if there is an event loop thread running.

        Returns:
            True when the scheduler is not disposed and a loop thread
            object is currently recorded, False otherwise.
        """
        with self._condition:
            return not self._is_disposed and self._thread is not None

    def _ensure_thread(self) -> None:
        """Ensures there is an event loop thread running. Should be
        called under the gate."""

        # Explicit None test (same as _has_thread) so a thread object that
        # happens to be falsy is never mistaken for "no thread".
        if self._thread is None:
            thread = self._thread_factory(self.run)
            self._thread = thread
            thread.start()

    def run(self) -> None:
        """Event loop scheduled on the designated event loop thread.
        The loop is suspended/resumed using the condition which gets notified
        by calls to Schedule or calls to dispose.

        Each cycle has three phases: gather the due items under the gate,
        execute them outside the gate, then (under the gate again) decide
        whether to loop immediately, sleep, or exit.
        """

        ready: Deque[ScheduledItem] = deque()

        while True:

            with self._condition:

                # The notification could be because of a call to dispose. This
                # takes precedence over everything else: We'll exit the loop
                # immediately. Subsequent calls to Schedule won't ever create a
                # new thread.
                if self._is_disposed:
                    return

                # Sort the ready_list (from recent calls for immediate schedule)
                # and the due subset of previously queued items.
                now = self.now
                while self._queue:
                    due = self._queue.peek().duetime
                    # Ready items that are due before the queue head go first.
                    while self._ready_list and due > self._ready_list[0].duetime:
                        ready.append(self._ready_list.popleft())
                    # The queue head is not due yet: stop draining the queue.
                    if due > now:
                        break
                    ready.append(self._queue.dequeue())
                # Whatever is left in the ready list follows the queued items.
                while self._ready_list:
                    ready.append(self._ready_list.popleft())

            # Execute the gathered actions (outside the gate, so actions may
            # schedule further work or dispose the scheduler).
            while ready:
                item = ready.popleft()
                if not item.is_cancelled():
                    item.invoke()

            # Wait for next cycle, or if we're done let's exit if so configured
            with self._condition:

                # dispose() may have been called while the actions above were
                # running. Its notify() had no waiter then, so waiting now
                # would block forever (or until the next due time). Exit
                # instead of sleeping through a missed wake-up.
                if self._is_disposed:
                    return

                if self._ready_list:
                    continue

                elif self._queue:
                    now = self.now
                    head = self._queue.peek()
                    seconds = (head.duetime - now).total_seconds()
                    if seconds > 0:
                        log.debug("timeout: %s", seconds)
                        # Condition.wait() raises OverflowError for timeouts
                        # above threading.TIMEOUT_MAX. Clamping is safe: after
                        # waking up the loop recomputes the remaining time.
                        self._condition.wait(min(seconds, threading.TIMEOUT_MAX))

                elif self._exit_if_empty:
                    # Clear the thread so the next schedule call starts a new
                    # loop via _ensure_thread().
                    self._thread = None
                    return

                else:
                    self._condition.wait()

    def dispose(self) -> None:
        """Ends the thread associated with this scheduler. All
        remaining work in the scheduler queue is abandoned.

        Calling this more than once has no further effect.
        """

        with self._condition:
            if not self._is_disposed:
                self._is_disposed = True
                # Wake the loop thread in case it is sleeping on the condition.
                self._condition.notify()