File: init.py

package info (click to toggle)
python-hvac 2.3.0-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,800 kB
  • sloc: python: 29,360; makefile: 42; sh: 14
file content (146 lines) | stat: -rw-r--r-- 6,253 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import warnings
from hvac.api.system_backend.system_backend_mixin import SystemBackendMixin
from hvac.exceptions import ParamValidationError


class Init(SystemBackendMixin):
    def read_init_status(self):
        """Read the initialization status of Vault.

        Supported methods:
            GET: /sys/init. Produces: 200 application/json

        :return: The JSON response of the request.
        :rtype: dict
        """
        api_path = "/v1/sys/init"
        return self._adapter.get(
            url=api_path,
        )

    def is_initialized(self):
        """Determine is Vault is initialized or not.

        :return: True if Vault is initialized, False otherwise.
        :rtype: bool
        """
        status = self.read_init_status()
        return status["initialized"]

    def initialize(
        self,
        secret_shares=None,
        secret_threshold=None,
        pgp_keys=None,
        root_token_pgp_key=None,
        stored_shares=None,
        recovery_shares=None,
        recovery_threshold=None,
        recovery_pgp_keys=None,
    ):
        """Initialize a new Vault.

        The Vault must not have been previously initialized. The recovery options, as well as the stored shares option,
        are only available when using Vault HSM.

        Supported methods:
            PUT: /sys/init. Produces: 200 application/json

        :param secret_shares: The number of shares to split the master key into.
        :type secret_shares: int
        :param secret_threshold: Specifies the number of shares required to reconstruct the master key. This must be
            less than or equal secret_shares. If using Vault HSM with auto-unsealing, this value must be the same as
            secret_shares, or omitted, depending on the version of Vault and the seal type.
        :type secret_threshold: int
        :param pgp_keys: List of PGP public keys used to encrypt the output unseal keys.
            Ordering is preserved. The keys must be base64-encoded from their original binary representation.
            The size of this array must be the same as secret_shares.
        :type pgp_keys: list
        :param root_token_pgp_key: Specifies a PGP public key used to encrypt the initial root token. The
            key must be base64-encoded from its original binary representation.
        :type root_token_pgp_key: str | unicode
        :param stored_shares: <enterprise only> Specifies the number of shares that should be encrypted by the HSM and
            stored for auto-unsealing. Currently must be the same as secret_shares.
        :type stored_shares: int
        :param recovery_shares: <enterprise only> Specifies the number of shares to split the recovery key into.
        :type recovery_shares: int
        :param recovery_threshold: <enterprise only> Specifies the number of shares required to reconstruct the recovery
            key. This must be less than or equal to recovery_shares.
        :type recovery_threshold: int
        :param recovery_pgp_keys: <enterprise only> Specifies an array of PGP public keys used to encrypt the output
            recovery keys. Ordering is preserved. The keys must be base64-encoded from their original binary
            representation. The size of this array must be the same as recovery_shares.
        :type recovery_pgp_keys: list
        :return: The JSON response of the request.
        :rtype: dict
        """

        # TODO(v3.0.0): remove this
        if recovery_shares is None and secret_shares is None:
            msg = (
                "The secret_shares parameter will default to None in hvac v3.0.0. "
                "To use the old default with no warning, explicitly set this value to 5. "
                "See https://github.com/hvac/hvac/issues/1030"
            )
            warnings.warn(
                message=msg,
                category=DeprecationWarning,
                stacklevel=2,
            )
            secret_shares = 5

        # TODO(v3.0.0): remove this
        if recovery_threshold is None and secret_threshold is None:
            msg = (
                "The secret_threshold parameter will default to None in hvac v3.0.0. "
                "To use the old default with no warning, explicitly set this value to 3. "
                "See https://github.com/hvac/hvac/issues/1030"
            )
            warnings.warn(
                message=msg,
                category=DeprecationWarning,
                stacklevel=2,
            )
            secret_threshold = 3

        params = {
            "secret_shares": secret_shares,
            "secret_threshold": secret_threshold,
            "root_token_pgp_key": root_token_pgp_key,
        }

        if pgp_keys is not None and secret_shares is not None:
            if len(pgp_keys) != secret_shares:
                raise ParamValidationError(
                    "length of pgp_keys list argument must equal secret_shares value"
                )
            params["pgp_keys"] = pgp_keys

        if stored_shares is not None and secret_shares is not None:
            if stored_shares != secret_shares:
                raise ParamValidationError(
                    "value for stored_shares argument must equal secret_shares argument"
                )
            params["stored_shares"] = stored_shares

        if recovery_shares is not None:
            params["recovery_shares"] = recovery_shares

            if recovery_threshold is not None:
                if recovery_threshold > recovery_shares:
                    error_msg = "value for recovery_threshold argument must be less than or equal to recovery_shares argument"
                    raise ParamValidationError(error_msg)
                params["recovery_threshold"] = recovery_threshold

            if recovery_pgp_keys is not None:
                if len(recovery_pgp_keys) != recovery_shares:
                    raise ParamValidationError(
                        "length of recovery_pgp_keys list argument must equal recovery_shares value"
                    )
                params["recovery_pgp_keys"] = recovery_pgp_keys

        api_path = "/v1/sys/init"
        return self._adapter.put(
            url=api_path,
            json=params,
        )