1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
|
from functools import partial
try:
from collections.abc import Iterable
except ImportError:
from collections import Iterable
if False:
from .promise import Promise
from typing import (
Any,
Optional,
Tuple,
Union,
List,
Type,
Collection,
) # flake8: noqa
class PromiseList(object):
    """Aggregate a collection of values and thenables into one promise.

    The aggregate promise is exposed as ``self.promise``. It is fulfilled
    with a list holding one result per input entry (in input order) once
    every entry has been fulfilled, and rejected with the reason of the
    first entry that is seen to reject. Later settlements are ignored.

    ``_values`` doubles as the "settled" marker: it is ``None`` both before
    the input has been unpacked and after the aggregate has been settled.
    """

    __slots__ = ("_values", "_length", "_total_resolved", "promise", "_promise_class")

    def __init__(self, values, promise_class):
        # type: (Union[Collection, Promise[Collection]], Type[Promise]) -> None
        """Create the aggregate promise and start consuming ``values``.

        :param values: an iterable of values/thenables, or a thenable that
            yields such an iterable.
        :param promise_class: class used to build ``self.promise`` and to
            recognise/convert thenables (``is_thenable`` and
            ``_try_convert_to_promise`` are called on it).
        """
        self._promise_class = promise_class
        self.promise = self._promise_class()
        self._length = 0
        self._total_resolved = 0
        self._values = None  # type: Optional[Collection]
        Promise = self._promise_class
        if Promise.is_thenable(values):
            values_as_promise = Promise._try_convert_to_promise(
                values
            )._target()  # type: ignore
            self._init_promise(values_as_promise)
        else:
            self._init(values)  # type: ignore

    def __len__(self):
        # type: () -> int
        """Return the number of entries being awaited (0 until iteration starts)."""
        return self._length

    def _init_promise(self, values):
        # type: (Promise[Collection]) -> None
        """Unpack a promise that wraps the collection of entries.

        An already-settled promise is handled immediately; a pending one
        defers to ``_init`` / ``_reject`` through ``_then``.
        """
        if values.is_fulfilled:
            # Hand the unwrapped collection straight to ``_init``; it is a
            # plain value and has no ``_then`` to subscribe to.
            self._init(values._value())
            return
        if values.is_rejected:
            self._reject(values._reason())
            return
        # Still pending. The flag below is consumed by the promise class;
        # its exact semantics are defined there, not in this class.
        self.promise._is_async_guaranteed = True
        values._then(self._init, self._reject)

    def _init(self, values):
        # type: (Collection) -> None
        """Validate the unwrapped input and start iterating over it.

        Non-iterables reject the aggregate promise, empty input fulfils it
        with ``[]``, anything else is passed to ``_iterate``.
        """
        if not isinstance(values, Iterable):
            err = Exception(
                "PromiseList requires an iterable. Received {}.".format(repr(values))
            )
            # Go through ``_reject`` so ``is_resolved`` reflects the rejection.
            self._reject(err)
            return
        try:
            len(values)
        except TypeError:
            # Iterators/generators have no length and are always truthy;
            # materialise them so the emptiness check and ``_iterate`` work.
            values = list(values)
        self._values = values
        if not values:
            self._resolve([])
            return
        self._iterate(values)

    def _iterate(self, values):
        # type: (Collection[Any]) -> None
        """Walk the entries, recording settled ones and subscribing to pending ones.

        Stops early as soon as the aggregate has been settled (for example
        by an already-rejected entry).
        """
        Promise = self._promise_class
        is_resolved = False
        self._length = len(values)
        # One slot per entry; slots are overwritten with results as they arrive.
        self._values = [None] * self._length
        result = self.promise
        for i, val in enumerate(values):
            if Promise.is_thenable(val):
                maybe_promise = Promise._try_convert_to_promise(val)._target()
                if maybe_promise.is_pending:
                    maybe_promise._add_callbacks(
                        partial(self._promise_fulfilled, i=i),
                        self._promise_rejected,
                        None,
                    )
                    # Placeholder until the callback stores the real result.
                    self._values[i] = maybe_promise
                elif maybe_promise.is_fulfilled:
                    is_resolved = self._promise_fulfilled(maybe_promise._value(), i)
                elif maybe_promise.is_rejected:
                    is_resolved = self._promise_rejected(maybe_promise._reason())
            else:
                is_resolved = self._promise_fulfilled(val, i)
            if is_resolved:
                break
        if not is_resolved:
            # At least one entry is still pending; see the note in
            # ``_init_promise`` about this flag.
            result._is_async_guaranteed = True

    def _promise_fulfilled(self, value, i):
        # type: (Any, int) -> bool
        """Store ``value`` at index ``i``.

        :return: ``True`` if this call fulfilled the aggregate promise,
            ``False`` otherwise (including when it was already settled).
        """
        if self.is_resolved:
            return False
        self._values[i] = value  # type: ignore
        self._total_resolved += 1
        if self._total_resolved >= self._length:
            self._resolve(self._values)  # type: ignore
            return True
        return False

    def _promise_rejected(self, reason):
        # type: (Exception) -> bool
        """Reject the aggregate promise with ``reason`` unless already settled.

        :return: ``True`` if this call rejected the aggregate promise.
        """
        if self.is_resolved:
            return False
        self._total_resolved += 1
        self._reject(reason)
        return True

    @property
    def is_resolved(self):
        # type: () -> bool
        """``True`` when ``_values`` is ``None`` (not yet unpacked, or settled)."""
        return self._values is None

    def _resolve(self, value):
        # type: (Collection[Any]) -> None
        """Fulfil the aggregate promise with ``value`` and drop the stored entries."""
        assert not self.is_resolved
        assert not isinstance(value, self._promise_class)
        self._values = None
        self.promise._fulfill(value)

    def _reject(self, reason):
        # type: (Exception) -> None
        """Reject the aggregate promise with ``reason`` and drop the stored entries.

        No "not yet resolved" assertion here: this is also reached before
        ``_values`` has been populated (when the promise wrapping the input
        rejects), and ``is_resolved`` is already ``True`` in that state.
        """
        self._values = None
        self.promise._reject_callback(reason, False)
|