1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
|
import warnings
from hvac.api.system_backend.system_backend_mixin import SystemBackendMixin
from hvac.exceptions import ParamValidationError
class Init(SystemBackendMixin):
def read_init_status(self):
"""Read the initialization status of Vault.
Supported methods:
GET: /sys/init. Produces: 200 application/json
:return: The JSON response of the request.
:rtype: dict
"""
api_path = "/v1/sys/init"
return self._adapter.get(
url=api_path,
)
def is_initialized(self):
"""Determine is Vault is initialized or not.
:return: True if Vault is initialized, False otherwise.
:rtype: bool
"""
status = self.read_init_status()
return status["initialized"]
def initialize(
self,
secret_shares=None,
secret_threshold=None,
pgp_keys=None,
root_token_pgp_key=None,
stored_shares=None,
recovery_shares=None,
recovery_threshold=None,
recovery_pgp_keys=None,
):
"""Initialize a new Vault.
The Vault must not have been previously initialized. The recovery options, as well as the stored shares option,
are only available when using Vault HSM.
Supported methods:
PUT: /sys/init. Produces: 200 application/json
:param secret_shares: The number of shares to split the master key into.
:type secret_shares: int
:param secret_threshold: Specifies the number of shares required to reconstruct the master key. This must be
less than or equal secret_shares. If using Vault HSM with auto-unsealing, this value must be the same as
secret_shares, or omitted, depending on the version of Vault and the seal type.
:type secret_threshold: int
:param pgp_keys: List of PGP public keys used to encrypt the output unseal keys.
Ordering is preserved. The keys must be base64-encoded from their original binary representation.
The size of this array must be the same as secret_shares.
:type pgp_keys: list
:param root_token_pgp_key: Specifies a PGP public key used to encrypt the initial root token. The
key must be base64-encoded from its original binary representation.
:type root_token_pgp_key: str | unicode
:param stored_shares: <enterprise only> Specifies the number of shares that should be encrypted by the HSM and
stored for auto-unsealing. Currently must be the same as secret_shares.
:type stored_shares: int
:param recovery_shares: <enterprise only> Specifies the number of shares to split the recovery key into.
:type recovery_shares: int
:param recovery_threshold: <enterprise only> Specifies the number of shares required to reconstruct the recovery
key. This must be less than or equal to recovery_shares.
:type recovery_threshold: int
:param recovery_pgp_keys: <enterprise only> Specifies an array of PGP public keys used to encrypt the output
recovery keys. Ordering is preserved. The keys must be base64-encoded from their original binary
representation. The size of this array must be the same as recovery_shares.
:type recovery_pgp_keys: list
:return: The JSON response of the request.
:rtype: dict
"""
# TODO(v3.0.0): remove this
if recovery_shares is None and secret_shares is None:
msg = (
"The secret_shares parameter will default to None in hvac v3.0.0. "
"To use the old default with no warning, explicitly set this value to 5. "
"See https://github.com/hvac/hvac/issues/1030"
)
warnings.warn(
message=msg,
category=DeprecationWarning,
stacklevel=2,
)
secret_shares = 5
# TODO(v3.0.0): remove this
if recovery_threshold is None and secret_threshold is None:
msg = (
"The secret_threshold parameter will default to None in hvac v3.0.0. "
"To use the old default with no warning, explicitly set this value to 3. "
"See https://github.com/hvac/hvac/issues/1030"
)
warnings.warn(
message=msg,
category=DeprecationWarning,
stacklevel=2,
)
secret_threshold = 3
params = {
"secret_shares": secret_shares,
"secret_threshold": secret_threshold,
"root_token_pgp_key": root_token_pgp_key,
}
if pgp_keys is not None and secret_shares is not None:
if len(pgp_keys) != secret_shares:
raise ParamValidationError(
"length of pgp_keys list argument must equal secret_shares value"
)
params["pgp_keys"] = pgp_keys
if stored_shares is not None and secret_shares is not None:
if stored_shares != secret_shares:
raise ParamValidationError(
"value for stored_shares argument must equal secret_shares argument"
)
params["stored_shares"] = stored_shares
if recovery_shares is not None:
params["recovery_shares"] = recovery_shares
if recovery_threshold is not None:
if recovery_threshold > recovery_shares:
error_msg = "value for recovery_threshold argument must be less than or equal to recovery_shares argument"
raise ParamValidationError(error_msg)
params["recovery_threshold"] = recovery_threshold
if recovery_pgp_keys is not None:
if len(recovery_pgp_keys) != recovery_shares:
raise ParamValidationError(
"length of recovery_pgp_keys list argument must equal recovery_shares value"
)
params["recovery_pgp_keys"] = recovery_pgp_keys
api_path = "/v1/sys/init"
return self._adapter.put(
url=api_path,
json=params,
)
|