1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285
|
from __future__ import with_statement
"""
This module is derived from the sched.py included with Python 2.6. I have
modified it and am re-licensing it under the GPL v2, as is allowed by the PSF
license:
http://svn.python.org/view/python/trunk/LICENSE?revision=68142&view=markup
This software is licensed under the GPL (GNU General Public License) version 2
as it appears here: http://www.gnu.org/copyleft/gpl.html
It is also included with this archive as `gpl.txt <gpl.txt>`_.
--- below from the original module
A generally useful event scheduler class.
Each instance of this class manages its own queue.
No multi-threading is implied; you are supposed to hack that
yourself, or use a single instance per application.
Each instance is parametrized with two functions, one that is
supposed to return the current time, one that is supposed to
implement a delay. You can implement real-time scheduling by
substituting time and sleep from built-in module time, or you can
implement simulated time by writing your own functions. This can
also be used to integrate scheduling with STDWIN events; the delay
function is allowed to modify the queue. Time can be expressed as
integers or floating point numbers, as long as it is consistent.
Events are specified by tuples (time, priority, action, argument).
As in UNIX, lower priority numbers mean higher priority; in this
way the queue can be maintained as a priority queue. Execution of the
event means calling the action function, passing it the argument
sequence in "argument" (remember that in Python, multiple function
arguments are packed in a sequence).
The action function may be an instance method so it
has another way to reference private data (besides global variables).
"""
# XXX The timefunc and delayfunc should have been defined as methods
# XXX so you can define new kinds of schedulers using subclassing
# XXX instead of having to define a module or class just to hold
# XXX the global state of your particular time and delay functions.
import heapq
__all__ = ["scheduler"]
class Event(object):
    """A scheduled entry holding (time, priority, action, argument).

    The four fields are stored in one tuple and exposed as read-only
    attributes named in ``attrs``; iterating an Event yields them in that
    order, so ``time, priority, action, argument = event`` works.

    Events order by their field tuple, with ``id()`` as the final
    tie-breaker, so two distinct events never compare as equal in ordering.
    Equality itself is left as identity.

    An event can be flagged as cancelled through the ``cancelled`` property;
    the flag can never be reset once set.
    """
    attrs = 'time priority action argument'.split()
    __slots__ = '_values', '_cancelled'

    def __init__(self, time, priority, action, argument):
        self._values = (time, priority, action, argument)
        self._cancelled = 0

    def _order(self, other):
        """Three-way comparison: negative, zero or positive.

        Compares the field tuples first. If the interpreter refuses to order
        them (Python 3 raises TypeError when, for example, two different
        callables end up being compared), only time and priority are used.
        Remaining ties are broken by object identity.
        """
        mine, theirs = self._values, other._values
        try:
            if mine < theirs:
                return -1
            if theirs < mine:
                return 1
        except TypeError:
            # action/argument are not orderable; time and priority decide.
            head_mine, head_theirs = mine[:2], theirs[:2]
            if head_mine < head_theirs:
                return -1
            if head_theirs < head_mine:
                return 1
        my_id, their_id = id(self), id(other)
        return (my_id > their_id) - (my_id < their_id)

    def __cmp__(self, other):
        # Only consulted by Python 2; kept so cmp(event_a, event_b) still works.
        return self._order(other)

    # Rich comparisons make events heap-orderable on interpreters that
    # ignore __cmp__.
    def __lt__(self, other):
        return self._order(other) < 0

    def __le__(self, other):
        return self._order(other) <= 0

    def __gt__(self, other):
        return self._order(other) > 0

    def __ge__(self, other):
        return self._order(other) >= 0

    def __iter__(self):
        """Iterate over time, priority, action, argument (in that order)."""
        return iter(self._values)

    def __getattr__(self, key):
        """Resolve the field names listed in ``attrs``.

        Only called when normal lookup fails. Any other name raises
        AttributeError so that getattr() with a default and hasattr() behave
        normally, and so that an unset slot cannot cause endless recursion.
        """
        if key in self.attrs:
            return self._values[self.attrs.index(key)]
        raise AttributeError(
            "%r object has no attribute %r" % (self.__class__.__name__, key))

    def _get_cancelled(self):
        return self._cancelled

    def _set_cancelled(self, value):
        if value:
            self._cancelled = 1
        elif self._cancelled:
            # A false value is a no-op on a live event, an error on a
            # cancelled one.
            raise Exception("You can't un-cancel an event.")

    cancelled = property(_get_cancelled, _set_cancelled)
    # The accessors are reachable only through the property.
    del _get_cancelled, _set_cancelled

    def __repr__(self):
        return '%s(%s, %s, %s, %s)' % (
            self.__class__.__name__,
            self.time, self.priority, self.action, self.argument)
def LocalSynchronize(method):
    """Decorator that runs ``method`` while holding ``self.lock``.

    The decorated callable expects an instance exposing a ``lock``
    attribute. When that attribute is the module-level ``FakeLock``
    singleton the method is called directly; otherwise the call is made
    inside ``with self.lock:``.

    The wrapper keeps the wrapped method's name and docstring, so
    synchronized methods stay documented.
    """
    import functools

    @functools.wraps(method)
    def Call(self, *args, **kwargs):
        # Slight performance improvement over entering FakeLock for every
        # call on an unlocked scheduler.
        if self.lock is FakeLock:
            return method(self, *args, **kwargs)
        with self.lock:
            return method(self, *args, **kwargs)
    return Call
class FakeLock(object):
    """Do-nothing stand-in for a lock, usable in a ``with`` statement."""

    def __enter__(self):
        """Enter the context without acquiring anything."""
        return None

    def __exit__(self, type, value, traceback):
        """Leave the context; a None result lets any exception propagate."""
        return None

    def __call__(self):
        """Calling the object hands back the very same object."""
        return self


# From here on the name refers to one shared instance, not the class.
FakeLock = FakeLock()
class scheduler(object):
    """Event scheduler backed by a heap of Event objects.

    Cancellation is lazy: cancel() only flags the event and counts it in
    ``self.cancelled``. Flagged events are dropped when run() or getqueue()
    reaches them, or in bulk by _clear_cancelled() once _ok_clear() says
    enough of the queue is dead.
    """

    def __init__(self, timefunc, delayfunc, locked=False):
        """Initialize a new instance, passing the time and delay
        functions.

        If the optional locked argument (defaulting to false) is true, this
        scheduler will use a threading.RLock() instance to guarantee internal
        consistency.
        """
        self._queue = []
        self.timefunc = timefunc
        self.delayfunc = delayfunc
        # Number of cancelled events believed to still sit in the queue.
        self.cancelled = 0
        if not locked:
            self.lock = FakeLock
        else:
            import threading
            self.lock = threading.RLock()

    @LocalSynchronize
    def enterabs(self, time, priority, action, argument):
        """Enter a new event in the queue at an absolute time.

        Returns an ID for the event which can be used to remove it,
        if necessary. The ID is the Event object itself.
        """
        event = Event(time, priority, action, argument)
        heapq.heappush(self._queue, event)
        return event  # The ID

    @LocalSynchronize
    def enter(self, delay, priority, action, argument):
        """A variant that specifies the time as a relative time.

        This is actually the more commonly used interface. The absolute
        time is ``timefunc() + delay``.
        """
        time = self.timefunc() + delay
        return self.enterabs(time, priority, action, argument)

    @LocalSynchronize
    def peek(self):
        """Will return the first item in the queue in a properly synchronized
        manner.

        The item is returned even if it has been cancelled.
        An empty queue will raise an IndexError.
        """
        return self._queue[0]

    @LocalSynchronize
    def cancel(self, event):
        """Cancel an event.

        This must be presented the ID as returned by enter() or enterabs().
        The event is only flagged; it is physically removed later.
        Cancelling an event that is already cancelled is a no-op, so the
        cancelled counter is not inflated by repeated calls.
        """
        if event.cancelled:
            return
        event.cancelled = 1
        self.cancelled += 1
        if self._ok_clear():
            self._clear_cancelled()

    @LocalSynchronize
    def empty(self):
        """Check whether the queue is empty.

        Cancelled events that have not been purged yet still count.
        """
        return not self._queue

    def _ok_clear(self):
        """Tell whether cancelled events should be purged now.

        True when the queue holds more than 128 events and more than half
        of them are cancelled, or, regardless of the queue size, when
        cancelled events make up more than 7/8 of the queue.
        """
        lq = len(self._queue)
        return (lq > 128 and self.cancelled > (lq >> 1)) or (self.cancelled << 3 > lq * 7)

    @LocalSynchronize
    def _clear_cancelled(self):
        """Clears all cancelled events and re-heapifies the schedule.

        The list is rewritten in place, so a reference to self._queue taken
        earlier (run() keeps one) still points at the live queue afterwards.
        The cancelled counter is reset to zero.
        """
        self._queue[:] = [i for i in self._queue if not i.cancelled]
        heapq.heapify(self._queue)
        self.cancelled = 0

    def run(self, anow=None):
        """Execute events until the queue is empty.

        When anow is given, only events whose time is <= anow are
        processed; run() returns at the first event scheduled later.

        When there is a positive delay until the first event, the
        delay function is called and the event is left in the queue;
        otherwise, the event is removed from the queue and executed
        (its action function is called, passing it the argument). If
        the delay function returns prematurely, it is simply
        restarted.

        A cancelled event at the head of the queue is discarded
        immediately; run() does not wait for its scheduled time.

        It is legal for both the delay function and the action
        function to modify the queue or to raise an exception;
        exceptions are not caught but the scheduler's state remains
        well-defined so run() may be called again.

        A questionable hack is added to allow other threads to run:
        just after an event is executed, a delay of 0 is executed, to
        avoid monopolizing the CPU when other threads are also
        runnable.
        """
        # localize variable access to minimize overhead
        q = self._queue
        delayfunc = self.delayfunc
        timefunc = self.timefunc
        pop = heapq.heappop
        lock = self.lock

        if self._ok_clear():
            self._clear_cancelled()

        while True:
            # Read the head under the lock so a concurrent purge cannot
            # empty the queue between the emptiness test and the read.
            with lock:
                if not q:
                    break
                checked_event = q[0]
            time, priority, action, argument = checked_event
            if anow is not None and not (time <= anow):
                break

            if not checked_event.cancelled:
                now = timefunc()
                if now < time:
                    # Not due yet. The delay function may change the queue,
                    # so the head is looked up again afterwards.
                    delayfunc(time - now)
                    continue

            with lock:
                # Only pop if the head is still the event examined above;
                # otherwise the queue changed and must be re-evaluated.
                if not q or q[0] is not checked_event:
                    continue
                pop(q)
                if checked_event.cancelled:
                    self.cancelled -= 1
                    continue

            action(*argument)
            delayfunc(0)  # Let other threads run

    @LocalSynchronize
    def getqueue(self, now=None, copy=True, auto_clear=True):
        """An ordered list of upcoming events.

        Events are Event objects with fields for:
            time, priority, action, argument

        now -- when given, only events whose time is <= now are returned.
        copy -- when true (the default) the queue itself is left untouched;
            when false the returned events are popped from the live queue,
            and cancelled events met on the way are dropped and uncounted.
        auto_clear -- when true, cancelled events are purged first if
            _ok_clear() says so.

        Cancelled events never appear in the result.
        """
        if auto_clear and self._ok_clear():
            self._clear_cancelled()
        if copy:
            events = self._queue[:]
        else:
            events = self._queue
        # Use heapq to sort the queue rather than using 'sorted(self._queue)'.
        # With heapq, two events scheduled at the same time will show in
        # the actual order they would be retrieved.
        out = []
        pop = heapq.heappop
        while events and ((now is None) or (events[0].time <= now)):
            event = pop(events)
            if not event.cancelled:
                out.append(event)
            elif not copy:  # the event was cancelled, and this is not a copy
                self.cancelled -= 1
        return out

    @property
    @LocalSynchronize
    def queue(self):
        """Snapshot of all pending, non-cancelled events in execution order."""
        return self.getqueue()
|