File: immediatescheduler.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (96 lines) | stat: -rw-r--r-- 3,121 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
from threading import Lock
from typing import MutableMapping, Optional, TypeVar
from weakref import WeakKeyDictionary

from reactivex import abc, typing
from reactivex.internal.constants import DELTA_ZERO
from reactivex.internal.exceptions import WouldBlockException

from .scheduler import Scheduler

# Type variable for the optional ``state`` value that is passed along with a
# scheduled action (see the ``schedule*`` method signatures below).
_TState = TypeVar("_TState")


class ImmediateScheduler(Scheduler):
    """Scheduler that runs each unit of work immediately, on the current thread.

    Scheduling with a timeout is not permitted, because waiting for the
    timeout would block the current thread. Any attempt to schedule work
    with a positive delay raises a :class:`WouldBlockException`.
    """

    # Serialises access to the per-class instance registry below.
    _lock = Lock()
    # Shared instances, keyed by the class they were created for.
    _global: MutableMapping[type, "ImmediateScheduler"] = WeakKeyDictionary()

    @classmethod
    def singleton(cls) -> "ImmediateScheduler":
        """Return the shared instance for ``cls``, creating it on first use.

        Returns:
            The instance registered for ``cls`` in the class-level registry.
        """

        with ImmediateScheduler._lock:
            instance = ImmediateScheduler._global.get(cls)
            if instance is None:
                # Allocate via the parent ``__new__``; calling ``cls()``
                # here would re-enter ``singleton`` through ``__new__``.
                instance = super().__new__(cls)
                ImmediateScheduler._global[cls] = instance
        return instance

    def __new__(cls) -> "ImmediateScheduler":
        """Hand out the shared per-class instance instead of a new object."""
        return cls.singleton()

    def schedule(
        self, action: typing.ScheduledAction[_TState], state: Optional[_TState] = None
    ) -> abc.DisposableBase:
        """Run ``action`` right away.

        Args:
            action: Action to be executed.
            state: [Optional] state to be given to the action function.

        Returns:
            The disposable object used to cancel the scheduled action
            (best effort).
        """

        return self.invoke_action(action, state)

    def schedule_relative(
        self,
        duetime: typing.RelativeTime,
        action: typing.ScheduledAction[_TState],
        state: Optional[_TState] = None,
    ) -> abc.DisposableBase:
        """Run ``action`` right away, provided ``duetime`` is not in the future.

        Args:
            duetime: Relative time after which to execute the action.
            action: Action to be executed.
            state: [Optional] state to be given to the action function.

        Returns:
            The disposable object used to cancel the scheduled action
            (best effort).

        Raises:
            WouldBlockException: If ``duetime`` converts to a delay greater
                than zero.
        """

        delay = self.to_timedelta(duetime)
        # A positive delay would require waiting on the current thread.
        if delay > DELTA_ZERO:
            raise WouldBlockException()

        return self.invoke_action(action, state)

    def schedule_absolute(
        self,
        duetime: typing.AbsoluteTime,
        action: typing.ScheduledAction[_TState],
        state: Optional[_TState] = None,
    ) -> abc.DisposableBase:
        """Run ``action`` right away, provided ``duetime`` has been reached.

        The absolute time is converted to an offset from ``self.now`` and
        delegated to :meth:`schedule_relative`, so a ``duetime`` that is
        still in the future raises a :class:`WouldBlockException`.

        Args:
            duetime: Absolute time at which to execute the action.
            action: Action to be executed.
            state: [Optional] state to be given to the action function.

        Returns:
            The disposable object used to cancel the scheduled action
            (best effort).
        """

        target = self.to_datetime(duetime)
        remaining = target - self.now
        return self.schedule_relative(remaining, action, state)