1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407
|
from hvac.api.system_backend.system_backend_mixin import SystemBackendMixin
from hvac.exceptions import ParamValidationError
class Key(SystemBackendMixin):
    """System backend methods for root token generation, encryption key rotation and rekeying.

    Every method issues a single HTTP request through ``self._adapter`` (except the ``*_multi``
    helpers, which loop over the corresponding single-key method).
    """

    def read_root_generation_progress(self):
        """Read the configuration and progress of the current root generation attempt.

        Supported methods:
            GET: /sys/generate-root/attempt. Produces: 200 application/json

        :return: The JSON response of the request.
        :rtype: dict
        """
        api_path = "/v1/sys/generate-root/attempt"
        return self._adapter.get(
            url=api_path,
        )

    def start_root_token_generation(self, otp=None, pgp_key=None):
        """Initialize a new root generation attempt.

        Only a single root generation attempt can take place at a time. At most one of otp or pgp_key may be
        supplied; supplying both is rejected before any request is sent. Supplying neither is passed through to
        Vault with an empty parameter set.

        Supported methods:
            PUT: /sys/generate-root/attempt. Produces: 200 application/json

        :param otp: Specifies a base64-encoded 16-byte value. The raw bytes of the token will be XOR'd with this value
            before being returned to the final unseal key provider.
        :type otp: str | unicode
        :param pgp_key: Specifies a base64-encoded PGP public key. The raw bytes of the token will be encrypted with
            this value before being returned to the final unseal key provider.
        :type pgp_key: str | unicode
        :raises ParamValidationError: If both otp and pgp_key are provided.
        :return: The JSON response of the request.
        :rtype: dict
        """
        params = {}
        if otp is not None and pgp_key is not None:
            raise ParamValidationError(
                "one (and only one) of otp or pgp_key arguments are required"
            )
        # Only forward the arguments the caller actually supplied.
        if otp is not None:
            params["otp"] = otp
        if pgp_key is not None:
            params["pgp_key"] = pgp_key

        api_path = "/v1/sys/generate-root/attempt"
        return self._adapter.put(url=api_path, json=params)

    def generate_root(self, key, nonce):
        """Enter a single master key share to progress the root generation attempt.

        If the threshold number of master key shares is reached, Vault will complete the root generation and issue the
        new token. Otherwise, this API must be called multiple times until that threshold is met. The attempt nonce must
        be provided with each call.

        Supported methods:
            PUT: /sys/generate-root/update. Produces: 200 application/json

        :param key: Specifies a single master key share.
        :type key: str | unicode
        :param nonce: The nonce of the attempt.
        :type nonce: str | unicode
        :return: The JSON response of the request.
        :rtype: dict
        """
        params = {
            "key": key,
            "nonce": nonce,
        }
        api_path = "/v1/sys/generate-root/update"
        return self._adapter.put(
            url=api_path,
            json=params,
        )

    def cancel_root_generation(self):
        """Cancel any in-progress root generation attempt.

        This clears any progress made. This must be called to change the OTP or PGP key being used.

        Supported methods:
            DELETE: /sys/generate-root/attempt. Produces: 204 (empty body)

        :return: The response of the request.
        :rtype: requests.Response
        """
        api_path = "/v1/sys/generate-root/attempt"
        return self._adapter.delete(
            url=api_path,
        )

    def get_encryption_key_status(self):
        """Read information about the current encryption key used by Vault.

        Supported methods:
            GET: /sys/key-status. Produces: 200 application/json

        :return: JSON response with information regarding the current encryption key used by Vault.
        :rtype: dict
        """
        api_path = "/v1/sys/key-status"
        return self._adapter.get(
            url=api_path,
        )

    def rotate_encryption_key(self):
        """Trigger a rotation of the backend encryption key.

        This is the key that is used to encrypt data written to the storage backend, and is not provided to operators.
        This operation is done online. Future values are encrypted with the new key, while old values are decrypted with
        previous encryption keys.

        This path requires sudo capability in addition to update.

        Supported methods:
            PUT: /sys/rotate. Produces: 204 (empty body)

        :return: The response of the request.
        :rtype: requests.Response
        """
        api_path = "/v1/sys/rotate"
        return self._adapter.put(
            url=api_path,
        )

    def read_rekey_progress(self, recovery_key=False):
        """Read the configuration and progress of the current rekey attempt.

        Supported methods:
            GET: /sys/rekey-recovery-key/init. Produces: 200 application/json
            GET: /sys/rekey/init. Produces: 200 application/json

        :param recovery_key: If true, send requests to "rekey-recovery-key" instead of "rekey" api path.
        :type recovery_key: bool
        :return: The JSON response of the request.
        :rtype: dict
        """
        api_path = "/v1/sys/rekey/init"
        if recovery_key:
            api_path = "/v1/sys/rekey-recovery-key/init"
        return self._adapter.get(
            url=api_path,
        )

    def start_rekey(
        self,
        secret_shares=5,
        secret_threshold=3,
        pgp_keys=None,
        backup=False,
        require_verification=False,
        recovery_key=False,
    ):
        """Initialize a new rekey attempt.

        Only a single rekey attempt can take place at a time, and changing the parameters of a rekey
        requires canceling and starting a new rekey, which will also provide a new nonce.

        Supported methods:
            PUT: /sys/rekey/init. Produces: 204 (empty body)
            PUT: /sys/rekey-recovery-key/init. Produces: 204 (empty body)

        :param secret_shares: Specifies the number of shares to split the master key into.
        :type secret_shares: int
        :param secret_threshold: Specifies the number of shares required to reconstruct the master key. This must be
            less than or equal to secret_shares.
        :type secret_threshold: int
        :param pgp_keys: Specifies an array of PGP public keys used to encrypt the output unseal keys. Ordering is
            preserved. The keys must be base64-encoded from their original binary representation. The size of this array
            must be the same as secret_shares.
        :type pgp_keys: list
        :param backup: Specifies if using PGP-encrypted keys, whether Vault should also store a plaintext backup of the
            PGP-encrypted keys at core/unseal-keys-backup in the physical storage backend. These can then be retrieved
            and removed via the sys/rekey/backup endpoint. Only sent when pgp_keys is non-empty.
        :type backup: bool
        :param require_verification: This turns on verification functionality. When verification is turned on, after
            successful authorization with the current unseal keys, the new unseal keys are returned but the master key
            is not actually rotated. The new keys must be provided to authorize the actual rotation of the master key.
            This ensures that the new keys have been successfully saved and protects against a risk of the keys being
            lost after rotation but before they can be persisted. This can be used with or without pgp_keys, and when
            used with it, it allows ensuring that the returned keys can be successfully decrypted before committing to
            the new shares, which the backup functionality does not provide.
        :type require_verification: bool
        :param recovery_key: If true, send requests to "rekey-recovery-key" instead of "rekey" api path.
        :type recovery_key: bool
        :raises ParamValidationError: If pgp_keys is non-empty and its length differs from secret_shares.
        :return: The JSON dict of the response.
        :rtype: dict | requests.Response
        """
        params = {
            "secret_shares": secret_shares,
            "secret_threshold": secret_threshold,
            "require_verification": require_verification,
        }

        if pgp_keys:
            # Validate client-side so a mismatched key list never reaches the server.
            if len(pgp_keys) != secret_shares:
                raise ParamValidationError(
                    "length of pgp_keys argument must equal secret shares value"
                )
            params["pgp_keys"] = pgp_keys
            # The backup flag is only forwarded alongside PGP keys.
            params["backup"] = backup

        api_path = "/v1/sys/rekey/init"
        if recovery_key:
            api_path = "/v1/sys/rekey-recovery-key/init"
        return self._adapter.put(
            url=api_path,
            json=params,
        )

    def cancel_rekey(self, recovery_key=False):
        """Cancel any in-progress rekey.

        This clears the rekey settings as well as any progress made. This must be called to change the parameters of the
        rekey.

        Note: Verification is still a part of a rekey. If rekeying is canceled during the verification flow, the current
        unseal keys remain valid.

        Supported methods:
            DELETE: /sys/rekey/init. Produces: 204 (empty body)
            DELETE: /sys/rekey-recovery-key/init. Produces: 204 (empty body)

        :param recovery_key: If true, send requests to "rekey-recovery-key" instead of "rekey" api path.
        :type recovery_key: bool
        :return: The response of the request.
        :rtype: requests.Response
        """
        api_path = "/v1/sys/rekey/init"
        if recovery_key:
            api_path = "/v1/sys/rekey-recovery-key/init"
        return self._adapter.delete(
            url=api_path,
        )

    def rekey(self, key, nonce=None, recovery_key=False):
        """Enter a single recovery key share to progress the rekey of the Vault.

        If the threshold number of recovery key shares is reached, Vault will complete the rekey. Otherwise, this API
        must be called multiple times until that threshold is met. The rekey nonce operation must be provided with each
        call.

        Supported methods:
            PUT: /sys/rekey/update. Produces: 200 application/json
            PUT: /sys/rekey-recovery-key/update. Produces: 200 application/json

        :param key: Specifies a single recovery share key.
        :type key: str | unicode
        :param nonce: Specifies the nonce of the rekey operation. Omitted from the request body when None.
        :type nonce: str | unicode
        :param recovery_key: If true, send requests to "rekey-recovery-key" instead of "rekey" api path.
        :type recovery_key: bool
        :return: The JSON response of the request.
        :rtype: dict
        """
        params = {
            "key": key,
        }
        if nonce is not None:
            params["nonce"] = nonce

        api_path = "/v1/sys/rekey/update"
        if recovery_key:
            api_path = "/v1/sys/rekey-recovery-key/update"
        return self._adapter.put(
            url=api_path,
            json=params,
        )

    def rekey_multi(self, keys, nonce=None, recovery_key=False):
        """Enter multiple recovery key shares to progress the rekey of the Vault.

        Each key is submitted in order via :meth:`rekey`. Submission stops as soon as a response reports a truthy
        "complete" value, so any remaining keys are not sent.

        :param keys: Specifies multiple recovery share keys.
        :type keys: list
        :param nonce: Specifies the nonce of the rekey operation.
        :type nonce: str | unicode
        :param recovery_key: If true, send requests to "rekey-recovery-key" instead of "rekey" api path.
        :type recovery_key: bool
        :return: The last response of the rekey request, or None if keys is empty.
        :rtype: dict | None
        """
        result = None
        for key in keys:
            result = self.rekey(
                key=key,
                nonce=nonce,
                recovery_key=recovery_key,
            )
            if result.get("complete"):
                break
        return result

    def read_backup_keys(self, recovery_key=False):
        """Retrieve the backup copy of PGP-encrypted unseal keys.

        The returned value is the nonce of the rekey operation and a map of PGP key fingerprint to hex-encoded
        PGP-encrypted key.

        Supported methods:
            GET: /sys/rekey/backup. Produces: 200 application/json
            GET: /sys/rekey/recovery-key-backup. Produces: 200 application/json

        :param recovery_key: If true, send the request to the "rekey/recovery-key-backup" api path instead of
            "rekey/backup".
        :type recovery_key: bool
        :return: The JSON response of the request.
        :rtype: dict
        """
        api_path = "/v1/sys/rekey/backup"
        if recovery_key:
            # Unlike the other recovery-key endpoints, the backup lives under the "rekey/" prefix.
            api_path = "/v1/sys/rekey/recovery-key-backup"
        return self._adapter.get(
            url=api_path,
        )

    def cancel_rekey_verify(self):
        """Cancel any in-progress rekey verification.

        This clears any progress made and resets the nonce. Unlike cancel_rekey, this only resets
        the current verification operation, not the entire rekey attempt.
        The return value is the same as GET along with the new nonce.

        Supported methods:
            DELETE: /sys/rekey/verify. Produces: 204 (empty body)

        :return: The response of the request.
        :rtype: requests.Response
        """
        api_path = "/v1/sys/rekey/verify"
        return self._adapter.delete(
            url=api_path,
        )

    def rekey_verify(self, key, nonce):
        """Enter a single new recovery key share to progress the rekey verification of the Vault.

        If the threshold number of new recovery key shares is reached, Vault will complete the
        rekey. Otherwise, this API must be called multiple times until that threshold is met.
        The rekey verification nonce must be provided with each call.

        Supported methods:
            PUT: /sys/rekey/verify. Produces: 200 application/json

        :param key: Specifies a single new recovery share key.
        :type key: str | unicode
        :param nonce: Specifies the nonce of the rekey verify operation.
        :type nonce: str | unicode
        :return: The JSON response of the request.
        :rtype: dict
        """
        params = {
            "key": key,
            "nonce": nonce,
        }
        api_path = "/v1/sys/rekey/verify"
        return self._adapter.put(
            url=api_path,
            json=params,
        )

    def rekey_verify_multi(self, keys, nonce):
        """Enter multiple new recovery key shares to progress the rekey verification of the Vault.

        Each key is submitted in order via :meth:`rekey_verify`. Submission stops as soon as a response reports a
        truthy "complete" value, so any remaining keys are not sent.
        The rekey verification nonce is sent with each call.

        Supported methods:
            PUT: /sys/rekey/verify. Produces: 200 application/json

        :param keys: Specifies multiple recovery share keys.
        :type keys: list
        :param nonce: Specifies the nonce of the rekey verify operation.
        :type nonce: str | unicode
        :return: The JSON response of the last request, or None if keys is empty.
        :rtype: dict | None
        """
        result = None
        for key in keys:
            result = self.rekey_verify(
                key=key,
                nonce=nonce,
            )
            if result.get("complete"):
                break
        return result

    def read_rekey_verify_progress(self):
        """Read the configuration and progress of the current rekey verify attempt.

        Supported methods:
            GET: /sys/rekey/verify. Produces: 200 application/json

        :return: The JSON response of the request.
        :rtype: dict
        """
        api_path = "/v1/sys/rekey/verify"
        return self._adapter.get(
            url=api_path,
        )
|