File: waiter.py

package info (click to toggle)
python-botocore 1.12.103%2Brepack-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 41,552 kB
  • sloc: python: 43,119; xml: 15,052; makefile: 131
file content (331 lines) | stat: -rw-r--r-- 12,418 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import jmespath
import logging
import time

from botocore.utils import get_service_module_name
from botocore.docs.docstring import WaiterDocstring
from .exceptions import WaiterError, ClientError, WaiterConfigError
from . import xform_name


logger = logging.getLogger(__name__)


def create_waiter_with_client(waiter_name, waiter_model, client):
    """Build a documented waiter instance for one named waiter.

    The waiter's operation name is converted with ``xform_name`` and the
    resulting attribute is looked up on ``client``.  That client method is
    wrapped in a ``NormalizedOperationMethod`` and handed to a dynamically
    created ``Waiter`` subclass whose ``wait`` method carries a generated
    docstring.

    :type waiter_name: str
    :param waiter_name: The name of the waiter.  The name should match
        the name (including the casing) of the key name in the waiter
        model file (typically this is CamelCasing).

    :type waiter_model: botocore.waiter.WaiterModel
    :param waiter_model: The model for the waiter configuration.

    :type client: botocore.client.BaseClient
    :param client: The botocore client associated with the service.

    :rtype: botocore.waiter.Waiter
    :return: The waiter object.

    """
    config = waiter_model.get_waiter(waiter_name)
    client_method = getattr(client, xform_name(config.operation))
    normalized_method = NormalizedOperationMethod(client_method)

    # A dedicated proxy function is required so that a docstring specific
    # to this waiter can be attached without touching Waiter.wait itself.
    def wait(self, **kwargs):
        Waiter.wait(self, **kwargs)

    client_meta = client.meta
    wait.__doc__ = WaiterDocstring(
        waiter_name=waiter_name,
        event_emitter=client_meta.events,
        service_model=client_meta.service_model,
        service_waiter_model=waiter_model,
        include_signature=False,
    )

    # The generated class is named "<ServiceModule>.Waiter.<WaiterName>".
    module_name = get_service_module_name(client_meta.service_model)
    class_name = str('%s.Waiter.%s' % (module_name, waiter_name))
    waiter_cls = type(class_name, (Waiter,), {'wait': wait})

    return waiter_cls(waiter_name, config, normalized_method)


class NormalizedOperationMethod(object):
    """Callable wrapper that always yields a response dictionary.

    The wrapped client method is invoked with the given keyword arguments.
    When it raises ``ClientError`` the exception's ``response`` attribute
    is returned instead of letting the exception propagate.

    """
    def __init__(self, client_method):
        """
        :type client_method: callable
        :param client_method: The callable to invoke; it is called with
            keyword arguments only.

        """
        self._client_method = client_method

    def __call__(self, **kwargs):
        try:
            response = self._client_method(**kwargs)
        except ClientError as error:
            # Hand back the response carried by the exception so callers
            # can inspect error responses like any other response.
            response = error.response
        return response


class WaiterModel(object):
    """In-memory view of a loaded waiters configuration."""

    # The only waiter config version this model accepts.
    SUPPORTED_VERSION = 2

    def __init__(self, waiter_config):
        """

        The model keeps a reference to the ``waiters`` mapping of the
        given config rather than copying it.  If that is a concern, pass
        in a copy of the waiter config.

        :type waiter_config: dict
        :param waiter_config: The loaded waiter config
            from the <service>*.waiters.json file.  This can be
            obtained from a botocore Loader object as well.

        :raises WaiterConfigError: If the config's ``version`` is missing
            or differs from ``SUPPORTED_VERSION``.

        """
        waiters = waiter_config['waiters']
        self._waiter_config = waiters

        # ``version`` and ``waiter_names`` are public attributes, so
        # consumers may rely on them; avoid changing them needlessly.
        declared_version = waiter_config.get('version', 'unknown')
        self._verify_supported_version(declared_version)
        self.version = declared_version
        self.waiter_names = sorted(waiters)

    def _verify_supported_version(self, version):
        """Raise ``WaiterConfigError`` unless ``version`` is supported."""
        if version == self.SUPPORTED_VERSION:
            return
        message = ("Unsupported waiter version, supported version "
                   "must be: %s, but version of waiter config "
                   "is: %s" % (self.SUPPORTED_VERSION, version))
        raise WaiterConfigError(error_msg=message)

    def get_waiter(self, waiter_name):
        """Return the ``SingleWaiterConfig`` for ``waiter_name``.

        :type waiter_name: str
        :param waiter_name: Key of the waiter in the ``waiters`` mapping.

        :raises ValueError: If no waiter with that name is configured.

        """
        try:
            raw_config = self._waiter_config[waiter_name]
        except KeyError:
            raise ValueError("Waiter does not exist: %s" % waiter_name)
        else:
            return SingleWaiterConfig(raw_config)


class SingleWaiterConfig(object):
    """Configuration of one named waiter (for example ``TableExists``).

    Wraps the mapping stored under a single waiter name and exposes its
    fields as attributes.

    """
    def __init__(self, single_waiter_config):
        """
        :type single_waiter_config: dict
        :param single_waiter_config: The configuration of one waiter.
            ``operation``, ``delay`` and ``maxAttempts`` must be present;
            ``description`` is optional and defaults to an empty string.

        """
        raw = single_waiter_config
        self._config = raw

        # Everything assigned below is part of the public API.
        self.description = raw.get('description', '')
        # The spec makes the next three fields mandatory, hence the plain
        # indexing that raises KeyError when one is absent.
        self.operation = raw['operation']
        self.delay = raw['delay']
        self.max_attempts = raw['maxAttempts']

    @property
    def acceptors(self):
        """Return a new list of ``AcceptorConfig`` objects on each access.

        One object is built per entry of the ``acceptors`` list in the
        underlying configuration, in the same order.

        """
        return [
            AcceptorConfig(raw_acceptor)
            for raw_acceptor in self._config['acceptors']
        ]


class AcceptorConfig(object):
    """One acceptor of a waiter together with its matching function.

    ``matcher_func`` is a callable taking a single response dictionary and
    returning a truthy value when the acceptor matches that response.

    """
    def __init__(self, config):
        """
        :type config: dict
        :param config: The acceptor configuration.  ``state``,
            ``matcher`` and ``expected`` are required; ``argument`` is
            optional and defaults to ``None``.

        :raises WaiterConfigError: If ``matcher`` names an unknown
            matcher type.

        """
        self.state = config['state']
        self.matcher = config['matcher']
        self.expected = config['expected']
        self.argument = config.get('argument')
        self.matcher_func = self._create_matcher_func()

    def _create_matcher_func(self):
        """Return the matching callable for ``self.matcher``."""
        # Each matcher function receives the parsed response.  Because
        # parsed error responses are passed in as well, the matchers can
        # inspect errors just like successful responses.  The handful of
        # matcher types is small enough to live in this one class.
        factories = (
            ('path', self._create_path_matcher),
            ('pathAll', self._create_path_all_matcher),
            ('pathAny', self._create_path_any_matcher),
            ('status', self._create_status_matcher),
            ('error', self._create_error_matcher),
        )
        for matcher_name, factory in factories:
            if self.matcher == matcher_name:
                return factory()
        raise WaiterConfigError(
            error_msg="Unknown acceptor: %s" % self.matcher)

    def _create_path_matcher(self):
        """Match when the JMESPath result equals the expected value."""
        compiled = jmespath.compile(self.argument)
        wanted = self.expected

        def acceptor_matches(response):
            # Responses carrying an 'Error' key are never searched.
            if 'Error' not in response:
                return compiled.search(response) == wanted
            return None
        return acceptor_matches

    def _create_path_all_matcher(self):
        """Match when every element of the JMESPath result is expected."""
        compiled = jmespath.compile(self.argument)
        wanted = self.expected

        def acceptor_matches(response):
            if 'Error' in response:
                return None
            found = compiled.search(response)
            # The search must produce a list with at least one element;
            # an empty list is deliberately treated as "no match".
            if not (isinstance(found, list) and found):
                return False
            return not any(item != wanted for item in found)
        return acceptor_matches

    def _create_path_any_matcher(self):
        """Match when any element of the JMESPath result is expected."""
        compiled = jmespath.compile(self.argument)
        wanted = self.expected

        def acceptor_matches(response):
            if 'Error' in response:
                return None
            found = compiled.search(response)
            # The search must produce a list with at least one element;
            # an empty list is deliberately treated as "no match".
            if not (isinstance(found, list) and found):
                return False
            return any(item == wanted for item in found)
        return acceptor_matches

    def _create_status_matcher(self):
        """Match on ``ResponseMetadata.HTTPStatusCode`` of the response."""
        wanted = self.expected

        def acceptor_matches(response):
            # Nothing is assumed about the response beyond it being a
            # dict, so a missing ResponseMetadata or HTTPStatusCode simply
            # yields None for the status code.
            metadata = response.get('ResponseMetadata', {})
            return metadata.get('HTTPStatusCode') == wanted
        return acceptor_matches

    def _create_error_matcher(self):
        """Match on the ``Error.Code`` value of the response."""
        wanted = self.expected

        def acceptor_matches(response):
            # Client errors normally surface as exceptions, but the waiter
            # machinery catches them and passes the parsed error response
            # along instead.  The response is therefore still a dict,
            # which for errors holds the "Error" and "ResponseMetadata"
            # keys.
            error = response.get("Error", {})
            return error.get("Code", "") == wanted
        return acceptor_matches


class Waiter(object):
    """Polls an operation until an acceptor reports a terminal state."""

    def __init__(self, name, config, operation_method):
        """

        :type name: string
        :param name: The name of the waiter

        :type config: botocore.waiter.SingleWaiterConfig
        :param config: The configuration for the waiter.

        :type operation_method: callable
        :param operation_method: A callable that accepts **kwargs
            and returns a response.  For example, this can be
            a method from a botocore client.

        """
        # ``name`` and ``config`` stay public so the waiter can be
        # introspected and documented.
        self.name = name
        self.config = config
        self._operation_method = operation_method

    def wait(self, **kwargs):
        """Call the operation repeatedly until the waiter finishes.

        After each call the acceptors are tried in order and the first
        one that matches the response determines the current state.  A
        ``success`` state ends the wait; otherwise the method sleeps for
        the configured delay and tries again.

        :param kwargs: Keyword arguments forwarded to the operation.  The
            optional ``WaiterConfig`` entry is removed before forwarding;
            its ``Delay`` and ``MaxAttempts`` keys override the values
            from the waiter configuration for this call.

        :raises WaiterError: If no acceptor matches a response containing
            an ``Error`` key, if the matched state is ``failure``, or if
            the maximum number of attempts is reached.

        """
        acceptors = list(self.config.acceptors)
        # Per-call overrides must not reach the operation itself.
        overrides = kwargs.pop('WaiterConfig', {})
        delay = overrides.get('Delay', self.config.delay)
        attempt_limit = overrides.get(
            'MaxAttempts', self.config.max_attempts)
        state = 'waiting'
        attempts_made = 0

        while True:
            response = self._operation_method(**kwargs)
            attempts_made += 1

            # Find the first acceptor, if any, matching this response.
            matched = None
            for candidate in acceptors:
                if candidate.matcher_func(response):
                    matched = candidate
                    break

            if matched is not None:
                state = matched.state
            elif 'Error' in response:
                # An error response that no acceptor claimed means the
                # waiter has failed; signal that with an exception.
                raise WaiterError(
                    name=self.name,
                    reason=response['Error'].get('Message', 'Unknown'),
                    last_response=response
                )

            if state == 'success':
                logger.debug("Waiting complete, waiter matched the "
                             "success state.")
                return
            if state == 'failure':
                raise WaiterError(
                    name=self.name,
                    reason='Waiter encountered a terminal failure state',
                    last_response=response,
                )
            if attempts_made >= attempt_limit:
                raise WaiterError(
                    name=self.name,
                    reason='Max attempts exceeded',
                    last_response=response
                )
            time.sleep(delay)