File: scheduler.py

package info (click to toggle)
python-moderngl-window 2.4.6-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 69,220 kB
  • sloc: python: 11,387; makefile: 21
file content (142 lines) | stat: -rw-r--r-- 5,239 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import sched
import time
from moderngl_window.timers.base import BaseTimer


class Scheduler:
    def __init__(self, timer: BaseTimer):
        """Create a Scheduler object to handle events.

        Args:
            timer (BaseTimer): timer to use, subclass of BaseTimer.

        Raises:
            ValueError: timer is not a valid argument.
        """
        if not isinstance(timer, BaseTimer):
            raise ValueError(
                "timer, {}, has to be a instance of BaseTimer or a callable!".format(
                    timer
                )
            )

        self._events = dict()
        self._event_id = 0

        self._scheduler = sched.scheduler(lambda: timer.time, time.sleep)

    def run_once(
        self, action, delay: float, *, priority: int = 1, arguments=(), kwargs=dict()
    ) -> int:
        """Schedule a function for execution after a delay.

        Args:
            action (callable): function to be called.
            delay (float): delay in seconds.
            priority (int, optional): priority for this event, lower is more important. Defaults to 1.
            arguments (tuple, optional): arguments for the action. Defaults to ().
            kwargs (dict, optional): keyword arguments for the action. Defaults to dict().

        Returns:
            int: event id that can be canceled.
        """
        event = self._scheduler.enter(delay, priority, action, arguments, kwargs)
        self._events[self._event_id] = event
        self._event_id += 1
        return self._event_id - 1

    def run_at(
        self, action, time: float, *, priority: int = 1, arguments=(), kwargs=dict()
    ) -> int:
        """Schedule a function to be executed at a certain time.

        Args:
            action (callable): function to be called.
            time (float): epoch time at which the function should be called.
            priority (int, optional): priority for this event, lower is more important. Defaults to 1.
            arguments (tuple, optional): arguments for the action. Defaults to ().
            kwargs (dict, optional): keyword arguments for the action. Defaults to dict().

        Returns:
            int: event id that can be canceled.
        """
        event = self._scheduler.enterabs(time, priority, action, arguments, kwargs)
        self._events[self._event_id] = event
        self._event_id += 1
        return self._event_id - 1

    def run_every(
        self,
        action,
        delay: float,
        *,
        priority: int = 1,
        initial_delay: float = 0.0,
        arguments=(),
        kwargs=dict()
    ) -> int:
        """Schedule a recurring function to be called every `delay` seconds after a initial delay.

        Args:
            action (callable): function to be called.
            delay (float): delay in seconds.
            priority (int, optional): priority for this event, lower is more important. Defaults to 1.
            initial_delay (float, optional): initial delay in seconds before executing for the first time.
            Defaults to 0. arguments (tuple, optional): arguments for the action. Defaults to ().
            kwargs (dict, optional): keyword arguments for the action. Defaults to dict().

        Returns:
            int: event id that can be canceled.
        """
        recurring_event = self._recurring_event_factory(
            action, arguments, kwargs, (delay, priority), self._event_id
        )
        event = self._scheduler.enter(initial_delay, priority, recurring_event)
        self._events[self._event_id] = event
        self._event_id += 1
        return self._event_id - 1

    def _recurring_event_factory(
        self, function, arguments, kwargs, scheduling_info, id
    ):
        """Factory for creating recurring events that will reschedule themselves.

        Args:
            function (callable): function to be called.
            arguments (tuple): arguments for the function.
            kwargs (dict): keyword arguments for the function.
            scheduling_info (tuple): tuple of information for scheduling the task.
            id (int): event id this event should be assigned to.
        """

        def _f():
            function(*arguments, **kwargs)
            event = self._scheduler.enter(*scheduling_info, _f)
            self._events[id] = event

        return _f

    def execute(self) -> None:
        """Run the scheduler without blocking and execute any expired events.
        """
        self._scheduler.run(blocking=False)

    def cancel(self, event_id: int, delay: float = 0) -> None:
        """Cancel a previously scheduled event.

        Args:
            event_id (int): event to be canceled
            delay (float, optional): delay before canceling the event. Defaults to 0.
        """
        if delay == 0:
            self._cancel(event_id)
        else:
            self.run_once(self._cancel, delay, priority=0, arguments=(event_id,))

    def _cancel(self, event_id: int):
        if event_id not in self._events:
            raise ValueError(
                "Recurring event with id {} does not exist".format(event_id)
            )
        event = self._events.pop(event_id)
        self._scheduler.cancel(event)