"""Integration tests for configuring identity integration on the AWS auth method."""
from unittest import TestCase
from parameterized import parameterized, param
from hvac import exceptions
from tests.utils.hvac_integration_test_case import HvacIntegrationTestCase
class TestAws(HvacIntegrationTestCase, TestCase):
    """Integration tests for ``configure_identity_integration`` on the AWS auth method.

    Every test talks to an AWS auth method mounted at ``TEST_MOUNT_POINT``.
    ``setUp`` enables that mount when it is not already listed and
    ``tearDown`` disables it again.
    """

    TEST_MOUNT_POINT = "aws-test"

    def setUp(self):
        """Enable the AWS auth method at ``TEST_MOUNT_POINT`` unless already listed."""
        super().setUp()
        # Auth method listings are keyed by the mount path with a trailing slash.
        mount_key = f"{self.TEST_MOUNT_POINT}/"
        if mount_key in self.client.sys.list_auth_methods():
            return
        self.client.sys.enable_auth_method(
            method_type="aws",
            path=self.TEST_MOUNT_POINT,
        )

    def tearDown(self):
        """Run the parent teardown, then disable the mount at ``TEST_MOUNT_POINT``."""
        # Order matters to callers of the base class: the parent hook runs first.
        super().tearDown()
        self.client.sys.disable_auth_method(path=self.TEST_MOUNT_POINT)

    @parameterized.expand(
        [
            param("no params"),
            param("valid iam metadata input 1", iam_metadata="default"),
            param(
                "valid iam metadata input 2",
                iam_metadata=["auth_type", "client_arn", "inferred_aws_region"],
            ),
            param(
                "valid ec2 metadata input 1",
                ec2_metadata=["region", "ami_id", "account_id"],
            ),
            param("valid ec2 metadata input 2", ec2_metadata="default"),
            param("valid ec2 alias input 1", ec2_alias="instance_id"),
            param("valid ec2 alias input 2", ec2_alias="role_id"),
            param("valid iam alias input 1", iam_alias="full_arn"),
            param("valid iam alias input 2", iam_alias="role_id"),
            param(
                "valid combination",
                ec2_alias="image_id",
                iam_alias="unique_id",
                ec2_metadata=["region", "instance_id", "auth_type"],
                iam_metadata=[
                    "inferred_entity_type",
                    "inferred_entity_id",
                    "canonical_arn",
                    "client_user_id",
                    "account_id",
                ],
            ),
        ]
    )
    def test_configure_identity_integration_succeeds(
        self, label, ec2_metadata="", iam_metadata="", ec2_alias=None, iam_alias=None
    ):
        """Check that accepted alias/metadata settings yield a truthy response.

        :param label: Human-readable case name consumed by ``parameterized``.
        :param ec2_metadata: Value forwarded as ``ec2_metadata`` (string or list).
        :param iam_metadata: Value forwarded as ``iam_metadata`` (string or list).
        :param ec2_alias: Value forwarded as ``ec2_alias``.
        :param iam_alias: Value forwarded as ``iam_alias``.
        """
        request = {
            "mount_point": self.TEST_MOUNT_POINT,
            "ec2_metadata": ec2_metadata,
            "iam_metadata": iam_metadata,
            "ec2_alias": ec2_alias,
            "iam_alias": iam_alias,
        }
        response = self.client.auth.aws.configure_identity_integration(**request)
        self.assertEqual(bool(response), True)

    @parameterized.expand(
        [
            param(
                "invalid ec2 metadata",
                ec2_metadata="something invalid",
                raises=exceptions.InvalidRequest,
                exception_message="contains an unavailable field, please select from",
            ),
            param(
                "invalid iam metadata",
                iam_metadata="something invalid",
                raises=exceptions.InvalidRequest,
                exception_message="contains an unavailable field, please select from",
            ),
            param(
                "invalid iam alias",
                iam_alias="something invalid",
                raises=exceptions.ParamValidationError,
                exception_message="invalid iam alias type provided",
            ),
            param(
                "invalid ec2 alias",
                ec2_alias="something invalid",
                raises=exceptions.ParamValidationError,
                exception_message="invalid ec2 alias type provided",
            ),
        ]
    )
    def test_configure_identity_integration_fails(
        self,
        label,
        raises,
        exception_message,
        ec2_metadata=None,
        iam_metadata=None,
        ec2_alias=None,
        iam_alias=None,
    ):
        """Check that rejected settings raise the expected error and message.

        :param label: Human-readable case name consumed by ``parameterized``.
        :param raises: Exception class the call is expected to raise.
        :param exception_message: Substring expected in the raised exception's text.
        :param ec2_metadata: Value forwarded as ``ec2_metadata``.
        :param iam_metadata: Value forwarded as ``iam_metadata``.
        :param ec2_alias: Value forwarded as ``ec2_alias``.
        :param iam_alias: Value forwarded as ``iam_alias``.
        """
        request = {
            "mount_point": self.TEST_MOUNT_POINT,
            "ec2_metadata": ec2_metadata,
            "iam_metadata": iam_metadata,
            "ec2_alias": ec2_alias,
            "iam_alias": iam_alias,
        }
        with self.assertRaises(raises) as raised:
            self.client.auth.aws.configure_identity_integration(**request)
        # Inspect the captured exception only after the context manager exits.
        self.assertIn(exception_message, str(raised.exception))