File: waiter.py

package info (click to toggle)
python-aiobotocore 2.13.1-1.1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 832 kB
  • sloc: python: 10,572; makefile: 71
file content (144 lines) | stat: -rw-r--r-- 5,169 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import asyncio

from botocore.docs.docstring import WaiterDocstring

# NOTE: the WaiterModel import below is unused here; it is required for client.py import
from botocore.exceptions import ClientError
from botocore.utils import get_service_module_name
from botocore.waiter import (
    NormalizedOperationMethod as _NormalizedOperationMethod,
)
from botocore.waiter import WaiterModel  # noqa: F401 lgtm[py/unused-import]
from botocore.waiter import (
    Waiter,
    WaiterError,
    is_valid_waiter_error,
    logger,
    xform_name,
)


def create_waiter_with_client(waiter_name, waiter_model, client):
    """Build a documented, awaitable waiter bound to ``client``.

    :type waiter_name: str
    :param waiter_name: The name of the waiter.  The name should match
        the name (including the casing) of the key name in the waiter
        model file (typically this is CamelCasing).

    :type waiter_model: botocore.waiter.WaiterModel
    :param waiter_model: The model for the waiter configuration.

    :type client: botocore.client.BaseClient
    :param client: The botocore client associated with the service.

    :rtype: AIOWaiter
    :return: An instance of a dynamically created ``AIOWaiter`` subclass
        (and therefore also a ``botocore.waiter.Waiter``) whose ``wait``
        coroutine carries the generated waiter documentation.

    """
    single_waiter_config = waiter_model.get_waiter(waiter_name)
    operation_name = xform_name(single_waiter_config.operation)
    # Wrap the client method so that a ClientError comes back as a
    # response dict instead of propagating out of the polling loop.
    operation_method = NormalizedOperationMethod(
        getattr(client, operation_name)
    )

    # Create a new wait method that will serve as a proxy to the underlying
    # AIOWaiter.wait method. This is needed to attach a docstring to the
    # method.
    async def wait(self, **kwargs):
        return await AIOWaiter.wait(self, **kwargs)

    wait.__doc__ = WaiterDocstring(
        waiter_name=waiter_name,
        event_emitter=client.meta.events,
        service_model=client.meta.service_model,
        service_waiter_model=waiter_model,
        include_signature=False,
    )

    # Rename the waiter class based on the type of waiter.
    waiter_class_name = '%s.Waiter.%s' % (
        get_service_module_name(client.meta.service_model),
        waiter_name,
    )

    # Create the new waiter class. It derives from AIOWaiter (itself a
    # Waiter subclass) so that the instance really is an AIOWaiter rather
    # than a synchronous Waiter with a borrowed coroutine method.
    documented_waiter_cls = type(
        waiter_class_name, (AIOWaiter,), {'wait': wait}
    )

    # Return an instance of the new waiter class.
    return documented_waiter_cls(
        waiter_name, single_waiter_config, operation_method
    )


class NormalizedOperationMethod(_NormalizedOperationMethod):
    """Awaitable subclass of botocore's ``NormalizedOperationMethod``."""

    async def __call__(self, **kwargs):
        """Await the wrapped client method with ``kwargs``.

        A ``ClientError`` raised by the call is not propagated; its
        ``response`` attribute is returned in place of the normal result.
        """
        try:
            outcome = await self._client_method(**kwargs)
        except ClientError as client_error:
            outcome = client_error.response
        return outcome


class AIOWaiter(Waiter):
    """``Waiter`` whose polling loop is a coroutine."""

    async def wait(self, **kwargs):
        """Poll the operation until an acceptor reaches a terminal state.

        :param kwargs: Arguments forwarded to the operation method on every
            attempt.  An optional ``WaiterConfig`` dict is popped first and
            may override ``Delay`` (sleep between attempts) and
            ``MaxAttempts``; the defaults come from ``self.config``.

        :return: The response that matched a ``success`` acceptor.

        :raises WaiterError: If an unmatched error response is received, an
            acceptor transitions to ``failure``, or the number of attempts
            reaches ``MaxAttempts``.
        """
        acceptors = list(self.config.acceptors)
        current_state = 'waiting'
        # pop the invocation specific config
        config = kwargs.pop('WaiterConfig', {})
        sleep_amount = config.get('Delay', self.config.delay)
        max_attempts = config.get('MaxAttempts', self.config.max_attempts)
        last_matched_acceptor = None
        num_attempts = 0

        while True:
            response = await self._operation_method(**kwargs)
            num_attempts += 1
            for acceptor in acceptors:
                if acceptor.matcher_func(response):
                    last_matched_acceptor = acceptor
                    current_state = acceptor.state
                    break
            else:
                # If none of the acceptors matched, we should
                # transition to the failure state if an error
                # response was received.
                if is_valid_waiter_error(response):
                    # Transition to a failure state, which we
                    # can just handle here by raising an exception.
                    raise WaiterError(
                        name=self.name,
                        reason='An error occurred (%s): %s'
                        % (
                            response['Error'].get('Code', 'Unknown'),
                            response['Error'].get('Message', 'Unknown'),
                        ),
                        last_response=response,
                    )
            if current_state == 'success':
                logger.debug(
                    "Waiting complete, waiter matched the success state."
                )
                return response
            if current_state == 'failure':
                # 'failure' can only have been set by a match in this very
                # iteration, so last_matched_acceptor is that acceptor.
                reason = 'Waiter encountered a terminal failure state: %s' % (
                    last_matched_acceptor.explanation
                )
                raise WaiterError(
                    name=self.name,
                    reason=reason,
                    last_response=response,
                )
            if num_attempts >= max_attempts:
                if last_matched_acceptor is None:
                    reason = 'Max attempts exceeded'
                else:
                    # Report the acceptor that actually matched last.  The
                    # loop variable ``acceptor`` is not reliable here: when
                    # the final poll matched nothing it is simply the last
                    # acceptor in the list.
                    reason = (
                        'Max attempts exceeded. Previously accepted state: %s'
                        % (last_matched_acceptor.explanation)
                    )
                raise WaiterError(
                    name=self.name,
                    reason=reason,
                    last_response=response,
                )
            await asyncio.sleep(sleep_amount)