File: promise_list.py

package info (click to toggle)
python-promise 2.3.0-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 400 kB
  • sloc: python: 2,681; sh: 13; makefile: 4
file content (151 lines) | stat: -rw-r--r-- 4,646 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
from functools import partial
try:
    from collections.abc import Iterable
except ImportError:
    from collections import Iterable

if False:
    from .promise import Promise
    from typing import (
        Any,
        Optional,
        Tuple,
        Union,
        List,
        Type,
        Collection,
    )  # flake8: noqa


class PromiseList(object):
    """Combine a collection of values and thenables into one promise.

    ``self.promise`` (an instance of ``promise_class``) is fulfilled with a
    list holding one entry per input item, in input order, once every item
    has been fulfilled.  It is rejected with the reason of the first item
    that is seen rejected.

    ``_values`` doubles as the state flag: it is ``None`` whenever the list
    is not waiting on anything (see ``is_resolved``), and holds either the
    source promise or the partially filled result list while work is
    outstanding.
    """

    __slots__ = ("_values", "_length", "_total_resolved", "promise", "_promise_class")

    def __init__(self, values, promise_class):
        # type: (Union[Collection, Promise[Collection]], Type[Promise]) -> None
        """Start aggregating ``values``.

        ``values`` is either an iterable of items, or a thenable that yields
        such an iterable.  ``promise_class`` is used both to create the
        aggregate promise and to detect/convert thenables.
        """
        self._promise_class = promise_class
        self.promise = self._promise_class()

        self._length = 0
        self._total_resolved = 0
        self._values = None  # type: Optional[Collection]
        Promise = self._promise_class
        if Promise.is_thenable(values):
            values_as_promise = Promise._try_convert_to_promise(
                values
            )._target()  # type: ignore
            self._init_promise(values_as_promise)
        else:
            self._init(values)  # type: ignore

    def __len__(self):
        # type: () -> int
        """Return the number of items being aggregated (0 before iteration)."""
        return self._length

    def _init_promise(self, values):
        # type: (Promise[Collection]) -> None
        """Initialise from a promise that yields the collection of items."""
        # Keep the source promise in ``_values`` so the list counts as
        # unresolved while we depend on it; ``_reject`` asserts on that.
        self._values = values  # type: ignore

        if values.is_fulfilled:
            # The collection is already available: process it directly
            # instead of treating the plain collection as a promise.
            self._init(values._value())
            return

        if values.is_rejected:
            self._reject(values._reason())
            return

        # Still pending: continue once the source promise settles.
        self.promise._is_async_guaranteed = True
        values._then(self._init, self._reject)

    def _init(self, values):
        # type: (Collection) -> None
        """Validate the concrete collection and start iterating over it."""
        self._values = values
        if not isinstance(values, Iterable):
            err = Exception(
                "PromiseList requires an iterable. Received {}.".format(repr(values))
            )
            # Mark the list as settled before rejecting.  ``_reject`` is not
            # used here because ``values`` may itself be None, which would
            # trip its ``not is_resolved`` assertion.
            self._values = None
            self.promise._reject_callback(err, False)
            return

        if not values:
            # Nothing to wait for.
            self._resolve([])
            return

        self._iterate(values)

    def _iterate(self, values):
        # type: (Collection[Any]) -> None
        """Record every item's value, subscribing to those still pending."""
        Promise = self._promise_class
        is_resolved = False

        self._length = len(values)
        # Result slots; a pending promise occupies its slot until it settles.
        self._values = [None] * self._length

        for i, val in enumerate(values):
            if Promise.is_thenable(val):
                maybe_promise = Promise._try_convert_to_promise(val)._target()
                if maybe_promise.is_pending:
                    maybe_promise._add_callbacks(
                        partial(self._promise_fulfilled, i=i),
                        self._promise_rejected,
                        None,
                    )
                    self._values[i] = maybe_promise
                elif maybe_promise.is_fulfilled:
                    is_resolved = self._promise_fulfilled(maybe_promise._value(), i)
                elif maybe_promise.is_rejected:
                    is_resolved = self._promise_rejected(maybe_promise._reason())
            else:
                # Plain value: counts as immediately fulfilled.
                is_resolved = self._promise_fulfilled(val, i)

            if is_resolved:
                # The aggregate promise is settled; remaining items are moot.
                break

        if not is_resolved:
            # At least one item is still pending.
            self.promise._is_async_guaranteed = True

    def _promise_fulfilled(self, value, i):
        # type: (Any, int) -> bool
        """Store ``value`` in slot ``i``.

        Returns True when this call fulfilled the aggregate promise, False
        otherwise (including when the list was already settled).
        """
        if self.is_resolved:
            return False
        self._values[i] = value  # type: ignore
        self._total_resolved += 1
        if self._total_resolved >= self._length:
            self._resolve(self._values)  # type: ignore
            return True
        return False

    def _promise_rejected(self, reason):
        # type: (Exception) -> bool
        """Reject the aggregate promise with ``reason``.

        Returns True when this call rejected the aggregate promise, False
        when the list was already settled.
        """
        if self.is_resolved:
            return False
        self._total_resolved += 1
        self._reject(reason)
        return True

    @property
    def is_resolved(self):
        # type: () -> bool
        """True when ``_values`` is None, i.e. nothing is being awaited."""
        return self._values is None

    def _resolve(self, value):
        # type: (Collection[Any]) -> None
        """Fulfil the aggregate promise with ``value`` and mark as settled."""
        assert not self.is_resolved
        assert not isinstance(value, self._promise_class)
        self._values = None
        self.promise._fulfill(value)

    def _reject(self, reason):
        # type: (Exception) -> None
        """Reject the aggregate promise with ``reason`` and mark as settled."""
        assert not self.is_resolved
        self._values = None
        self.promise._reject_callback(reason, False)