File: pygamescheduler.py

package info (click to toggle)
python-rx 4.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,204 kB
  • sloc: python: 39,525; javascript: 77; makefile: 24
file content (113 lines) | stat: -rw-r--r-- 3,514 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import logging
import threading
from typing import Any, Optional, TypeVar

from reactivex import abc, typing
from reactivex.internal import PriorityQueue
from reactivex.internal.constants import DELTA_ZERO

from ..periodicscheduler import PeriodicScheduler
from ..scheduleditem import ScheduledItem

# Type variable for the optional state value that is passed through the
# schedule*() methods to the scheduled action.
_TState = TypeVar("_TState")

# Module logger; obtained under the fixed logger name "Rx".
log = logging.getLogger("Rx")


class PyGameScheduler(PeriodicScheduler):
    """A scheduler that schedules work for PyGame.

    Scheduled actions are only stored in an internal priority queue; nothing
    is executed until the caller invokes run(). This class therefore expects
    the caller to invoke run() repeatedly (e.g. once per iteration of the
    game loop).

    http://www.pygame.org/docs/ref/time.html
    http://www.pygame.org/docs/ref/event.html"""

    def __init__(self, pygame: Any):
        """Create a new PyGameScheduler.

        Args:
            pygame: The PyGame module to use; typically, you would get this by
                import pygame
        """

        super().__init__()
        self._pygame = pygame  # TODO not used, refactor to actually use pygame?
        # Guards every access to the queue. threading.Lock is not reentrant,
        # so it must never be held while a scheduled action is running.
        self._lock = threading.Lock()
        self._queue: PriorityQueue[ScheduledItem] = PriorityQueue()

    def schedule(
        self, action: typing.ScheduledAction[_TState], state: Optional[_TState] = None
    ) -> abc.DisposableBase:
        """Schedules an action to be executed.

        The action is queued with the current time as its due time; it is
        executed by a subsequent call to run().

        Args:
            action: Action to be executed.
            state: [Optional] state to be given to the action function.

        Returns:
            The disposable object used to cancel the scheduled action
            (best effort).
        """

        log.debug("PyGameScheduler.schedule(state=%s)", state)
        return self.schedule_absolute(self.now, action, state=state)

    def schedule_relative(
        self,
        duetime: typing.RelativeTime,
        action: typing.ScheduledAction[_TState],
        state: Optional[_TState] = None,
    ) -> abc.DisposableBase:
        """Schedules an action to be executed after duetime.

        Args:
            duetime: Relative time after which to execute the action.
                Values below DELTA_ZERO are clamped to DELTA_ZERO.
            action: Action to be executed.
            state: [Optional] state to be given to the action function.

        Returns:
            The disposable object used to cancel the scheduled action
            (best effort).
        """

        duetime = max(DELTA_ZERO, self.to_timedelta(duetime))
        return self.schedule_absolute(self.now + duetime, action, state=state)

    def schedule_absolute(
        self,
        duetime: typing.AbsoluteTime,
        action: typing.ScheduledAction[_TState],
        state: Optional[_TState] = None,
    ) -> abc.DisposableBase:
        """Schedules an action to be executed at duetime.

        The action is only enqueued here; it is executed by a subsequent
        call to run() once its due time has been reached.

        Args:
            duetime: Absolute time at which to execute the action.
            action: Action to be executed.
            state: [Optional] state to be given to the action function.

        Returns:
            The disposable object used to cancel the scheduled action
            (best effort).
        """

        duetime = self.to_datetime(duetime)
        si: ScheduledItem = ScheduledItem(self, state, action, duetime)

        with self._lock:
            self._queue.enqueue(si)

        return si.disposable

    def run(self) -> None:
        """Execute all queued actions that are currently due, then return.

        Items are taken from the head of the queue one at a time for as long
        as the head item's due time is not later than the current time. Items
        that report being cancelled are dequeued but not invoked. The method
        returns as soon as the queue is empty or the head item is not yet
        due; it never waits.
        """

        while True:
            with self._lock:
                # The emptiness check must happen under the lock: checking it
                # outside would allow another thread to drain the queue
                # between the check and the peek() below.
                if not self._queue:
                    break

                item: ScheduledItem = self._queue.peek()
                diff = item.duetime - self.now

                if diff > DELTA_ZERO:
                    # Head item is not due yet; stop for this call.
                    break

                item = self._queue.dequeue()

            # Invoke outside the lock so that the action may itself call
            # schedule*() on this scheduler, which acquires the same
            # (non-reentrant) lock.
            if not item.is_cancelled():
                item.invoke()