1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
|
from botocore.handlers import check_for_200_error as boto_check_for_200_error
from botocore.handlers import (
inject_presigned_url_ec2 as boto_inject_presigned_url_ec2,
)
from botocore.handlers import (
inject_presigned_url_rds as boto_inject_presigned_url_rds,
)
from botocore.handlers import (
parse_get_bucket_location as boto_parse_get_bucket_location,
)
from botocore.hooks import HierarchicalEmitter, logger
from botocore.signers import (
add_generate_db_auth_token as boto_add_generate_db_auth_token,
)
from botocore.signers import (
add_generate_presigned_post as boto_add_generate_presigned_post,
)
from botocore.signers import (
add_generate_presigned_url as boto_add_generate_presigned_url,
)
from ._helpers import resolve_awaitable
from .handlers import (
check_for_200_error,
inject_presigned_url_ec2,
inject_presigned_url_rds,
parse_get_bucket_location,
)
from .signers import (
add_generate_db_auth_token,
add_generate_presigned_post,
add_generate_presigned_url,
)
_HANDLER_MAPPING = {
boto_inject_presigned_url_ec2: inject_presigned_url_ec2,
boto_inject_presigned_url_rds: inject_presigned_url_rds,
boto_add_generate_presigned_url: add_generate_presigned_url,
boto_add_generate_presigned_post: add_generate_presigned_post,
boto_add_generate_db_auth_token: add_generate_db_auth_token,
boto_parse_get_bucket_location: parse_get_bucket_location,
boto_check_for_200_error: check_for_200_error,
}
class AioHierarchicalEmitter(HierarchicalEmitter):
async def _emit(self, event_name, kwargs, stop_on_response=False):
responses = []
# Invoke the event handlers from most specific
# to least specific, each time stripping off a dot.
handlers_to_call = self._lookup_cache.get(event_name)
if handlers_to_call is None:
handlers_to_call = self._handlers.prefix_search(event_name)
self._lookup_cache[event_name] = handlers_to_call
elif not handlers_to_call:
# Short circuit and return an empty response is we have
# no handlers to call. This is the common case where
# for the majority of signals, nothing is listening.
return []
kwargs['event_name'] = event_name
responses = []
for handler in handlers_to_call:
logger.debug('Event %s: calling handler %s', event_name, handler)
# Await the handler if its a coroutine.
response = await resolve_awaitable(handler(**kwargs))
responses.append((handler, response))
if stop_on_response and response is not None:
return responses
return responses
async def emit_until_response(self, event_name, **kwargs):
responses = await self._emit(event_name, kwargs, stop_on_response=True)
if responses:
return responses[-1]
else:
return None, None
def _verify_and_register(
self,
event_name,
handler,
unique_id,
register_method,
unique_id_uses_count,
):
handler = _HANDLER_MAPPING.get(handler, handler)
self._verify_is_callable(handler)
self._verify_accept_kwargs(handler)
register_method(event_name, handler, unique_id, unique_id_uses_count)
|