1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
|
import logging
import threading
from datetime import datetime
from typing import Optional, TypeVar
from reactivex import abc, typing
from reactivex.disposable import Disposable
from reactivex.internal.concurrency import default_thread_factory
from .eventloopscheduler import EventLoopScheduler
from .periodicscheduler import PeriodicScheduler
_TState = TypeVar("_TState")
log = logging.getLogger("Rx")
class NewThreadScheduler(PeriodicScheduler):
"""Creates an object that schedules each unit of work on a separate thread."""
def __init__(
self, thread_factory: Optional[typing.StartableFactory] = None
) -> None:
super().__init__()
self.thread_factory: typing.StartableFactory = (
thread_factory or default_thread_factory
)
def schedule(
self, action: typing.ScheduledAction[_TState], state: Optional[_TState] = None
) -> abc.DisposableBase:
"""Schedules an action to be executed.
Args:
action: Action to be executed.
state: [Optional] state to be given to the action function.
Returns:
The disposable object used to cancel the scheduled action
(best effort).
"""
scheduler = EventLoopScheduler(
thread_factory=self.thread_factory, exit_if_empty=True
)
return scheduler.schedule(action, state)
def schedule_relative(
self,
duetime: typing.RelativeTime,
action: typing.ScheduledAction[_TState],
state: Optional[_TState] = None,
) -> abc.DisposableBase:
"""Schedules an action to be executed after duetime.
Args:
duetime: Relative time after which to execute the action.
action: Action to be executed.
state: [Optional] state to be given to the action function.
Returns:
The disposable object used to cancel the scheduled action
(best effort).
"""
scheduler = EventLoopScheduler(
thread_factory=self.thread_factory, exit_if_empty=True
)
return scheduler.schedule_relative(duetime, action, state)
def schedule_absolute(
self,
duetime: typing.AbsoluteTime,
action: typing.ScheduledAction[_TState],
state: Optional[_TState] = None,
) -> abc.DisposableBase:
"""Schedules an action to be executed at duetime.
Args:
duetime: Absolute time at which to execute the action.
action: Action to be executed.
state: [Optional] state to be given to the action function.
Returns:
The disposable object used to cancel the scheduled action
(best effort).
"""
dt = self.to_datetime(duetime)
return self.schedule_relative(dt - self.now, action, state=state)
def schedule_periodic(
self,
period: typing.RelativeTime,
action: typing.ScheduledPeriodicAction[_TState],
state: Optional[_TState] = None,
) -> abc.DisposableBase:
"""Schedules a periodic piece of work.
Args:
period: Period in seconds or timedelta for running the
work periodically.
action: Action to be executed.
state: [Optional] Initial state passed to the action upon
the first iteration.
Returns:
The disposable object used to cancel the scheduled
recurring action (best effort).
"""
seconds: float = self.to_seconds(period)
timeout: float = seconds
disposed: threading.Event = threading.Event()
def run() -> None:
nonlocal state, timeout
while True:
if timeout > 0.0:
disposed.wait(timeout)
if disposed.is_set():
return
time: datetime = self.now
state = action(state)
timeout = seconds - (self.now - time).total_seconds()
thread = self.thread_factory(run)
thread.start()
def dispose() -> None:
disposed.set()
return Disposable(dispose)
|