1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
|
import copy
import botocore.client
from botocore.exceptions import ParamValidationError
from ._constants import DEFAULT_KEEPALIVE_TIMEOUT
from .endpoint import DEFAULT_HTTP_SESSION_CLS
from .httpxsession import HttpxSession
# Names of the connector_args entries that are validated as timeouts:
# each value must be a float, an int, or None.
TIMEOUT_ARGS = frozenset(
    {
        'keepalive_timeout',
        'pool_timeout',
        'write_timeout',
    }
)
class AioConfig(botocore.client.Config):
    """``botocore.client.Config`` extended with HTTP-session settings.

    On top of the keyword options accepted by the parent class, an
    ``AioConfig`` stores:

    * ``connector_args`` -- a dict of connector options, validated by
      :meth:`_validate_connector_args`.
    * ``http_session_cls`` -- the HTTP session class to use.
    """

    def __init__(
        self,
        connector_args=None,
        http_session_cls=DEFAULT_HTTP_SESSION_CLS,
        **kwargs,
    ):
        """Create the config.

        :param connector_args: optional mapping of connector options; see
            :meth:`_validate_connector_args` for the accepted keys.
        :param http_session_cls: HTTP session class; defaults to
            ``DEFAULT_HTTP_SESSION_CLS``.
        :param kwargs: forwarded unchanged to ``botocore.client.Config``.
        :raises ParamValidationError: if ``connector_args`` contains an
            unknown key or a value of the wrong type.
        """
        super().__init__(**kwargs)

        self._validate_connector_args(connector_args, http_session_cls)
        # Shallow copy so that adding the default below never mutates the
        # caller's dict.
        self.connector_args = copy.copy(connector_args)
        self.http_session_cls = http_session_cls
        if not self.connector_args:
            self.connector_args = dict()

        if 'keepalive_timeout' not in self.connector_args:
            self.connector_args['keepalive_timeout'] = (
                DEFAULT_KEEPALIVE_TIMEOUT
            )

    def merge(self, other_config):
        """Return a new ``AioConfig`` combining this config with another.

        Options explicitly provided to ``other_config`` override the ones
        provided to this config. ``connector_args`` and ``http_session_cls``
        are taken from this config.

        :param other_config: config object exposing
            ``_user_provided_options``.
        :returns: a new ``AioConfig`` instance.
        """
        # Adapted from parent class
        config_options = copy.copy(self._user_provided_options)
        config_options.update(other_config._user_provided_options)
        # http_session_cls is a named parameter of __init__, so it is never
        # part of the recorded parent options; forward it explicitly or the
        # merged config would fall back to the default session class.
        return AioConfig(
            self.connector_args,
            http_session_cls=self.http_session_cls,
            **config_options,
        )

    @staticmethod
    def _validate_connector_args(connector_args, http_session_cls):
        """Validate the keys and value types of ``connector_args``.

        :param connector_args: mapping to validate, or ``None`` (which is
            accepted without further checks).
        :param http_session_cls: session class the arguments are meant for;
            some keys are rejected when it is ``HttpxSession``.
        :raises ParamValidationError: on an unknown key, a value of the
            wrong type, or a key the selected backend does not support.
        """
        if connector_args is None:
            return

        for k, v in connector_args.items():
            # verify_ssl is handled by verify parameter to create_client
            if k == 'use_dns_cache':
                if http_session_cls is HttpxSession:
                    raise ParamValidationError(
                        report='Httpx does not support dns caching. https://github.com/encode/httpx/discussions/2211'
                    )
                if not isinstance(v, bool):
                    raise ParamValidationError(
                        report=f'{k} value must be a boolean'
                    )
            elif k == 'ttl_dns_cache':
                if v is not None and not isinstance(v, int):
                    raise ParamValidationError(
                        report=f'{k} value must be an int or None'
                    )
            elif k in TIMEOUT_ARGS:
                if v is not None and not isinstance(v, (float, int)):
                    raise ParamValidationError(
                        report=f'{k} value must be a float/int or None'
                    )
            elif k == 'force_close':
                if http_session_cls is HttpxSession:
                    raise ParamValidationError(
                        report=f'Httpx backend does not currently support {k}.'
                    )
                if not isinstance(v, bool):
                    raise ParamValidationError(
                        report=f'{k} value must be a boolean'
                    )
            # limit is handled by max_pool_connections
            elif k == 'ssl_context':
                import ssl

                if not isinstance(v, ssl.SSLContext):
                    raise ParamValidationError(
                        report=f'{k} must be an SSLContext instance'
                    )
            elif k == "resolver":
                # Reject the unsupported backend before importing aiohttp,
                # so the caller gets the validation error rather than a
                # possible ImportError.
                if http_session_cls is HttpxSession:
                    raise ParamValidationError(
                        report=f'Httpx backend does not support {k}.'
                    )

                from aiohttp.abc import AbstractResolver

                if not isinstance(v, AbstractResolver):
                    raise ParamValidationError(
                        report=f'{k} must be an instance of a AbstractResolver'
                    )
            else:
                raise ParamValidationError(report=f'invalid connector_arg:{k}')
|