File: scheduleditem.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (49 lines) | stat: -rw-r--r-- 1,484 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from datetime import datetime
from typing import Any, Optional, TypeVar

from reactivex import abc
from reactivex.disposable import SingleAssignmentDisposable

from .scheduler import Scheduler

_TState = TypeVar("_TState")


class ScheduledItem(object):
    def __init__(
        self,
        scheduler: Scheduler,
        state: Optional[_TState],
        action: abc.ScheduledAction[_TState],
        duetime: datetime,
    ) -> None:
        self.scheduler: Scheduler = scheduler
        self.state: Optional[Any] = state
        self.action: abc.ScheduledAction[_TState] = action
        self.duetime: datetime = duetime
        self.disposable: SingleAssignmentDisposable = SingleAssignmentDisposable()

    def invoke(self) -> None:
        ret = self.scheduler.invoke_action(self.action, state=self.state)
        self.disposable.disposable = ret

    def cancel(self) -> None:
        """Cancels the work item by disposing the resource returned by
        invoke_core as soon as possible."""

        self.disposable.dispose()

    def is_cancelled(self) -> bool:
        return self.disposable.is_disposed

    def __lt__(self, other: "ScheduledItem") -> bool:
        return self.duetime < other.duetime

    def __gt__(self, other: "ScheduledItem") -> bool:
        return self.duetime > other.duetime

    def __eq__(self, other: Any) -> bool:
        try:
            return self.duetime == other.duetime
        except AttributeError:
            return NotImplemented