1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196
|
# Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import os
from botocore import UNSIGNED
from botocore.session import get_session
from awscli.customizations import globalargs
from awscli.customizations.exceptions import ParamValidationError
from awscli.testutils import mock, unittest
class FakeParsedArgs:
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
def __getattr__(self, arg):
return None
class FakeSession:
def __init__(self, session_vars=None, config_file_vars=None):
if session_vars is None:
session_vars = {}
self.session_var_map = session_vars
if config_file_vars is None:
config_file_vars = {}
self.config_file_vars = config_file_vars
def get_config_variable(
self, name, methods=('env', 'config'), default=None
):
value = None
config_name, envvar_name = self.session_var_map[name]
if methods is not None:
if 'env' in methods and value is None:
value = os.environ.get(envvar_name)
if 'config' in methods and value is None:
value = self.config_file_vars.get(config_name)
else:
value = default
return value
class TestGlobalArgsCustomization(unittest.TestCase):
def test_parse_query(self):
parsed_args = FakeParsedArgs(query='foo.bar')
globalargs.resolve_types(parsed_args)
# Assert that it looks like a jmespath parsed expression.
self.assertFalse(isinstance(parsed_args.query, str))
self.assertTrue(hasattr(parsed_args.query, 'search'))
def test_parse_query_error_message(self):
# Invalid jmespath expression.
parsed_args = FakeParsedArgs(query='foo.bar.')
with self.assertRaises(ParamValidationError):
globalargs.resolve_types(parsed_args)
globalargs.resolve_types('query')
def test_parse_verify_ssl_default_value(self):
with mock.patch('os.environ', {}):
parsed_args = FakeParsedArgs(verify_ssl=True, ca_bundle=None)
session_var_map = {'ca_bundle': ('ca_bundle', 'AWS_CA_BUNDLE')}
session = FakeSession(session_vars=session_var_map)
globalargs.resolve_verify_ssl(parsed_args, session)
# None, so that botocore can apply it's default logic.
self.assertIsNone(parsed_args.verify_ssl)
def test_parse_verify_ssl_verify_turned_off(self):
with mock.patch('os.environ', {}):
parsed_args = FakeParsedArgs(verify_ssl=False, ca_bundle=None)
session_var_map = {'ca_bundle': ('ca_bundle', 'AWS_CA_BUNDLE')}
session = FakeSession(session_vars=session_var_map)
globalargs.resolve_verify_ssl(parsed_args, session)
self.assertFalse(parsed_args.verify_ssl)
def test_cli_overrides_cert_bundle(self):
environ = {}
with mock.patch('os.environ', environ):
parsed_args = FakeParsedArgs(
verify_ssl=True, ca_bundle='/path/to/cli_bundle.pem'
)
config_file_vars = {}
session_var_map = {'ca_bundle': ('ca_bundle', 'AWS_CA_BUNDLE')}
session = FakeSession(
session_vars=session_var_map, config_file_vars=config_file_vars
)
globalargs.resolve_verify_ssl(parsed_args, session)
self.assertEqual(parsed_args.verify_ssl, '/path/to/cli_bundle.pem')
def test_cli_overrides_env_cert_bundle(self):
environ = {
'AWS_CA_BUNDLE': '/path/to/env_bundle.pem',
}
with mock.patch('os.environ', environ):
parsed_args = FakeParsedArgs(
verify_ssl=True, ca_bundle='/path/to/cli_bundle.pem'
)
config_file_vars = {}
session_var_map = {'ca_bundle': ('ca_bundle', 'AWS_CA_BUNDLE')}
session = FakeSession(
session_vars=session_var_map, config_file_vars=config_file_vars
)
globalargs.resolve_verify_ssl(parsed_args, session)
self.assertEqual(parsed_args.verify_ssl, '/path/to/cli_bundle.pem')
def test_no_verify_ssl_overrides_cli_cert_bundle(self):
environ = {
'AWS_CA_BUNDLE': '/path/to/env_bundle.pem',
}
with mock.patch('os.environ', environ):
parsed_args = FakeParsedArgs(
verify_ssl=False, ca_bundle='/path/to/cli_bundle.pem'
)
config_file_vars = {}
session_var_map = {'ca_bundle': ('ca_bundle', 'AWS_CA_BUNDLE')}
session = FakeSession(
session_vars=session_var_map, config_file_vars=config_file_vars
)
globalargs.resolve_verify_ssl(parsed_args, session)
self.assertFalse(parsed_args.verify_ssl)
def test_no_sign_request_if_option_specified(self):
args = FakeParsedArgs(sign_request=False)
session = mock.Mock()
with mock.patch(
'awscli.customizations.globalargs._update_default_client_config'
) as mock_update:
globalargs.no_sign_request(args, session)
mock_update.assert_called_once_with(
session, 'signature_version', UNSIGNED
)
def test_request_signed_by_default(self):
args = FakeParsedArgs(sign_request=True)
session = mock.Mock()
globalargs.no_sign_request(args, session)
self.assertFalse(session.register.called)
def test_invalid_endpoint_url(self):
# Invalid endpoint URL
parsed_args = FakeParsedArgs(endpoint_url='missing-scheme.com')
with self.assertRaises(ParamValidationError):
globalargs.resolve_types(parsed_args)
def test_valid_endpoint_url(self):
parsed_args = FakeParsedArgs(endpoint_url='http://custom-endpoint.com')
globalargs.resolve_types(parsed_args)
self.assertEqual(
parsed_args.endpoint_url, 'http://custom-endpoint.com'
)
def test_cli_read_timeout(self):
parsed_args = FakeParsedArgs(read_timeout='60')
session = get_session()
globalargs.resolve_cli_read_timeout(parsed_args, session)
self.assertEqual(parsed_args.read_timeout, 60)
self.assertEqual(session.get_default_client_config().read_timeout, 60)
def test_cli_connect_timeout(self):
parsed_args = FakeParsedArgs(connect_timeout='60')
session = get_session()
globalargs.resolve_cli_connect_timeout(parsed_args, session)
self.assertEqual(parsed_args.connect_timeout, 60)
self.assertEqual(
session.get_default_client_config().connect_timeout, 60
)
def test_cli_read_timeout_for_blocking(self):
parsed_args = FakeParsedArgs(read_timeout='0')
session = get_session()
globalargs.resolve_cli_read_timeout(parsed_args, session)
self.assertEqual(parsed_args.read_timeout, None)
self.assertEqual(
session.get_default_client_config().read_timeout, None
)
def test_cli_connect_timeout_for_blocking(self):
parsed_args = FakeParsedArgs(connect_timeout='0')
session = get_session()
globalargs.resolve_cli_connect_timeout(parsed_args, session)
self.assertEqual(parsed_args.connect_timeout, None)
self.assertEqual(
session.get_default_client_config().connect_timeout, None
)
|