File: key.py

package info (click to toggle)
python-hvac 2.3.0-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,800 kB
  • sloc: python: 29,360; makefile: 42; sh: 14
file content (407 lines) | stat: -rw-r--r-- 15,889 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
from hvac.api.system_backend.system_backend_mixin import SystemBackendMixin
from hvac.exceptions import ParamValidationError


class Key(SystemBackendMixin):
    def read_root_generation_progress(self):
        """Fetch the configuration and progress of the current root generation attempt.

        Supported methods:
            GET: /sys/generate-root/attempt. Produces: 200 application/json

        :return: The JSON response of the request.
        :rtype: dict
        """
        return self._adapter.get(url="/v1/sys/generate-root/attempt")

    def start_root_token_generation(self, otp=None, pgp_key=None):
        """Begin a new root generation attempt.

        Only a single root generation attempt can take place at a time. At most one of otp or pgp_key may be
        supplied: passing both raises an error, while passing neither sends an empty JSON body.

        Supported methods:
            PUT: /sys/generate-root/attempt. Produces: 200 application/json

        :param otp: Specifies a base64-encoded 16-byte value. The raw bytes of the token will be XOR'd with this value
            before being returned to the final unseal key provider.
        :type otp: str | unicode
        :param pgp_key: Specifies a base64-encoded PGP public key. The raw bytes of the token will be encrypted with
            this value before being returned to the final unseal key provider.
        :type pgp_key: str | unicode
        :raises ParamValidationError: If both otp and pgp_key are provided.
        :return: The JSON response of the request.
        :rtype: dict
        """
        otp_given = otp is not None
        pgp_key_given = pgp_key is not None
        if otp_given and pgp_key_given:
            raise ParamValidationError(
                "one (and only one) of otp or pgp_key arguments are required"
            )

        # Only forward the options the caller actually set.
        candidates = (("otp", otp), ("pgp_key", pgp_key))
        payload = {name: value for name, value in candidates if value is not None}

        return self._adapter.put(url="/v1/sys/generate-root/attempt", json=payload)

    def generate_root(self, key, nonce):
        """Submit one master key share to advance the root generation attempt.

        If the threshold number of master key shares is reached, Vault will complete the root generation and issue the
        new token. Otherwise, this API must be called multiple times until that threshold is met. The attempt nonce must
        be provided with each call.

        Supported methods:
            PUT: /sys/generate-root/update. Produces: 200 application/json

        :param key: Specifies a single master key share.
        :type key: str | unicode
        :param nonce: The nonce of the attempt.
        :type nonce: str | unicode
        :return: The JSON response of the request.
        :rtype: dict
        """
        return self._adapter.put(
            url="/v1/sys/generate-root/update",
            json={"key": key, "nonce": nonce},
        )

    def cancel_root_generation(self):
        """Abort any root generation attempt that is currently in progress.

        This clears any progress made. This must be called to change the OTP or PGP key being used.

        Supported methods:
            DELETE: /sys/generate-root/attempt. Produces: 204 (empty body)

        :return: The response of the request.
        :rtype: requests.Response
        """
        return self._adapter.delete(url="/v1/sys/generate-root/attempt")

    def get_encryption_key_status(self):
        """Fetch information about the encryption key Vault is currently using.

        Supported methods:
            GET: /sys/key-status. Produces: 200 application/json

        :return: JSON response with information regarding the current encryption key used by Vault.
        :rtype: dict
        """
        return self._adapter.get(url="/v1/sys/key-status")

    def rotate_encryption_key(self):
        """Request a rotation of the backend encryption key.

        This is the key that is used to encrypt data written to the storage backend, and is not provided to operators.
        This operation is done online. Future values are encrypted with the new key, while old values are decrypted with
        previous encryption keys.

        This path requires sudo capability in addition to update.

        Supported methods:
            PUT: /sys/rotate. Produces: 204 (empty body)

        :return: The response of the request.
        :rtype: requests.Response
        """
        return self._adapter.put(url="/v1/sys/rotate")

    def read_rekey_progress(self, recovery_key=False):
        """Fetch the configuration and progress of the current rekey attempt.

        Supported methods:
            GET: /sys/rekey-recovery-key/init. Produces: 200 application/json
            GET: /sys/rekey/init. Produces: 200 application/json

        :param recovery_key: If true, send requests to "rekey-recovery-key" instead of "rekey" api path.
        :type recovery_key: bool
        :return: The JSON response of the request.
        :rtype: dict | requests.Response
        """
        endpoint = (
            "/v1/sys/rekey-recovery-key/init" if recovery_key else "/v1/sys/rekey/init"
        )
        return self._adapter.get(url=endpoint)

    def start_rekey(
        self,
        secret_shares=5,
        secret_threshold=3,
        pgp_keys=None,
        backup=False,
        require_verification=False,
        recovery_key=False,
    ):
        """Begin a new rekey attempt.

        Only a single rekey attempt can take place at a time, and changing the parameters of a rekey
        requires canceling and starting a new rekey, which will also provide a new nonce.

        Supported methods:
            PUT: /sys/rekey/init. Produces: 204 (empty body)
            PUT: /sys/rekey-recovery-key/init. Produces: 204 (empty body)

        :param secret_shares: Specifies the number of shares to split the master key into.
        :type secret_shares: int
        :param secret_threshold: Specifies the number of shares required to reconstruct the master key. This must be
            less than or equal to secret_shares.
        :type secret_threshold: int
        :param pgp_keys: Specifies an array of PGP public keys used to encrypt the output unseal keys. Ordering is
            preserved. The keys must be base64-encoded from their original binary representation. The size of this array
            must be the same as secret_shares.
        :type pgp_keys: list
        :param backup: Specifies if using PGP-encrypted keys, whether Vault should also store a plaintext backup of the
            PGP-encrypted keys at core/unseal-keys-backup in the physical storage backend. These can then be retrieved
            and removed via the sys/rekey/backup endpoint. Only sent when pgp_keys is non-empty.
        :type backup: bool
        :param require_verification: This turns on verification functionality. When verification is turned on, after
            successful authorization with the current unseal keys, the new unseal keys are returned but the master key
            is not actually rotated. The new keys must be provided to authorize the actual rotation of the master key.
            This ensures that the new keys have been successfully saved and protects against a risk of the keys being
            lost after rotation but before they can be persisted. This can be used with or without pgp_keys, and when
            used with it, it allows ensuring that the returned keys can be successfully decrypted before committing to
            the new shares, which the backup functionality does not provide.
        :type require_verification: bool
        :param recovery_key: If true, send requests to "rekey-recovery-key" instead of "rekey" api path.
        :type recovery_key: bool
        :raises ParamValidationError: If pgp_keys is non-empty and its length differs from secret_shares.
        :return: The JSON dict of the response.
        :rtype: dict | request.Response
        """
        payload = {
            "secret_shares": secret_shares,
            "secret_threshold": secret_threshold,
            "require_verification": require_verification,
        }

        # The PGP-related options are only meaningful (and only sent) when keys were supplied.
        if pgp_keys:
            if len(pgp_keys) != secret_shares:
                raise ParamValidationError(
                    "length of pgp_keys argument must equal secret shares value"
                )
            payload.update(pgp_keys=pgp_keys, backup=backup)

        endpoint = (
            "/v1/sys/rekey-recovery-key/init" if recovery_key else "/v1/sys/rekey/init"
        )
        return self._adapter.put(url=endpoint, json=payload)

    def cancel_rekey(self, recovery_key=False):
        """Abort any rekey that is currently in progress.

        This clears the rekey settings as well as any progress made. This must be called to change the parameters of the
        rekey.

        Note: Verification is still a part of a rekey. If rekeying is canceled during the verification flow, the current
        unseal keys remain valid.

        Supported methods:
            DELETE: /sys/rekey/init. Produces: 204 (empty body)
            DELETE: /sys/rekey-recovery-key/init. Produces: 204 (empty body)

        :param recovery_key: If true, send requests to "rekey-recovery-key" instead of "rekey" api path.
        :type recovery_key: bool
        :return: The response of the request.
        :rtype: requests.Response
        """
        endpoint = (
            "/v1/sys/rekey-recovery-key/init" if recovery_key else "/v1/sys/rekey/init"
        )
        return self._adapter.delete(url=endpoint)

    def rekey(self, key, nonce=None, recovery_key=False):
        """Submit one key share to advance the rekey of the Vault.

        If the threshold number of key shares is reached, Vault will complete the rekey. Otherwise, this API
        must be called multiple times until that threshold is met. The rekey operation nonce must be provided with each
        call.

        Supported methods:
            PUT: /sys/rekey/update. Produces: 200 application/json
            PUT: /sys/rekey-recovery-key/update. Produces: 200 application/json

        :param key: Specifies a single key share.
        :type key: str | unicode
        :param nonce: Specifies the nonce of the rekey operation. Omitted from the request body when None.
        :type nonce: str | unicode
        :param recovery_key: If true, send requests to "rekey-recovery-key" instead of "rekey" api path.
        :type recovery_key: bool
        :return: The JSON response of the request.
        :rtype: dict
        """
        payload = {"key": key} if nonce is None else {"key": key, "nonce": nonce}
        endpoint = (
            "/v1/sys/rekey-recovery-key/update"
            if recovery_key
            else "/v1/sys/rekey/update"
        )
        return self._adapter.put(url=endpoint, json=payload)

    def rekey_multi(self, keys, nonce=None, recovery_key=False):
        """Submit several key shares, one request per share, to advance the rekey of the Vault.

        Shares are submitted in order through :meth:`rekey`; submission stops as soon as a response reports a truthy
        "complete" value, so any remaining shares are not sent.

        :param keys: Specifies multiple key shares.
        :type keys: list
        :param nonce: Specifies the nonce of the rekey operation.
        :type nonce: str | unicode
        :param recovery_key: If true, send requests to "rekey-recovery-key" instead of "rekey" api path.
        :type recovery_key: bool
        :return: The response of the last rekey request that was sent, or None if keys is empty.
        :rtype: dict | None
        """
        last_response = None

        for share in keys:
            last_response = self.rekey(
                key=share,
                nonce=nonce,
                recovery_key=recovery_key,
            )
            if last_response.get("complete"):
                # Threshold reached; no need to submit further shares.
                return last_response

        return last_response

    def read_backup_keys(self, recovery_key=False):
        """Fetch the backup copy of PGP-encrypted unseal keys.

        The returned value is the nonce of the rekey operation and a map of PGP key fingerprint to hex-encoded
        PGP-encrypted key.

        Supported methods:
            GET: /sys/rekey/backup. Produces: 200 application/json
            GET: /sys/rekey/recovery-key-backup. Produces: 200 application/json

        :param recovery_key: If true, send the request to the "rekey/recovery-key-backup" api path instead of
            "rekey/backup".
        :type recovery_key: bool
        :return: The JSON response of the request.
        :rtype: dict
        """
        endpoint = (
            "/v1/sys/rekey/recovery-key-backup"
            if recovery_key
            else "/v1/sys/rekey/backup"
        )
        return self._adapter.get(url=endpoint)

    def cancel_rekey_verify(self, recovery_key=False):
        """Cancel any in-progress rekey verification.

        This clears any progress made and resets the nonce. Unlike cancel_rekey, this only resets
        the current verification operation, not the entire rekey attempt.
        The return value is the same as GET along with the new nonce.

        Supported methods:
            DELETE: /sys/rekey/verify. Produces: 204 (empty body)
            DELETE: /sys/rekey-recovery-key/verify. Produces: 204 (empty body)

        :param recovery_key: If true, send requests to "rekey-recovery-key" instead of "rekey" api path.
        :type recovery_key: bool
        :return: The response of the request.
        :rtype: requests.Response
        """
        api_path = "/v1/sys/rekey/verify"
        if recovery_key:
            # Same switch the other rekey methods offer for recovery-key rekeys.
            api_path = "/v1/sys/rekey-recovery-key/verify"
        return self._adapter.delete(
            url=api_path,
        )

    def rekey_verify(self, key, nonce, recovery_key=False):
        """Enter a single new key share to progress the rekey verification of the Vault.

        If the threshold number of new key shares is reached, Vault will complete the
        rekey. Otherwise, this API must be called multiple times until that threshold is met.
        The rekey verification nonce must be provided with each call.

        Supported methods:
            PUT: /sys/rekey/verify. Produces: 200 application/json
            PUT: /sys/rekey-recovery-key/verify. Produces: 200 application/json

        :param key: Specifies a single new key share.
        :type key: str | unicode
        :param nonce: Specifies the nonce of the rekey verify operation.
        :type nonce: str | unicode
        :param recovery_key: If true, send requests to "rekey-recovery-key" instead of "rekey" api path.
        :type recovery_key: bool
        :return: The JSON response of the request.
        :rtype: dict
        """
        params = {
            "key": key,
            "nonce": nonce,
        }

        api_path = "/v1/sys/rekey/verify"
        if recovery_key:
            # Same switch the other rekey methods offer for recovery-key rekeys.
            api_path = "/v1/sys/rekey-recovery-key/verify"
        return self._adapter.put(
            url=api_path,
            json=params,
        )

    def rekey_verify_multi(self, keys, nonce):
        """Submit several new key shares, one request per share, to advance the rekey verification.

        Shares are submitted in order through :meth:`rekey_verify`; submission stops as soon as a response reports a
        truthy "complete" value, so any remaining shares are not sent. The rekey verification nonce is sent with
        every call.

        Supported methods:
            PUT: /sys/rekey/verify. Produces: 200 application/json

        :param keys: Specifies multiple new key shares.
        :type keys: list
        :param nonce: Specifies the nonce of the rekey verify operation.
        :type nonce: str | unicode
        :return: The JSON response of the last verify request that was sent, or None if keys is empty.
        :rtype: dict | None
        """
        last_response = None

        for share in keys:
            last_response = self.rekey_verify(key=share, nonce=nonce)
            if last_response.get("complete"):
                # Threshold reached; no need to submit further shares.
                return last_response

        return last_response

    def read_rekey_verify_progress(self, recovery_key=False):
        """Read the configuration and progress of the current rekey verify attempt.

        Supported methods:
            GET: /sys/rekey/verify. Produces: 200 application/json
            GET: /sys/rekey-recovery-key/verify. Produces: 200 application/json

        :param recovery_key: If true, send requests to "rekey-recovery-key" instead of "rekey" api path.
        :type recovery_key: bool
        :return: The JSON response of the request.
        :rtype: requests.Response
        """
        api_path = "/v1/sys/rekey/verify"
        if recovery_key:
            # Same switch the other rekey methods offer for recovery-key rekeys.
            api_path = "/v1/sys/rekey-recovery-key/verify"
        return self._adapter.get(
            url=api_path,
        )