from threading import Lock
from typing import MutableMapping, Optional, TypeVar
from weakref import WeakKeyDictionary

from reactivex import abc, typing
from reactivex.internal.constants import DELTA_ZERO
from reactivex.internal.exceptions import WouldBlockException

from .scheduler import Scheduler

_TState = TypeVar("_TState")


class ImmediateScheduler(Scheduler):
    """Represents an object that schedules units of work to run immediately,
    on the current thread. You're not allowed to schedule timeouts using the
    ImmediateScheduler since that will block the current thread while waiting.
    Attempts to do so will raise a :class:`WouldBlockException`.
    """

    _lock = Lock()
    _global: MutableMapping[type, "ImmediateScheduler"] = WeakKeyDictionary()

    @classmethod
    def singleton(cls) -> "ImmediateScheduler":
        with ImmediateScheduler._lock:
            try:
                self = ImmediateScheduler._global[cls]
            except KeyError:
                self = super().__new__(cls)
                ImmediateScheduler._global[cls] = self
        return self

    def __new__(cls) -> "ImmediateScheduler":
        return cls.singleton()

    def schedule(
        self, action: typing.ScheduledAction[_TState], state: Optional[_TState] = None
    ) -> abc.DisposableBase:
        """Schedules an action to be executed.

        Args:
            action: Action to be executed.
            state: [Optional] state to be given to the action function.

        Returns:
            The disposable object used to cancel the scheduled action
            (best effort).
        """

        return self.invoke_action(action, state)

    def schedule_relative(
        self,
        duetime: typing.RelativeTime,
        action: typing.ScheduledAction[_TState],
        state: Optional[_TState] = None,
    ) -> abc.DisposableBase:
        """Schedules an action to be executed after duetime.

        Args:
            duetime: Relative time after which to execute the action.
            action: Action to be executed.
            state: [Optional] state to be given to the action function.

        Returns:
            The disposable object used to cancel the scheduled action
            (best effort).
        """

        duetime = self.to_timedelta(duetime)

        if duetime > DELTA_ZERO:
            raise WouldBlockException()

        return self.invoke_action(action, state)

    def schedule_absolute(
        self,
        duetime: typing.AbsoluteTime,
        action: typing.ScheduledAction[_TState],
        state: Optional[_TState] = None,
    ) -> abc.DisposableBase:
        """Schedules an action to be executed at duetime.

        Args:
            duetime: Absolute time at which to execute the action.
            action: Action to be executed.
            state: [Optional] state to be given to the action function.

        Returns:
            The disposable object used to cancel the scheduled action
            (best effort).
        """

        duetime = self.to_datetime(duetime)
        return self.schedule_relative(duetime - self.now, action, state)
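The listing combines two ideas: a per-class singleton guarded by a lock, and a refusal to wait when the due time lies in the future. The sketch below reproduces both using only the standard library, so it runs without reactivex installed. `MiniImmediate`, `WouldBlock`, and `Derived` are hypothetical stand-ins, not reactivex names, and the sketch assumes the delay is given in seconds and that the action is called as `action(scheduler, state)`. It returns the action's result directly instead of a disposable, which is a simplification.

```python
from datetime import timedelta
from threading import Lock
from typing import Any, Callable, MutableMapping
from weakref import WeakKeyDictionary


class WouldBlock(Exception):
    """Hypothetical stand-in for WouldBlockException."""


class MiniImmediate:
    """Hypothetical, simplified stand-in for ImmediateScheduler."""

    _lock = Lock()
    _global: MutableMapping[type, "MiniImmediate"] = WeakKeyDictionary()

    @classmethod
    def singleton(cls) -> "MiniImmediate":
        # The registry is keyed by the concrete class, so every subclass
        # gets its own single instance.
        with MiniImmediate._lock:
            try:
                self = MiniImmediate._global[cls]
            except KeyError:
                self = super().__new__(cls)
                MiniImmediate._global[cls] = self
        return self

    def __new__(cls) -> "MiniImmediate":
        # Calling the class always routes through the registry.
        return cls.singleton()

    def schedule_relative(
        self,
        seconds: float,
        action: Callable[["MiniImmediate", Any], Any],
        state: Any = None,
    ) -> Any:
        # Assumption: the delay is a number of seconds. A positive delay
        # would require blocking the current thread, so it is rejected.
        if timedelta(seconds=seconds) > timedelta(0):
            raise WouldBlock()
        # Simplification: run the action now and return its result.
        return action(self, state)


class Derived(MiniImmediate):
    """Hypothetical subclass used to show the per-class registry."""


first = MiniImmediate()
second = MiniImmediate()
derived = Derived()
result = first.schedule_relative(0, lambda scheduler, state: state * 2, 21)
```

Here `first` and `second` are the same object, while `derived` is a separate instance because the registry key is `Derived` rather than `MiniImmediate`. A zero or negative delay runs the action at once; a positive delay raises `WouldBlock` before the action is ever called.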