File: retryhandler.py

package info (click to toggle)
python-aiobotocore 2.13.1-1.1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 832 kB
  • sloc: python: 10,572; makefile: 71
file content (220 lines) | stat: -rw-r--r-- 7,961 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
from botocore.retryhandler import (
    ChecksumError,
    CRC32Checker,
    ExceptionRaiser,
    HTTPStatusCodeChecker,
    MaxAttemptsDecorator,
    MultiChecker,
    RetryHandler,
    ServiceErrorCodeChecker,
    _extract_retryable_exception,
    crc32,
    create_retry_action_from_config,
    logger,
)

from ._helpers import resolve_awaitable


def create_retry_handler(config, operation_name=None):
    checker = create_checker_from_retry_config(
        config, operation_name=operation_name
    )
    action = create_retry_action_from_config(
        config, operation_name=operation_name
    )
    return AioRetryHandler(checker=checker, action=action)


def create_checker_from_retry_config(config, operation_name=None):
    checkers = []
    max_attempts = None
    retryable_exceptions = []
    if '__default__' in config:
        policies = config['__default__'].get('policies', [])
        max_attempts = config['__default__']['max_attempts']
        for key in policies:
            current_config = policies[key]
            checkers.append(_create_single_checker(current_config))
            retry_exception = _extract_retryable_exception(current_config)
            if retry_exception is not None:
                retryable_exceptions.extend(retry_exception)
    if operation_name is not None and config.get(operation_name) is not None:
        operation_policies = config[operation_name]['policies']
        for key in operation_policies:
            checkers.append(_create_single_checker(operation_policies[key]))
            retry_exception = _extract_retryable_exception(
                operation_policies[key]
            )
            if retry_exception is not None:
                retryable_exceptions.extend(retry_exception)
    if len(checkers) == 1:
        # Don't need to use a MultiChecker
        return AioMaxAttemptsDecorator(checkers[0], max_attempts=max_attempts)
    else:
        multi_checker = AioMultiChecker(checkers)
        return AioMaxAttemptsDecorator(
            multi_checker,
            max_attempts=max_attempts,
            retryable_exceptions=tuple(retryable_exceptions),
        )


def _create_single_checker(config):
    if 'response' in config['applies_when']:
        return _create_single_response_checker(
            config['applies_when']['response']
        )
    elif 'socket_errors' in config['applies_when']:
        return ExceptionRaiser()


def _create_single_response_checker(response):
    if 'service_error_code' in response:
        checker = ServiceErrorCodeChecker(
            status_code=response['http_status_code'],
            error_code=response['service_error_code'],
        )
    elif 'http_status_code' in response:
        checker = HTTPStatusCodeChecker(
            status_code=response['http_status_code']
        )
    elif 'crc32body' in response:
        checker = AioCRC32Checker(header=response['crc32body'])
    else:
        # TODO: send a signal.
        raise ValueError("Unknown retry policy")
    return checker


class AioRetryHandler(RetryHandler):
    async def _call(self, attempts, response, caught_exception, **kwargs):
        """Handler for a retry.

        Intended to be hooked up to an event handler (hence the **kwargs),
        this will process retries appropriately.

        """
        checker_kwargs = {
            'attempt_number': attempts,
            'response': response,
            'caught_exception': caught_exception,
        }
        if isinstance(self._checker, MaxAttemptsDecorator):
            retries_context = kwargs['request_dict']['context'].get('retries')
            checker_kwargs.update({'retries_context': retries_context})

        if await resolve_awaitable(self._checker(**checker_kwargs)):
            result = self._action(attempts=attempts)
            logger.debug("Retry needed, action of: %s", result)
            return result
        logger.debug("No retry needed.")

    def __call__(self, *args, **kwargs):
        return self._call(*args, **kwargs)  # return awaitable


class AioMaxAttemptsDecorator(MaxAttemptsDecorator):
    async def _call(
        self, attempt_number, response, caught_exception, retries_context
    ):
        if retries_context:
            retries_context['max'] = max(
                retries_context.get('max', 0), self._max_attempts
            )

        should_retry = await self._should_retry(
            attempt_number, response, caught_exception
        )
        if should_retry:
            if attempt_number >= self._max_attempts:
                # explicitly set MaxAttemptsReached
                if response is not None and 'ResponseMetadata' in response[1]:
                    response[1]['ResponseMetadata'][
                        'MaxAttemptsReached'
                    ] = True
                logger.debug(
                    "Reached the maximum number of retry attempts: %s",
                    attempt_number,
                )
                return False
            else:
                return should_retry
        else:
            return False

    def __call__(self, *args, **kwargs):
        return self._call(*args, **kwargs)

    async def _should_retry(self, attempt_number, response, caught_exception):
        if self._retryable_exceptions and attempt_number < self._max_attempts:
            try:
                return await resolve_awaitable(
                    self._checker(attempt_number, response, caught_exception)
                )
            except self._retryable_exceptions as e:
                logger.debug(
                    "retry needed, retryable exception caught: %s",
                    e,
                    exc_info=True,
                )
                return True
        else:
            # If we've exceeded the max attempts we just let the exception
            # propagate if one has occurred.
            return await resolve_awaitable(
                self._checker(attempt_number, response, caught_exception)
            )


class AioMultiChecker(MultiChecker):
    async def _call(self, attempt_number, response, caught_exception):
        for checker in self._checkers:
            checker_response = await resolve_awaitable(
                checker(attempt_number, response, caught_exception)
            )
            if checker_response:
                return checker_response
        return False

    def __call__(self, *args, **kwargs):
        return self._call(*args, **kwargs)


class AioCRC32Checker(CRC32Checker):
    async def _call(self, attempt_number, response, caught_exception):
        if response is not None:
            return await self._check_response(attempt_number, response)
        elif caught_exception is not None:
            return self._check_caught_exception(
                attempt_number, caught_exception
            )
        else:
            raise ValueError("Both response and caught_exception are None.")

    def __call__(self, *args, **kwargs):
        return self._call(*args, **kwargs)

    async def _check_response(self, attempt_number, response):
        http_response = response[0]
        expected_crc = http_response.headers.get(self._header_name)
        if expected_crc is None:
            logger.debug(
                "crc32 check skipped, the %s header is not "
                "in the http response.",
                self._header_name,
            )
        else:
            actual_crc32 = crc32(await response[0].content) & 0xFFFFFFFF
            if not actual_crc32 == int(expected_crc):
                logger.debug(
                    "retry needed: crc32 check failed, expected != actual: "
                    "%s != %s",
                    int(expected_crc),
                    actual_crc32,
                )
                raise ChecksumError(
                    checksum_type='crc32',
                    expected_checksum=int(expected_crc),
                    actual_checksum=actual_crc32,
                )