1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
|
"""Clocked schedule Implementation."""
from celery import schedules
from celery.utils.time import maybe_make_aware
from .utils import NEVER_CHECK_TIMEOUT
class clocked(schedules.BaseSchedule):
"""clocked schedule.
Depends on PeriodicTask one_off=True
"""
def __init__(self, clocked_time, nowfun=None, app=None):
"""Initialize clocked."""
self.clocked_time = maybe_make_aware(clocked_time)
super().__init__(nowfun=nowfun, app=app)
def remaining_estimate(self, last_run_at):
return self.clocked_time - self.now()
def is_due(self, last_run_at):
rem_delta = self.remaining_estimate(None)
remaining_s = max(rem_delta.total_seconds(), 0)
if remaining_s == 0:
return schedules.schedstate(is_due=True, next=NEVER_CHECK_TIMEOUT)
return schedules.schedstate(is_due=False, next=remaining_s)
def __repr__(self):
return f'<clocked: {self.clocked_time}>'
def __eq__(self, other):
if isinstance(other, clocked):
return self.clocked_time == other.clocked_time
return False
def __ne__(self, other):
return not self.__eq__(other)
def __reduce__(self):
return self.__class__, (self.clocked_time, self.nowfun)
|