File: discovery.py

package info (click to toggle)
python-aiobotocore 2.13.1-1.1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 832 kB
  • sloc: python: 10,572; makefile: 71
file content (87 lines) | stat: -rw-r--r-- 3,613 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import inspect

from botocore.discovery import (
    EndpointDiscoveryHandler,
    EndpointDiscoveryManager,
    EndpointDiscoveryRefreshFailed,
    HTTPClientError,
    logger,
)


class AioEndpointDiscoveryManager(EndpointDiscoveryManager):
    async def _refresh_current_endpoints(self, **kwargs):
        cache_key = self._create_cache_key(**kwargs)
        try:
            response = self._describe_endpoints(**kwargs)

            if inspect.isawaitable(response):
                response = await response

            endpoints = self._parse_endpoints(response)
            self._cache[cache_key] = endpoints
            self._failed_attempts.pop(cache_key, None)
            return endpoints
        except (ConnectionError, HTTPClientError):
            self._failed_attempts[cache_key] = self._time() + 60
            return None

    async def describe_endpoint(self, **kwargs):
        operation = kwargs['Operation']
        discovery_required = self._model.discovery_required_for(operation)

        if not self._always_discover and not discovery_required:
            # Discovery set to only run on required operations
            logger.debug(
                'Optional discovery disabled. Skipping discovery for Operation: %s'
                % operation
            )
            return None

        # Get the endpoint for the provided operation and identifiers
        cache_key = self._create_cache_key(**kwargs)
        endpoints = self._get_current_endpoints(cache_key)
        if endpoints:
            return self._select_endpoint(endpoints)
        # All known endpoints are stale
        recently_failed = self._recently_failed(cache_key)
        if not recently_failed:
            # We haven't failed to discover recently, go ahead and refresh
            endpoints = await self._refresh_current_endpoints(**kwargs)
            if endpoints:
                return self._select_endpoint(endpoints)
        # Discovery has failed recently, do our best to get an endpoint
        logger.debug('Endpoint Discovery has failed for: %s', kwargs)
        stale_entries = self._cache.get(cache_key, None)
        if stale_entries:
            # We have stale entries, use those while discovery is failing
            return self._select_endpoint(stale_entries)
        if discovery_required:
            # It looks strange to be checking recently_failed again but,
            # this informs us as to whether or not we tried to refresh earlier
            if recently_failed:
                # Discovery is required and we haven't already refreshed
                endpoints = await self._refresh_current_endpoints(**kwargs)
                if endpoints:
                    return self._select_endpoint(endpoints)
            # No endpoints even refresh, raise hard error
            raise EndpointDiscoveryRefreshFailed()
        # Discovery is optional, just use the default endpoint for now
        return None


class AioEndpointDiscoveryHandler(EndpointDiscoveryHandler):
    async def discover_endpoint(self, request, operation_name, **kwargs):
        ids = request.context.get('discovery', {}).get('identifiers')
        if ids is None:
            return
        endpoint = await self._manager.describe_endpoint(
            Operation=operation_name, Identifiers=ids
        )
        if endpoint is None:
            logger.debug('Failed to discover and inject endpoint')
            return
        if not endpoint.startswith('http'):
            endpoint = 'https://' + endpoint
        logger.debug('Injecting discovered endpoint: %s', endpoint)
        request.url = endpoint