1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
|
import sys
from gevent.hub import get_hub, getcurrent
from gevent.timeout import Timeout
# Public API of this module: only the Semaphore class is exported.
__all__ = ['Semaphore']
class Semaphore(object):
    """Counting semaphore for cooperatively scheduled greenlets.

    The counter equals the initial *value*, plus the number of
    :meth:`release` calls, minus the number of successful :meth:`acquire`
    calls. :meth:`acquire` blocks, when necessary, until it can return
    without driving the counter below zero.

    If not given, *value* defaults to 1.

    This Semaphore's __exit__ method does not call the trace function.
    """

    def __init__(self, value=1):
        """Create a semaphore whose counter starts at *value*.

        Raises :exc:`ValueError` when *value* is negative.
        """
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        # The hub is intentionally not touched here (no get_hub() call):
        # this lets locks be created at module level without initializing
        # the hub as a side effect.
        self._notifier = None
        self.counter = value
        self._links = []

    def __str__(self):
        """Return a short description: class name, counter, number of links."""
        return '<%s counter=%s _links[%s]>' % (
            self.__class__.__name__, self.counter, len(self._links))

    def locked(self):
        """Return True when the counter is zero or below."""
        return self.counter <= 0

    def release(self):
        """Increment the counter and schedule notification of the links."""
        self.counter += 1
        self._start_notify()

    def _start_notify(self):
        """Schedule :meth:`_notify_links` on the hub's loop when warranted.

        Nothing is scheduled when there are no links, when the counter is
        not positive, or while the stored notifier is still truthy.
        """
        if not self._links:
            return
        if self.counter > 0 and not self._notifier:
            self._notifier = get_hub().loop.run_callback(self._notify_links)

    def _notify_links(self):
        """Invoke the registered links while the counter stays positive.

        rawlink()/unlink() set ``_dirty`` whenever the link list changes.
        If that happens during a pass, the pass is abandoned and restarted
        from the beginning of the (now modified) list.
        """
        while True:
            self._dirty = False
            for callback in self._links:
                if self.counter <= 0:
                    # Nothing left to hand out; stop notifying.
                    return
                try:
                    callback(self)
                except:
                    # Report any failure of a link through handle_error of
                    # the current greenlet and keep going.
                    getcurrent().handle_error((callback, self), *sys.exc_info())
                if self._dirty:
                    # The link list changed under us: restart the pass.
                    break
            else:
                # Full pass finished with the list untouched: done.
                return

    def rawlink(self, callback):
        """Register a callback to call when a counter is more than zero.

        *callback* will be called in the :class:`Hub <gevent.hub.Hub>`, so it must not use blocking gevent API.
        *callback* will be passed one argument: this instance.

        Raises :exc:`TypeError` when *callback* is not callable.
        """
        if callable(callback):
            self._links.append(callback)
            self._dirty = True
        else:
            raise TypeError('Expected callable: %r' % (callback, ))

    def unlink(self, callback):
        """Remove the callback set by :meth:`rawlink`.

        A callback that is not currently registered is silently ignored.
        """
        try:
            self._links.remove(callback)
        except ValueError:
            return
        self._dirty = True

    def wait(self, timeout=None):
        """Wait until the counter is positive, without decrementing it.

        Returns the counter immediately when it is already positive.
        Otherwise the current greenlet's ``switch`` is registered as a link
        and control is handed to the hub. *timeout* is passed to
        ``Timeout.start_new``; if that very timer fires while waiting, it is
        swallowed and the current counter is returned. Any other
        :class:`Timeout` propagates to the caller.
        """
        if self.counter > 0:
            return self.counter

        resume = getcurrent().switch
        self.rawlink(resume)
        try:
            timer = Timeout.start_new(timeout)
            try:
                result = get_hub().switch()
                assert result is self, (
                    'Invalid switch into Semaphore.wait(): %r' % (result, ))
            except Timeout:
                # Only our own timer is treated as "wait timed out".
                if sys.exc_info()[1] is not timer:
                    raise
            finally:
                timer.cancel()
        finally:
            # Always deregister, whether we were woken, timed out or failed.
            self.unlink(resume)
        return self.counter

    def acquire(self, blocking=True, timeout=None):
        """Decrement the counter, waiting for it to become positive if needed.

        Returns True once the counter has been decremented. Returns False
        when the counter is not positive and *blocking* is false, or when
        the timer started from *timeout* (via ``Timeout.start_new``) fires
        while waiting. A :class:`Timeout` other than that timer propagates
        to the caller.
        """
        if self.counter > 0:
            self.counter -= 1
            return True
        if not blocking:
            return False

        resume = getcurrent().switch
        self.rawlink(resume)
        try:
            timer = Timeout.start_new(timeout)
            try:
                result = get_hub().switch()
                assert result is self, (
                    'Invalid switch into Semaphore.acquire(): %r' % (result, ))
            except Timeout:
                # Our own timer means the acquire attempt timed out.
                if sys.exc_info()[1] is timer:
                    return False
                raise
            finally:
                timer.cancel()
        finally:
            # Always deregister, whether we were woken, timed out or failed.
            self.unlink(resume)

        # We were switched into by _notify_links, which only calls links
        # while the counter is positive.
        self.counter -= 1
        assert self.counter >= 0
        return True

    def __enter__(self):
        """Acquire the semaphore on entering a ``with`` block."""
        self.acquire()

    def __exit__(self, *args):
        """Release the semaphore on leaving a ``with`` block."""
        self.release()
|