File: wxscheduler.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (177 lines) | stat: -rw-r--r-- 5,522 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import logging
from typing import Any, Optional, Set, TypeVar, cast

from reactivex import abc, typing
from reactivex.disposable import (
    CompositeDisposable,
    Disposable,
    SingleAssignmentDisposable,
)

from ..periodicscheduler import PeriodicScheduler

_TState = TypeVar("_TState")

log = logging.getLogger("Rx")


class WxScheduler(PeriodicScheduler):
    """A scheduler for a wxPython event loop."""

    def __init__(self, wx: Any) -> None:
        """Create a new WxScheduler.

        Args:
            wx: The wx module to use; typically, you would get this by
                import wx
        """

        super().__init__()
        self._wx = wx
        timer_class: Any = self._wx.Timer

        class Timer(timer_class):
            def __init__(self, callback: typing.Action) -> None:
                super().__init__()  # type: ignore
                self.callback = callback

            def Notify(self) -> None:
                self.callback()

        self._timer_class = Timer
        self._timers: Set[Timer] = set()

    def cancel_all(self) -> None:
        """Cancel all scheduled actions.

        Should be called when destroying wx controls to prevent
        accessing dead wx objects in actions that might be pending.
        """
        for timer in self._timers:
            timer.Stop()  # type: ignore

    def _wxtimer_schedule(
        self,
        time: typing.AbsoluteOrRelativeTime,
        action: typing.ScheduledSingleOrPeriodicAction[_TState],
        state: Optional[_TState] = None,
        periodic: bool = False,
    ) -> abc.DisposableBase:
        scheduler = self

        sad = SingleAssignmentDisposable()

        def interval() -> None:
            nonlocal state
            if periodic:
                state = cast(typing.ScheduledPeriodicAction[_TState], action)(state)
            else:
                sad.disposable = cast(typing.ScheduledAction[_TState], action)(
                    scheduler, state
                )

        msecs = max(1, int(self.to_seconds(time) * 1000.0))  # Must be non-zero

        log.debug("timeout wx: %s", msecs)

        timer = self._timer_class(interval)
        # A timer can only be used from the main thread
        if self._wx.IsMainThread():
            timer.Start(msecs, oneShot=not periodic)  # type: ignore
        else:
            self._wx.CallAfter(timer.Start, msecs, oneShot=not periodic)  # type: ignore
        self._timers.add(timer)

        def dispose() -> None:
            timer.Stop()  # type: ignore
            self._timers.remove(timer)

        return CompositeDisposable(sad, Disposable(dispose))

    def schedule(
        self, action: typing.ScheduledAction[_TState], state: Optional[_TState] = None
    ) -> abc.DisposableBase:
        """Schedules an action to be executed.

        Args:
            action: Action to be executed.
            state: [Optional] state to be given to the action function.

        Returns:
            The disposable object used to cancel the scheduled action
            (best effort).
        """
        sad = SingleAssignmentDisposable()
        is_disposed = False

        def invoke_action() -> None:
            if not is_disposed:
                sad.disposable = action(self, state)

        self._wx.CallAfter(invoke_action)

        def dispose() -> None:
            nonlocal is_disposed
            is_disposed = True

        return CompositeDisposable(sad, Disposable(dispose))

    def schedule_relative(
        self,
        duetime: typing.RelativeTime,
        action: typing.ScheduledAction[_TState],
        state: Optional[_TState] = None,
    ) -> abc.DisposableBase:
        """Schedules an action to be executed after duetime.

        Args:
            duetime: Relative time after which to execute the action.
            action: Action to be executed.
            state: [Optional] state to be given to the action function.

        Returns:
            The disposable object used to cancel the scheduled action
            (best effort).
        """
        return self._wxtimer_schedule(duetime, action, state=state)

    def schedule_absolute(
        self,
        duetime: typing.AbsoluteTime,
        action: typing.ScheduledAction[_TState],
        state: Optional[_TState] = None,
    ) -> abc.DisposableBase:
        """Schedules an action to be executed at duetime.

        Args:
            duetime: Absolute time at which to execute the action.
            action: Action to be executed.
            state: [Optional] state to be given to the action function.

        Returns:
            The disposable object used to cancel the scheduled action
            (best effort).
        """

        duetime = self.to_datetime(duetime)
        return self._wxtimer_schedule(duetime - self.now, action, state=state)

    def schedule_periodic(
        self,
        period: typing.RelativeTime,
        action: typing.ScheduledPeriodicAction[_TState],
        state: Optional[_TState] = None,
    ) -> abc.DisposableBase:
        """Schedules a periodic piece of work to be executed in the loop.

        Args:
             period: Period in seconds for running the work repeatedly.
             action: Action to be executed.
             state: [Optional] state to be given to the action function.

         Returns:
             The disposable object used to cancel the scheduled action
             (best effort).
        """

        return self._wxtimer_schedule(period, action, state=state, periodic=True)