1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
|
import inspect
from botocore.discovery import (
EndpointDiscoveryHandler,
EndpointDiscoveryManager,
EndpointDiscoveryRefreshFailed,
HTTPClientError,
logger,
)
class AioEndpointDiscoveryManager(EndpointDiscoveryManager):
    """Endpoint discovery manager with coroutine refresh and lookup paths.

    Overrides ``_refresh_current_endpoints`` and ``describe_endpoint`` as
    coroutines so that an awaitable returned by ``_describe_endpoints`` can
    be awaited before its result is parsed.
    """

    async def _refresh_current_endpoints(self, **kwargs):
        """Fetch, parse and cache the endpoints for ``kwargs``.

        :param kwargs: Arguments forwarded unchanged to
            ``_create_cache_key`` and ``_describe_endpoints``.
        :returns: The parsed endpoints on success, or ``None`` when the
            describe call failed with a connection / HTTP client error.
        """
        # The module-level name ``ConnectionError`` is the Python builtin
        # here; botocore's own ConnectionError family (for example
        # EndpointConnectionError) derives from BotoCoreError instead, so it
        # has to be caught explicitly. It is imported under an alias to
        # avoid shadowing the builtin, which is still caught as before.
        from botocore.exceptions import ConnectionError as BotoConnectionError

        cache_key = self._create_cache_key(**kwargs)
        try:
            response = self._describe_endpoints(**kwargs)
            # The describe call may hand back either a plain response or an
            # awaitable; only await when there is something to await.
            if inspect.isawaitable(response):
                response = await response
            endpoints = self._parse_endpoints(response)
            self._cache[cache_key] = endpoints
            # A successful refresh clears any earlier failure marker.
            self._failed_attempts.pop(cache_key, None)
            return endpoints
        except (ConnectionError, BotoConnectionError, HTTPClientError):
            # Record a marker 60 past the current ``self._time()`` value
            # (presumably seconds -- confirm against the base class).
            self._failed_attempts[cache_key] = self._time() + 60
            return None

    async def describe_endpoint(self, **kwargs):
        """Return an endpoint for the operation described by ``kwargs``.

        :param kwargs: Must contain ``'Operation'``; all keyword arguments
            are also used to build the cache key and to refresh endpoints.
        :returns: The value chosen by ``_select_endpoint``, or ``None`` when
            discovery is skipped or is optional and produced nothing.
        :raises EndpointDiscoveryRefreshFailed: When discovery is required
            for the operation and no endpoint could be obtained.
        """
        operation = kwargs['Operation']
        discovery_required = self._model.discovery_required_for(operation)

        if not self._always_discover and not discovery_required:
            # Discovery set to only run on required operations
            logger.debug(
                'Optional discovery disabled. Skipping discovery for Operation: %s',
                operation,
            )
            return None

        # Get the endpoint for the provided operation and identifiers
        cache_key = self._create_cache_key(**kwargs)
        endpoints = self._get_current_endpoints(cache_key)
        if endpoints:
            return self._select_endpoint(endpoints)

        # All known endpoints are stale
        recently_failed = self._recently_failed(cache_key)
        if not recently_failed:
            # We haven't failed to discover recently, go ahead and refresh
            endpoints = await self._refresh_current_endpoints(**kwargs)
            if endpoints:
                return self._select_endpoint(endpoints)

        # Discovery has failed recently, do our best to get an endpoint
        logger.debug('Endpoint Discovery has failed for: %s', kwargs)
        stale_entries = self._cache.get(cache_key, None)
        if stale_entries:
            # We have stale entries, use those while discovery is failing
            return self._select_endpoint(stale_entries)

        if discovery_required:
            # It looks strange to be checking recently_failed again but,
            # this informs us as to whether or not we tried to refresh earlier
            if recently_failed:
                # Discovery is required and we haven't already refreshed
                endpoints = await self._refresh_current_endpoints(**kwargs)
                if endpoints:
                    return self._select_endpoint(endpoints)
            # No endpoints even after a refresh, raise hard error
            raise EndpointDiscoveryRefreshFailed()

        # Discovery is optional, just use the default endpoint for now
        return None
class AioEndpointDiscoveryHandler(EndpointDiscoveryHandler):
    """Endpoint discovery handler whose ``discover_endpoint`` is a coroutine."""

    async def discover_endpoint(self, request, operation_name, **kwargs):
        """Point ``request.url`` at a discovered endpoint, when one exists.

        :param request: Object exposing a ``context`` mapping and a writable
            ``url`` attribute.
        :param operation_name: Passed to the manager as ``Operation``.
        :param kwargs: Accepted and ignored.
        :returns: ``None`` in every case; the request is modified in place.
        """
        discovery_context = request.context.get('discovery', {})
        identifiers = discovery_context.get('identifiers')
        if identifiers is None:
            # No discovery identifiers on this request: leave it untouched.
            return

        discovered = await self._manager.describe_endpoint(
            Operation=operation_name, Identifiers=identifiers
        )
        if discovered is None:
            logger.debug('Failed to discover and inject endpoint')
            return

        # Endpoints that do not already start with 'http' get an https scheme.
        target_url = (
            discovered
            if discovered.startswith('http')
            else 'https://' + discovered
        )
        logger.debug('Injecting discovered endpoint: %s', target_url)
        request.url = target_url
|