1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
|
"""Synchronization primitives."""
from .abstract import Thenable
from .promises import promise
__all__ = ['barrier']
class barrier:
"""Barrier.
Synchronization primitive to call a callback after a list
of promises have been fulfilled.
Example:
.. code-block:: python
# Request supports the .then() method.
p1 = http.Request('http://a')
p2 = http.Request('http://b')
p3 = http.Request('http://c')
requests = [p1, p2, p3]
def all_done():
pass # all requests complete
b = barrier(requests).then(all_done)
# oops, we forgot we want another request
b.add(http.Request('http://d'))
Note that you cannot add new promises to a barrier after
the barrier is fulfilled.
"""
def __init__(self, promises=None, args=None, kwargs=None,
callback=None, size=None):
self.p = promise()
self.args = args or ()
self.kwargs = kwargs or {}
self._value = 0
self.size = size or 0
if not self.size and promises:
# iter(l) calls len(l) so generator wrappers
# can only return NotImplemented in the case the
# generator is not fully consumed yet.
plen = promises.__len__()
if plen is not NotImplemented:
self.size = plen
self.ready = self.failed = False
self.reason = None
self.cancelled = False
self.finalized = False
[self.add_noincr(p) for p in promises or []]
self.finalized = bool(promises or self.size)
if callback:
self.then(callback)
def __call__(self, *args, **kwargs):
if not self.ready and not self.cancelled:
self._value += 1
if self.finalized and self._value >= self.size:
self.ready = True
self.p(*self.args, **self.kwargs)
def finalize(self):
if not self.finalized and self._value >= self.size:
self.p(*self.args, **self.kwargs)
self.finalized = True
def cancel(self):
self.cancelled = True
self.p.cancel()
def add_noincr(self, p):
if not self.cancelled:
if self.ready:
raise ValueError('Cannot add promise to full barrier')
p.then(self)
def add(self, p):
if not self.cancelled:
self.add_noincr(p)
self.size += 1
def then(self, callback, errback=None):
self.p.then(callback, errback)
def throw(self, *args, **kwargs):
if not self.cancelled:
self.p.throw(*args, **kwargs)
throw1 = throw
Thenable.register(barrier) # noqa: E305
|