1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
|
from datetime import datetime
from typing import Any, Optional, TypeVar
from reactivex import abc
from reactivex.disposable import SingleAssignmentDisposable
from .scheduler import Scheduler
_TState = TypeVar("_TState")
class ScheduledItem(object):
def __init__(
self,
scheduler: Scheduler,
state: Optional[_TState],
action: abc.ScheduledAction[_TState],
duetime: datetime,
) -> None:
self.scheduler: Scheduler = scheduler
self.state: Optional[Any] = state
self.action: abc.ScheduledAction[_TState] = action
self.duetime: datetime = duetime
self.disposable: SingleAssignmentDisposable = SingleAssignmentDisposable()
def invoke(self) -> None:
ret = self.scheduler.invoke_action(self.action, state=self.state)
self.disposable.disposable = ret
def cancel(self) -> None:
"""Cancels the work item by disposing the resource returned by
invoke_core as soon as possible."""
self.disposable.dispose()
def is_cancelled(self) -> bool:
return self.disposable.is_disposed
def __lt__(self, other: "ScheduledItem") -> bool:
return self.duetime < other.duetime
def __gt__(self, other: "ScheduledItem") -> bool:
return self.duetime > other.duetime
def __eq__(self, other: Any) -> bool:
try:
return self.duetime == other.duetime
except AttributeError:
return NotImplemented
|