1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204
|
"""
Classes for calling functions on a schedule. Has time zone support.
For example, to run a job at 08:00 every morning in 'Asia/Calcutta':
>>> import zoneinfo
>>> job = lambda: print("time is now", datetime.datetime.now())
>>> time = datetime.time(8, tzinfo=zoneinfo.ZoneInfo('Asia/Calcutta'))
>>> cmd = PeriodicCommandFixedDelay.daily_at(time, job)
>>> sched = InvokeScheduler()
>>> sched.add(cmd)
>>> while True: # doctest: +SKIP
... sched.run_pending()
... time.sleep(.1)
By default, the scheduler uses timezone-aware times in UTC. A
client may override the default behavior by overriding ``now``
and ``from_timestamp`` functions.
>>> now()
datetime.datetime(...utc)
>>> from_timestamp(1718723533.7685602)
datetime.datetime(...utc)
"""
import datetime
import numbers
import abc
import bisect
from .utc import now, fromtimestamp as from_timestamp
class DelayedCommand(datetime.datetime):
    """
    A command to be executed after some delay (seconds or timedelta).

    An instance is itself the datetime at which the command comes due.
    The ``after`` and ``at_time`` constructors additionally set two
    attributes on the instance:

    - ``delay``: a timedelta (the requested delay, or the time
      remaining until the due time when built with ``at_time``).
    - ``target``: the payload associated with the command.
    """

    @classmethod
    def from_datetime(cls, other):
        """
        Construct an instance of ``cls`` carrying the same date, time,
        tzinfo and fold as the datetime ``other``.
        """
        return cls(
            other.year,
            other.month,
            other.day,
            other.hour,
            other.minute,
            other.second,
            other.microsecond,
            other.tzinfo,
            # Carry fold over so that an ambiguous wall-clock time (one
            # repeated at a DST transition) keeps denoting the same
            # instant instead of silently becoming the first occurrence.
            fold=other.fold,
        )

    @classmethod
    def after(cls, delay, target):
        """
        Construct a command that comes due ``delay`` from now.

        ``delay`` may be a timedelta; any other value is interpreted
        as a number of seconds.
        """
        if not isinstance(delay, datetime.timedelta):
            delay = datetime.timedelta(seconds=delay)
        due_time = now() + delay
        cmd = cls.from_datetime(due_time)
        cmd.delay = delay
        cmd.target = target
        return cmd

    @staticmethod
    def _from_timestamp(input):
        """
        If input is a real number, interpret it as a Unix timestamp
        (seconds since Epoch in UTC) and return a timezone-aware
        datetime object. Otherwise return input unchanged.
        """
        if not isinstance(input, numbers.Real):
            return input
        return from_timestamp(input)

    @classmethod
    def at_time(cls, at, target):
        """
        Construct a DelayedCommand to come due at `at`, where `at` may be
        a datetime or timestamp.

        The ``delay`` attribute is set to the time remaining between
        now and `at` (negative when `at` is already in the past).
        """
        at = cls._from_timestamp(at)
        cmd = cls.from_datetime(at)
        cmd.delay = at - now()
        cmd.target = target
        return cmd

    def due(self):
        """
        Return True if the current time has reached this command's
        scheduled time.
        """
        return now() >= self
class PeriodicCommand(DelayedCommand):
    """
    A delayed command that recurs: once it has run, a successor is
    expected to run ``delay`` later.
    """

    def _next_time(self):
        """
        Return the moment one ``delay`` after this command's time.
        """
        return self + self.delay

    def next(self):
        """
        Build the command for the following occurrence, carrying over
        this command's ``delay`` and ``target``.
        """
        make = self.__class__.from_datetime
        successor = make(self._next_time())
        successor.delay = self.delay
        successor.target = self.target
        return successor

    def __setattr__(self, key, value):
        """
        Set an attribute, rejecting any ``delay`` that is not strictly
        greater than a zero timedelta.

        Raises ValueError for a zero or negative ``delay``.
        """
        if key == 'delay':
            is_positive = value > datetime.timedelta()
            if not is_positive:
                raise ValueError(
                    "A PeriodicCommand must have a positive, non-zero delay."
                )
        super().__setattr__(key, value)
class PeriodicCommandFixedDelay(PeriodicCommand):
    """
    A periodic command whose delay is supplied explicitly rather than
    derived from the current time; the same fixed delay applies
    following the initial run.
    """

    @classmethod
    def at_time(cls, at, delay, target):
        """
        Construct a command first due at ``at`` (a datetime or a
        timestamp) and carrying ``delay`` (a timedelta or a number of
        seconds).

        >>> cmd = PeriodicCommandFixedDelay.at_time(0, 30, None)
        >>> cmd.delay.total_seconds()
        30.0
        """
        first_due = cls._from_timestamp(at)
        command = cls.from_datetime(first_due)
        # Plain numbers are taken to be seconds.
        interval = (
            datetime.timedelta(seconds=delay)
            if isinstance(delay, numbers.Number)
            else delay
        )
        command.delay = interval
        command.target = target
        return command

    @classmethod
    def daily_at(cls, at, target):
        """
        Schedule a command to run at a specific time each day.

        >>> from tempora import utc
        >>> noon = utc.time(12, 0)
        >>> cmd = PeriodicCommandFixedDelay.daily_at(noon, None)
        >>> cmd.delay.total_seconds()
        86400.0
        """
        one_day = datetime.timedelta(days=1)
        # Begin with yesterday's occurrence of the requested time and
        # step forward a day at a time until it is no longer in the past.
        today = datetime.date.today()
        candidate = datetime.datetime.combine(today, at) - one_day
        while candidate < now():
            candidate = candidate + one_day
        return cls.at_time(candidate, one_day, target)
class Scheduler:
    """
    A rudimentary abstract scheduler accepting DelayedCommands
    and dispatching them on schedule.

    Subclasses supply ``run`` to define what executing a command means.
    NOTE: ``run`` is marked with ``abc.abstractmethod``, but this class
    does not use ``abc.ABCMeta``, so the requirement is not enforced at
    instantiation time.
    """

    def __init__(self):
        # Pending commands, kept sorted so the earliest one is first.
        self.queue = []

    def add(self, command):
        """
        Insert ``command`` (a DelayedCommand) into the queue, keeping
        the queue sorted.
        """
        assert isinstance(command, DelayedCommand)
        bisect.insort(self.queue, command)

    def run_pending(self):
        """
        Run every queued command that is due, earliest first.

        After a PeriodicCommand runs, its next occurrence is queued.
        If ``run`` raises, the exception propagates and the command
        stays in the queue.
        """
        while self.queue:
            command = self.queue[0]
            if not command.due():
                break
            self.run(command)
            if isinstance(command, PeriodicCommand):
                self.add(command.next())
            # Remove the command that was just run by identity rather
            # than by position: running it may have queued a command
            # that sorts earlier, in which case index 0 is no longer
            # the command that was executed.
            for index, queued in enumerate(self.queue):
                if queued is command:
                    del self.queue[index]
                    break

    @abc.abstractmethod
    def run(self, command):
        """
        Run the command
        """
class InvokeScheduler(Scheduler):
    """
    A scheduler whose command targets are callables, invoked with no
    arguments when the command is run.
    """

    def run(self, command):
        """Call the command's target."""
        callback = command.target
        callback()
class CallbackScheduler(Scheduler):
    """
    A scheduler that hands each command's target to a single dispatch
    callable when the command is run.
    """

    def __init__(self, dispatch):
        """Remember ``dispatch``, the callable that receives targets."""
        super().__init__()
        self.dispatch = dispatch

    def run(self, command):
        """Pass the command's target to the dispatch callable."""
        payload = command.target
        self.dispatch(payload)
|