1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
|
from abc import abstractmethod
from datetime import datetime, timedelta, timezone
from typing import Optional, TypeVar
from reactivex import abc, typing
from reactivex.disposable import Disposable
from reactivex.internal.basic import default_now
from reactivex.internal.constants import UTC_ZERO
# Type variable for the optional state value handed to a scheduled action.
_TState = TypeVar("_TState")
class Scheduler(abc.SchedulerBase):
    """Common base for the scheduler implementations of this package and of
    the mainloop sub-package.

    Periodic scheduling (schedule_periodic) is intentionally not provided
    here; see PeriodicScheduler for that.
    """

    @property
    def now(self) -> datetime:
        """The scheduler's own notion of the current time.

        Tasks scheduled on this scheduler adhere to the time reported by
        this property.

        Returns:
            The current time of the scheduler as a datetime instance, as
            produced by default_now().
        """

        return default_now()

    @abstractmethod
    def schedule(
        self, action: abc.ScheduledAction[_TState], state: Optional[_TState] = None
    ) -> abc.DisposableBase:
        """Schedule an action for execution.

        Declared abstract: subclasses are expected to supply the actual
        implementation.

        Args:
            action: The action to execute.
            state: [Optional] state to hand to the action function.

        Returns:
            A disposable that can be used to cancel the scheduled action
            (best effort).
        """

        return NotImplemented

    @abstractmethod
    def schedule_relative(
        self,
        duetime: typing.RelativeTime,
        action: abc.ScheduledAction[_TState],
        state: Optional[_TState] = None,
    ) -> abc.DisposableBase:
        """Schedule an action for execution once duetime has elapsed.

        Declared abstract: subclasses are expected to supply the actual
        implementation.

        Args:
            duetime: Relative time after which the action is executed.
            action: The action to execute.
            state: [Optional] state to hand to the action function.

        Returns:
            A disposable that can be used to cancel the scheduled action
            (best effort).
        """

        return NotImplemented

    @abstractmethod
    def schedule_absolute(
        self,
        duetime: typing.AbsoluteTime,
        action: abc.ScheduledAction[_TState],
        state: Optional[_TState] = None,
    ) -> abc.DisposableBase:
        """Schedule an action for execution at the point in time duetime.

        Declared abstract: subclasses are expected to supply the actual
        implementation.

        Args:
            duetime: Absolute time at which the action is executed.
            action: The action to execute.
            state: [Optional] state to hand to the action function.

        Returns:
            A disposable that can be used to cancel the scheduled action
            (best effort).
        """

        return NotImplemented

    def invoke_action(
        self, action: abc.ScheduledAction[_TState], state: Optional[_TState] = None
    ) -> abc.DisposableBase:
        """Run the given action with this scheduler and the supplied state.

        Typically called by instances of ScheduledItem.

        Args:
            action: The action to execute.
            state: [Optional] state to hand to the action function.

        Returns:
            Whatever the action returned when that is a disposable;
            otherwise a fresh (no-op) Disposable.
        """

        outcome = action(self, state)
        # Actions are not required to return a disposable, so substitute a
        # no-op one to give callers something uniform to dispose of.
        return outcome if isinstance(outcome, abc.DisposableBase) else Disposable()

    @classmethod
    def to_seconds(cls, value: typing.AbsoluteOrRelativeTime) -> float:
        """Express a time value as a number of seconds.

        Accepts absolute (datetime) as well as relative (timedelta) values.
        A datetime is first turned into its offset from UTC_ZERO. Anything
        that is neither a datetime nor a timedelta (e.g. a float) is handed
        back untouched.

        Args:
            value: The time value to express in seconds.

        Returns:
            The value expressed in seconds.
        """

        # Step 1: absolute time -> offset from the UTC_ZERO reference.
        offset = value - UTC_ZERO if isinstance(value, datetime) else value
        # Step 2: offset -> seconds; plain numbers pass straight through.
        return offset.total_seconds() if isinstance(offset, timedelta) else offset

    @classmethod
    def to_datetime(cls, value: typing.AbsoluteOrRelativeTime) -> datetime:
        """Express a time value as a datetime.

        A timedelta is interpreted as an offset from UTC_ZERO. A datetime
        is handed back untouched. Any other value (e.g. a float) is
        interpreted as a timestamp and converted to a datetime in UTC.

        Args:
            value: The time value to express as a datetime.

        Returns:
            The value expressed as a datetime.
        """

        if isinstance(value, timedelta):
            return UTC_ZERO + value
        if isinstance(value, datetime):
            return value
        return datetime.fromtimestamp(value, tz=timezone.utc)

    @classmethod
    def to_timedelta(cls, value: typing.AbsoluteOrRelativeTime) -> timedelta:
        """Express a time value as a timedelta.

        A datetime (absolute time) becomes its offset from UTC_ZERO, i.e.
        the time elapsed since the epoch, January 1st, 1970, 00:00:00. A
        timedelta is handed back untouched. Any other value (e.g. a float)
        is interpreted as a number of seconds.

        Args:
            value: The time value to express as a timedelta.

        Returns:
            The value expressed as a timedelta.
        """

        if isinstance(value, datetime):
            return value - UTC_ZERO
        if isinstance(value, timedelta):
            return value
        return timedelta(seconds=value)
|