File: endpoint.py (python-aiobotocore 2.13.1)

import asyncio

from botocore.endpoint import (
    DEFAULT_TIMEOUT,
    MAX_POOL_CONNECTIONS,
    Endpoint,
    EndpointCreator,
    HTTPClientError,
    create_request_object,
    history_recorder,
    is_valid_endpoint_url,
    is_valid_ipv6_endpoint_url,
    logger,
)
from botocore.hooks import first_non_none_response
from urllib3.response import HTTPHeaderDict

from aiobotocore.httpchecksum import handle_checksum_body
from aiobotocore.httpsession import AIOHTTPSession
from aiobotocore.response import StreamingBody

DEFAULT_HTTP_SESSION_CLS = AIOHTTPSession


async def convert_to_response_dict(http_response, operation_model):
    """Convert an HTTP response object to a request dict.

    This converts the requests library's HTTP response object to
    a dictionary.

    :type http_response: botocore.vendored.requests.model.Response
    :param http_response: The HTTP response from an AWS service request.

    :rtype: dict
    :return: A response dictionary which will contain the following keys:
        * headers (dict)
        * status_code (int)
        * body (string or file-like object)

    """
    response_dict = {
        # botocore converts keys to str, so make sure that they are in
        # the expected case. See detailed discussion here:
        # https://github.com/aio-libs/aiobotocore/pull/116
        # aiohttp's CIMultiDict camel cases the headers :(
        'headers': HTTPHeaderDict(
            {
                k.decode('utf-8').lower(): v.decode('utf-8')
                for k, v in http_response.raw.raw_headers
            }
        ),
        'status_code': http_response.status_code,
        'context': {
            'operation_name': operation_model.name,
        },
    }
    if response_dict['status_code'] >= 300:
        response_dict['body'] = await http_response.content
    elif operation_model.has_event_stream_output:
        response_dict['body'] = http_response.raw
    elif operation_model.has_streaming_output:
        length = response_dict['headers'].get('content-length')
        response_dict['body'] = StreamingBody(http_response.raw, length)
    else:
        response_dict['body'] = await http_response.content
    return response_dict

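# Illustrative sketch, not part of the upstream module: for a typical
# non-streaming 200 response, the dict built above would look roughly like
# the following (all values are hypothetical; note the lowercased header
# names, as explained in the comment inside the function):
#
#     {
#         'headers': HTTPHeaderDict({
#             'content-type': 'application/x-amz-json-1.0',
#             'content-length': '2',
#         }),
#         'status_code': 200,
#         'context': {'operation_name': 'ListTables'},
#         'body': b'{}',
#     }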

class AioEndpoint(Endpoint):
    async def close(self):
        await self.http_session.close()

    async def create_request(self, params, operation_model=None):
        request = create_request_object(params)
        if operation_model:
            request.stream_output = any(
                [
                    operation_model.has_streaming_output,
                    operation_model.has_event_stream_output,
                ]
            )
            service_id = operation_model.service_model.service_id.hyphenize()
            event_name = 'request-created.{service_id}.{op_name}'.format(
                service_id=service_id, op_name=operation_model.name
            )
            await self._event_emitter.emit(
                event_name,
                request=request,
                operation_name=operation_model.name,
            )
        prepared_request = self.prepare_request(request)
        return prepared_request

    async def _send_request(self, request_dict, operation_model):
        attempts = 1
        context = request_dict['context']
        self._update_retries_context(context, attempts)
        request = await self.create_request(request_dict, operation_model)
        success_response, exception = await self._get_response(
            request, operation_model, context
        )
        while await self._needs_retry(
            attempts,
            operation_model,
            request_dict,
            success_response,
            exception,
        ):
            attempts += 1
            self._update_retries_context(context, attempts, success_response)
            # If there is a stream associated with the request, we need
            # to reset it before attempting to send the request again.
            # This will ensure that we resend the entire contents of the
            # body.
            request.reset_stream()
            # Create a new request when retried (including a new signature).
            request = await self.create_request(request_dict, operation_model)
            success_response, exception = await self._get_response(
                request, operation_model, context
            )
        if (
            success_response is not None
            and 'ResponseMetadata' in success_response[1]
        ):
            # We want to report the number of retries, not the number of attempts.
            total_retries = attempts - 1
            success_response[1]['ResponseMetadata'][
                'RetryAttempts'
            ] = total_retries
        if exception is not None:
            raise exception
        else:
            return success_response

    async def _get_response(self, request, operation_model, context):
        # This will return a tuple of (success_response, exception)
        # and success_response is itself a tuple of
        # (http_response, parsed_dict).
        # If an exception occurs then the success_response is None.
        # If no exception occurs then exception is None.
        success_response, exception = await self._do_get_response(
            request, operation_model, context
        )
        kwargs_to_emit = {
            'response_dict': None,
            'parsed_response': None,
            'context': context,
            'exception': exception,
        }
        if success_response is not None:
            http_response, parsed_response = success_response
            kwargs_to_emit['parsed_response'] = parsed_response
            kwargs_to_emit['response_dict'] = await convert_to_response_dict(
                http_response, operation_model
            )
        service_id = operation_model.service_model.service_id.hyphenize()
        await self._event_emitter.emit(
            f"response-received.{service_id}.{operation_model.name}",
            **kwargs_to_emit,
        )
        return success_response, exception

    async def _do_get_response(self, request, operation_model, context):
        try:
            logger.debug("Sending http request: %s", request)
            history_recorder.record(
                'HTTP_REQUEST',
                {
                    'method': request.method,
                    'headers': request.headers,
                    'streaming': operation_model.has_streaming_input,
                    'url': request.url,
                    'body': request.body,
                },
            )
            service_id = operation_model.service_model.service_id.hyphenize()
            event_name = f"before-send.{service_id}.{operation_model.name}"
            responses = await self._event_emitter.emit(
                event_name, request=request
            )
            http_response = first_non_none_response(responses)
            if http_response is None:
                http_response = await self._send(request)
        except HTTPClientError as e:
            return (None, e)
        except Exception as e:
            logger.debug(
                "Exception received when sending HTTP request.", exc_info=True
            )
            return (None, e)

        # This returns the http_response and the parsed_data.
        response_dict = await convert_to_response_dict(
            http_response, operation_model
        )
        await handle_checksum_body(
            http_response,
            response_dict,
            context,
            operation_model,
        )

        http_response_record_dict = response_dict.copy()
        http_response_record_dict[
            'streaming'
        ] = operation_model.has_streaming_output
        history_recorder.record('HTTP_RESPONSE', http_response_record_dict)

        protocol = operation_model.metadata['protocol']
        parser = self._response_parser_factory.create_parser(protocol)

        if asyncio.iscoroutinefunction(parser.parse):
            parsed_response = await parser.parse(
                response_dict, operation_model.output_shape
            )
        else:
            parsed_response = parser.parse(
                response_dict, operation_model.output_shape
            )

        if http_response.status_code >= 300:
            await self._add_modeled_error_fields(
                response_dict,
                parsed_response,
                operation_model,
                parser,
            )
        history_recorder.record('PARSED_RESPONSE', parsed_response)
        return (http_response, parsed_response), None

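    # Illustrative sketch, not part of the upstream module: because
    # _do_get_response() above takes the first non-None response returned by
    # the 'before-send' handlers, a handler registered for that event can
    # supply a pre-built response and self._send() is then skipped entirely.
    # Roughly (the handler, client, and cached response are hypothetical):
    #
    #     async def return_cached_response(request, **kwargs):
    #         return cached_aws_response  # AWSResponse-like object, or None
    #
    #     client.meta.events.register(
    #         'before-send.dynamodb.GetItem', return_cached_response
    #     )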
    async def _add_modeled_error_fields(
        self,
        response_dict,
        parsed_response,
        operation_model,
        parser,
    ):
        error_code = parsed_response.get("Error", {}).get("Code")
        if error_code is None:
            return
        service_model = operation_model.service_model
        error_shape = service_model.shape_for_error_code(error_code)
        if error_shape is None:
            return

        if asyncio.iscoroutinefunction(parser.parse):
            modeled_parse = await parser.parse(response_dict, error_shape)
        else:
            modeled_parse = parser.parse(response_dict, error_shape)
        # TODO: avoid naming conflicts with ResponseMetadata and Error
        parsed_response.update(modeled_parse)

    # NOTE: The only change from botocore here is swapping time.sleep for asyncio.sleep.
    async def _needs_retry(
        self,
        attempts,
        operation_model,
        request_dict,
        response=None,
        caught_exception=None,
    ):
        service_id = operation_model.service_model.service_id.hyphenize()
        event_name = f"needs-retry.{service_id}.{operation_model.name}"
        responses = await self._event_emitter.emit(
            event_name,
            response=response,
            endpoint=self,
            operation=operation_model,
            attempts=attempts,
            caught_exception=caught_exception,
            request_dict=request_dict,
        )
        handler_response = first_non_none_response(responses)
        if handler_response is None:
            return False
        else:
            # Request needs to be retried, and we need to sleep
            # for the specified number of seconds.
            logger.debug(
                "Response received to retry, sleeping for %s seconds",
                handler_response,
            )
            await asyncio.sleep(handler_response)
            return True

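    # Illustrative sketch, not part of the upstream module: a 'needs-retry'
    # handler (normally installed by botocore's retry machinery) returns
    # None when no retry is needed, or the delay in seconds to wait before
    # the next attempt, which the method above awaits with asyncio.sleep.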
    async def _send(self, request):
        return await self.http_session.send(request)


class AioEndpointCreator(EndpointCreator):
    def create_endpoint(
        self,
        service_model,
        region_name,
        endpoint_url,
        verify=None,
        response_parser_factory=None,
        timeout=DEFAULT_TIMEOUT,
        max_pool_connections=MAX_POOL_CONNECTIONS,
        http_session_cls=DEFAULT_HTTP_SESSION_CLS,
        proxies=None,
        socket_options=None,
        client_cert=None,
        proxies_config=None,
        connector_args=None,
    ):
        if not is_valid_endpoint_url(
            endpoint_url
        ) and not is_valid_ipv6_endpoint_url(endpoint_url):
            raise ValueError("Invalid endpoint: %s" % endpoint_url)

        if proxies is None:
            proxies = self._get_proxies(endpoint_url)
        endpoint_prefix = service_model.endpoint_prefix

        logger.debug('Setting %s timeout as %s', endpoint_prefix, timeout)
        http_session = http_session_cls(
            timeout=timeout,
            proxies=proxies,
            verify=self._get_verify_value(verify),
            max_pool_connections=max_pool_connections,
            socket_options=socket_options,
            client_cert=client_cert,
            proxies_config=proxies_config,
            connector_args=connector_args,
        )

        return AioEndpoint(
            endpoint_url,
            endpoint_prefix=endpoint_prefix,
            event_emitter=self._event_emitter,
            response_parser_factory=response_parser_factory,
            http_session=http_session,
        )
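

# Illustrative usage sketch, not part of the upstream module: wiring an
# AioEndpoint up by hand.  In practice aiobotocore's client machinery calls
# AioEndpointCreator for you; `event_emitter` and `service_model` below are
# assumed to come from an existing session/loader.
#
#     creator = AioEndpointCreator(event_emitter)
#     endpoint = creator.create_endpoint(
#         service_model,
#         region_name='us-east-1',
#         endpoint_url='https://dynamodb.us-east-1.amazonaws.com',
#     )
#     try:
#         ...  # build a request dict and await endpoint._send_request(...)
#     finally:
#         await endpoint.close()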