File: paginate.py

package info (click to toggle)
python-aiobotocore 2.13.1-1.1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 832 kB
  • sloc: python: 10,572; makefile: 71
file content (191 lines) | stat: -rw-r--r-- 8,030 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
import aioitertools
import jmespath
from botocore.exceptions import PaginationError
from botocore.paginate import PageIterator, Paginator
from botocore.utils import merge_dicts, set_value_from_jmespath


class AioPageIterator(PageIterator):
    """Asynchronous page iterator layered on botocore's ``PageIterator``.

    Pages are consumed with ``async for``; each request issued through
    ``self._make_request`` is awaited before the page is processed.
    """

    def __aiter__(self):
        # ``__anext__`` below is an async generator function, so calling it
        # produces the async iterator object that ``async for`` drives.
        return self.__anext__()

    async def __anext__(self):
        """Yield each response of the paginated operation in turn.

        Iteration ends when every value of the next token is ``None``, when
        ``_max_items`` has been reached, or after a page had to be truncated.

        :raises PaginationError: If two consecutive pages report the same
            next token.
        """
        request_kwargs = self._op_kwargs
        last_token = None
        items_seen = 0
        is_first_page = True
        start_offset = 0
        next_token = dict.fromkeys(self._input_token)
        if self._starting_token is not None:
            # Seed next_token from the caller-supplied starting token so the
            # service token is already at hand should the very first page
            # need truncating.
            next_token = self._parse_starting_token()[0]
        primary_key = self.result_keys[0]
        self._inject_starting_params(request_kwargs)

        while True:
            response = await self._make_request(request_kwargs)
            parsed = self._extract_parsed_response(response)
            if is_first_page:
                # Only the first page can be entered part-way through, and
                # only when a starting/resume token was given.
                if self._starting_token is not None:
                    start_offset = self._handle_first_request(
                        parsed, primary_key, start_offset
                    )
                is_first_page = False
                self._record_non_aggregate_key_values(parsed)
            else:
                # Later pages are always consumed from their beginning, so
                # the starting offset no longer contributes to truncation.
                start_offset = 0

            page_items = primary_key.search(parsed)
            page_count = 0 if page_items is None else len(page_items)

            # Positive overflow means this page pushes us past _max_items.
            overflow = 0
            if self._max_items is not None:
                overflow = items_seen + page_count - self._max_items

            if overflow > 0:
                self._truncate_response(
                    parsed,
                    primary_key,
                    overflow,
                    start_offset,
                    next_token,
                )
                yield response
                return

            yield response
            items_seen += page_count
            next_token = self._get_next_token(parsed)
            if not any(value is not None for value in next_token.values()):
                # No token value present: there is nothing more to fetch.
                return
            if self._max_items is not None and items_seen == self._max_items:
                # The limit landed exactly on a page boundary, so the
                # service's own next token doubles as the resume token.
                self.resume_token = next_token
                return
            if last_token is not None and last_token == next_token:
                raise PaginationError(
                    message=(
                        "The same next token was received "
                        f"twice: {next_token}"
                    )
                )
            self._inject_token_into_kwargs(request_kwargs, next_token)
            last_token = next_token

    async def search(self, expression):
        """Apply a JMESPath ``expression`` to every page and yield matches.

        A list result is flattened into its individual elements; any other
        result (including ``None``) is yielded as a single value.
        """
        matcher = jmespath.compile(expression)
        async for page in self:
            found = matcher.search(page)
            if not isinstance(found, list):
                yield found
                continue
            # ``yield from`` is not permitted inside an async generator.
            for item in found:
                yield item

    def result_key_iters(self):
        """Return one ``ResultKeyIterator`` per result key.

        The page stream is split with ``aioitertools.tee`` so that each
        returned iterator reads from its own branch.
        """
        keys = self.result_keys
        branches = aioitertools.tee(self, len(keys))
        return [
            ResultKeyIterator(branch, key)
            for branch, key in zip(branches, keys)
        ]

    async def build_full_result(self):
        """Consume every page and merge them into a single result dict.

        List values of a result key are concatenated across pages;
        ``int``/``float``/``str`` values are combined with ``+``. The
        non-aggregate part is merged in afterwards, and ``'NextToken'`` is
        set when a resume token is available.
        """
        merged = {}
        async for response in self:
            # Operation-object pagination hands back a two-element tuple of
            # (http_response, parsed_response); only the parsed part is used.
            is_pair = isinstance(response, tuple) and len(response) == 2
            page = response[1] if is_pair else response

            for key_expr in self.result_keys:
                page_value = key_expr.search(page)
                if page_value is None:
                    # This page contributes nothing for this key.
                    continue
                accumulated = key_expr.search(merged)
                if accumulated is None:
                    # First value seen for this key: store it as-is.
                    set_value_from_jmespath(
                        merged, key_expr.expression, page_value
                    )
                elif isinstance(page_value, list):
                    accumulated.extend(page_value)
                elif isinstance(page_value, (int, float, str)):
                    # Replace the stored value with the sum/concatenation.
                    set_value_from_jmespath(
                        merged,
                        key_expr.expression,
                        accumulated + page_value,
                    )

        merge_dicts(merged, self.non_aggregate_part)
        if self.resume_token is not None:
            merged['NextToken'] = self.resume_token
        return merged


class AioPaginator(Paginator):
    # Paginator subclass that selects AioPageIterator as its page iterator
    # class.
    # NOTE(review): presumably the botocore base class instantiates
    # PAGE_ITERATOR_CLS when paginating -- confirm against botocore.paginate.
    PAGE_ITERATOR_CLS = AioPageIterator


class ResultKeyIterator:
    """Flatten a single result key across all pages of a paginated call.

    Every instance is bound to exactly one result key. Iterating it with
    ``async for`` produces, page after page, each element found under
    that key.

    :param pages_iterator: An async-iterable source of result pages
        (for example an ``AioPageIterator``).
    :param result_key: The JMESPath expression that selects the result
        key from a page.

    """

    def __init__(self, pages_iterator, result_key):
        self.result_key = result_key
        self._pages_iterator = pages_iterator

    def __aiter__(self):
        # ``__anext__`` is an async generator function; calling it returns
        # the async iterator that ``async for`` consumes.
        return self.__anext__()

    async def __anext__(self):
        async for page in self._pages_iterator:
            found = self.result_key.search(page)
            # A page without the key contributes no elements.
            # ``yield from`` is not permitted inside an async generator.
            for item in [] if found is None else found:
                yield item