1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
|
# Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import pytest
from botocore.config import Config
from botocore.handlers import get_bearer_auth_supported_services
from tests import create_session, mock
# In the future, a service may have a list of credentials requirements where one
# signature may fail and others may succeed. e.g. a service may want to use bearer
# auth but fall back to sigv4 if a token isn't available. There's currently no way to do
# this in botocore, so this test ensures we handle this gracefully when the need arises.
# Some services (such as Bedrock) have customizations that bypass this limitation
# by manually assigning a signer when credentials are unavailable. To avoid
# false positives, we filter these services out of the tests below.
# The dictionary's value here needs to be hashable to be added to the set below; any
# new auth types with multiple requirements should be added in a comma-separated list
AUTH_TYPE_REQUIREMENTS = {
'aws.auth#sigv4': 'credentials',
'aws.auth#sigv4a': 'credentials',
'smithy.api#httpBearerAuth': 'bearer_token',
'smithy.api#noAuth': 'none',
}
# Services with a `signing_name` that are known to have
# customizations for handling mixed authentication methods.
KNOWN_MIXED_AUTH_SERVICES = get_bearer_auth_supported_services()
KNOWN_MIXED_AUTH_SCHEMES = {'aws.auth#sigv4', 'smithy.api#httpBearerAuth'}
def _all_test_cases():
session = create_session()
loader = session.get_component('data_loader')
services = loader.list_available_services('service-2')
auth_services = []
auth_operations = []
for service in services:
service_model = session.get_service_model(service)
signing_name = service_model.signing_name
auth_config = service_model.metadata.get('auth', {})
if signing_name in KNOWN_MIXED_AUTH_SERVICES:
if set(auth_config) == KNOWN_MIXED_AUTH_SCHEMES:
# Skip service due to known mixed auth configurations.
continue
if auth_config:
auth_services.append([service, auth_config])
for operation in service_model.operation_names:
operation_model = service_model.operation_model(operation)
if operation_model.auth:
auth_operations.append([service, operation_model])
return auth_services, auth_operations
AUTH_SERVICES, AUTH_OPERATIONS = _all_test_cases()
@pytest.mark.validates_models
@pytest.mark.parametrize("auth_service, auth_config", AUTH_SERVICES)
def test_all_requirements_match_for_service(auth_service, auth_config):
# Validates that all service-level signature types have the same requirements
message = f'Found mixed signer requirements for service: {auth_service}'
assert_all_requirements_match(auth_config, message)
@pytest.mark.validates_models
@pytest.mark.parametrize("auth_service, operation_model", AUTH_OPERATIONS)
def test_all_requirements_match_for_operation(auth_service, operation_model):
# Validates that all operation-level signature types have the same requirements
message = f'Found mixed signer requirements for operation: {auth_service}.{operation_model.name}'
auth_config = operation_model.auth
assert_all_requirements_match(auth_config, message)
def assert_all_requirements_match(auth_config, message):
auth_requirements = set(
AUTH_TYPE_REQUIREMENTS[auth_type] for auth_type in auth_config
)
assert len(auth_requirements) == 1, message
def get_config_file_path(base_path, value):
if value is None:
return "file-does-not-exist"
tmp_config_file_path = base_path / "config"
tmp_config_file_path.write_text(
f"[default]\nsigv4a_signing_region_set={value}\n"
)
return tmp_config_file_path
def get_environ_mock(
request,
env_var_value=None,
config_file_value=None,
):
base_path = request.getfixturevalue("tmp_path")
config_file_path = get_config_file_path(base_path, config_file_value)
return {
"AWS_CONFIG_FILE": str(config_file_path),
"AWS_SIGV4A_SIGNING_REGION_SET": env_var_value,
}
@pytest.mark.parametrize(
"client_config, env_var_val, config_file_val, expected",
[
(Config(sigv4a_signing_region_set="foo"), "bar", "baz", "foo"),
(Config(sigv4a_signing_region_set="foo"), None, None, "foo"),
(None, "bar", "baz", "bar"),
(None, None, "baz", "baz"),
(Config(sigv4a_signing_region_set="foo"), None, "baz", "foo"),
(None, None, None, None),
],
)
def test_sigv4a_signing_region_set_config_from_environment(
client_config, env_var_val, config_file_val, expected, request
):
environ_mock = get_environ_mock(request, env_var_val, config_file_val)
with mock.patch('os.environ', environ_mock):
session = create_session()
s3 = session.create_client('s3', config=client_config)
assert s3.meta.config.sigv4a_signing_region_set == expected
|