1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197
|
#!/usr/bin/env python
import logging
import re
import time
from tests.utils import get_config_file_path, create_client, is_enterprise
from tests.utils.server_manager import ServerManager
import shutil
from hvac import Client
class HvacIntegrationTestCase:
"""Base class intended to be used by all hvac integration test cases."""
manager: ServerManager = None
client: Client = None
enable_vault_ha: bool = False
use_env: bool = False
server_retry_count: int = 2 # num retries not total tries
server_retry_delay_seconds: float = 0.1
@classmethod
def setUpClass(cls):
"""Use the ServerManager class to launch a vault server process."""
config_paths = [get_config_file_path("vault-tls.hcl")]
if shutil.which("consul") is None and cls.enable_vault_ha:
logging.warning(
"Unable to run Vault in HA mode, consul binary not found in path."
)
cls.enable_vault_ha = False
if is_enterprise():
# TODO: figure out why this bit isn't working
logging.warning(
"Unable to run Vault in HA mode, enterprise Vault version not currently supported."
)
cls.enable_vault_ha = False
if cls.enable_vault_ha:
config_paths = [
get_config_file_path("vault-ha-node1.hcl"),
get_config_file_path("vault-ha-node2.hcl"),
]
cls.manager = ServerManager(
config_paths=config_paths,
use_consul=cls.enable_vault_ha,
)
while True:
try:
cls.manager.start()
cls.manager.initialize()
cls.manager.unseal()
except Exception as e:
cls.manager.stop()
logging.debug(
f"Failure in ServerManager (retries remaining: {cls.server_retry_count})\n{str(e)}"
)
if cls.server_retry_count > 0:
cls.server_retry_count -= 1
time.sleep(cls.server_retry_delay_seconds)
else:
raise
else:
break
@classmethod
def tearDownClass(cls):
"""Stop the vault server process at the conclusion of a test class."""
if cls.manager:
cls.manager.stop()
def setUp(self):
"""Set the client attribute to an authenticated hvac Client instance."""
self.client = self.manager.client
def tearDown(self):
"""Ensure the hvac Client instance's root token is reset after any auth method tests that may have modified it.
This allows subclass's to include additional tearDown logic to reset the state of the vault server when needed.
"""
self.client.token = self.manager.root_token
@staticmethod
def convert_python_ttl_value_to_expected_vault_response(ttl_value):
"""Convert any acceptable Vault TTL *input* to the expected value that Vault would return.
Vault accepts TTL values in the form r'^(?P<duration>[0-9]+)(?P<unit>[smh])?$ (number of seconds/minutes/hours).
However it returns those values as integers corresponding to seconds when retrieving configuration.
This method converts the "go duration format" arguments Vault accepts into the number (integer) of seconds
corresponding to what Vault returns.
:param ttl_value: A TTL string accepted by vault; number of seconds/minutes/hours
:type ttl_value: string
:return: The provided TTL value in the form returned by the Vault API.
:rtype: int
"""
expected_ttl = 0
if not isinstance(ttl_value, int) and ttl_value != "":
regexp_matches = re.findall(
r"(?P<duration>[0-9]+)(?P<unit>[smh])", ttl_value
)
if regexp_matches:
for regexp_match in regexp_matches:
duration, unit = regexp_match
if unit == "m":
# convert minutes to seconds
expected_ttl += int(duration) * 60
elif unit == "h":
# convert hours to seconds
expected_ttl += int(duration) * 60 * 60
else:
expected_ttl += int(duration)
elif ttl_value == "":
expected_ttl = 0
return expected_ttl
def prep_policy(self, name):
"""Add a common policy used by a subset of integration test cases."""
text = """
path "sys" {
policy = "deny"
}
path "secret" {
policy = "write"
}
"""
obj = {"path": {"sys": {"policy": "deny"}, "secret": {"policy": "write"}}}
self.client.sys.create_or_update_policy(name, text)
return text, obj
def get_vault_addr_by_standby_status(self, standby_status=True):
"""Get an address for a Vault HA node currently in standby.
:param standby_status: Value of the 'standby' key from the health status response to match.
:type standby_status: bool
:return: Standby Vault address.
:rtype: str
"""
vault_addresses = self.manager.get_active_vault_addresses()
for vault_address in vault_addresses:
health_status = create_client(url=vault_address).sys.read_health_status(
method="GET"
)
if not isinstance(health_status, dict):
health_status = health_status.json()
if health_status["standby"] == standby_status:
return vault_address
def add_admin_approle_role(
self, role_id, role_name="test-admin-role", path="approle"
):
test_admin_policy = {
"path": {
"*": {
"capabilities": [
"sudo",
"create",
"read",
"update",
"delete",
"list",
],
},
},
}
test_admin_policy_name = "test-admin-approle-policy"
self.client.sys.create_or_update_policy(
name=test_admin_policy_name,
policy=test_admin_policy,
)
self.client.auth.approle.create_or_update_approle(
role_name=role_name,
mount_point=path,
token_policies=[test_admin_policy_name],
)
self.client.auth.approle.update_role_id(
role_name=role_name,
role_id=role_id,
mount_point=path,
)
secret_id_resp = self.client.auth.approle.generate_secret_id(
role_name=role_name,
mount_point=self.TEST_APPROLE_PATH,
)
return secret_id_resp["data"]["secret_id"]
def login_using_admin_approle_role(
self, role_id, role_name="test-admin-role", path="approle"
):
secret_id = self.add_admin_approle_role(
role_id=role_id, role_name=role_name, path=path
)
self.client.auth.approle.login(
role_id=role_id,
secret_id=secret_id,
mount_point=path,
)
|