1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
|
import asyncio
from botocore.docs.docstring import WaiterDocstring
# WaiterModel is required for client.py import
from botocore.exceptions import ClientError
from botocore.utils import get_service_module_name
from botocore.waiter import (
NormalizedOperationMethod as _NormalizedOperationMethod,
)
from botocore.waiter import WaiterModel # noqa: F401 lgtm[py/unused-import]
from botocore.waiter import (
Waiter,
WaiterError,
is_valid_waiter_error,
logger,
xform_name,
)
def create_waiter_with_client(waiter_name, waiter_model, client):
"""
:type waiter_name: str
:param waiter_name: The name of the waiter. The name should match
the name (including the casing) of the key name in the waiter
model file (typically this is CamelCasing).
:type waiter_model: botocore.waiter.WaiterModel
:param waiter_model: The model for the waiter configuration.
:type client: botocore.client.BaseClient
:param client: The botocore client associated with the service.
:rtype: botocore.waiter.Waiter
:return: The waiter object.
"""
single_waiter_config = waiter_model.get_waiter(waiter_name)
operation_name = xform_name(single_waiter_config.operation)
operation_method = NormalizedOperationMethod(
getattr(client, operation_name)
)
# Create a new wait method that will serve as a proxy to the underlying
# Waiter.wait method. This is needed to attach a docstring to the
# method.
async def wait(self, **kwargs):
return await AIOWaiter.wait(self, **kwargs)
wait.__doc__ = WaiterDocstring(
waiter_name=waiter_name,
event_emitter=client.meta.events,
service_model=client.meta.service_model,
service_waiter_model=waiter_model,
include_signature=False,
)
# Rename the waiter class based on the type of waiter.
waiter_class_name = str(
'%s.Waiter.%s'
% (get_service_module_name(client.meta.service_model), waiter_name)
)
# Create the new waiter class
documented_waiter_cls = type(waiter_class_name, (Waiter,), {'wait': wait})
# Return an instance of the new waiter class.
return documented_waiter_cls(
waiter_name, single_waiter_config, operation_method
)
class NormalizedOperationMethod(_NormalizedOperationMethod):
async def __call__(self, **kwargs):
try:
return await self._client_method(**kwargs)
except ClientError as e:
return e.response
class AIOWaiter(Waiter):
async def wait(self, **kwargs):
acceptors = list(self.config.acceptors)
current_state = 'waiting'
# pop the invocation specific config
config = kwargs.pop('WaiterConfig', {})
sleep_amount = config.get('Delay', self.config.delay)
max_attempts = config.get('MaxAttempts', self.config.max_attempts)
last_matched_acceptor = None
num_attempts = 0
while True:
response = await self._operation_method(**kwargs)
num_attempts += 1
for acceptor in acceptors:
if acceptor.matcher_func(response):
last_matched_acceptor = acceptor
current_state = acceptor.state
break
else:
# If none of the acceptors matched, we should
# transition to the failure state if an error
# response was received.
if is_valid_waiter_error(response):
# Transition to a failure state, which we
# can just handle here by raising an exception.
raise WaiterError(
name=self.name,
reason='An error occurred (%s): %s'
% (
response['Error'].get('Code', 'Unknown'),
response['Error'].get('Message', 'Unknown'),
),
last_response=response,
)
if current_state == 'success':
logger.debug(
"Waiting complete, waiter matched the " "success state."
)
return response
if current_state == 'failure':
reason = 'Waiter encountered a terminal failure state: %s' % (
acceptor.explanation
)
raise WaiterError(
name=self.name,
reason=reason,
last_response=response,
)
if num_attempts >= max_attempts:
if last_matched_acceptor is None:
reason = 'Max attempts exceeded'
else:
reason = (
'Max attempts exceeded. Previously accepted state: %s'
% (acceptor.explanation)
)
raise WaiterError(
name=self.name,
reason=reason,
last_response=response,
)
await asyncio.sleep(sleep_amount)
|