File: azure.py

package info (click to toggle)
python-hvac 2.3.0-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,800 kB
  • sloc: python: 29,360; makefile: 42; sh: 14
file content (201 lines) | stat: -rw-r--r-- 7,356 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
#!/usr/bin/env python
"""Azure secret engine methods module."""
import json

from hvac import exceptions, utils
from hvac.api.vault_api_base import VaultApiBase
from hvac.constants.azure import VALID_ENVIRONMENTS

DEFAULT_MOUNT_POINT = "azure"


class Azure(VaultApiBase):
    """Azure Secrets Engine (API).

    Reference: https://www.vaultproject.io/api/secret/azure/index.html
    """

    def configure(
        self,
        subscription_id,
        tenant_id,
        client_id=None,
        client_secret=None,
        environment=None,
        mount_point=DEFAULT_MOUNT_POINT,
    ):
        """Configure the credentials required for the plugin to perform API calls to Azure.

        These credentials will be used to query roles and create/delete service principals. Environment variables will
        override any parameters set in the config.

        Supported methods:
            POST: /{mount_point}/config. Produces: 204 (empty body)


        :param subscription_id: The subscription id for the Azure Active Directory
        :type subscription_id: str | unicode
        :param tenant_id: The tenant id for the Azure Active Directory.
        :type tenant_id: str | unicode
        :param client_id: The OAuth2 client id to connect to Azure.
        :type client_id: str | unicode
        :param client_secret: The OAuth2 client secret to connect to Azure.
        :type client_secret: str | unicode
        :param environment: The Azure environment. If not specified, Vault will use Azure Public Cloud.
        :type environment: str | unicode
        :param mount_point: The OAuth2 client secret to connect to Azure.
        :type mount_point: str | unicode
        :return: The response of the request.
        :rtype: requests.Response
        """
        if environment is not None and environment not in VALID_ENVIRONMENTS:
            error_msg = 'invalid environment argument provided "{arg}", supported environments: "{environments}"'
            raise exceptions.ParamValidationError(
                error_msg.format(
                    arg=environment,
                    environments=",".join(VALID_ENVIRONMENTS),
                )
            )
        params = {
            "subscription_id": subscription_id,
            "tenant_id": tenant_id,
        }
        params.update(
            utils.remove_nones(
                {
                    "client_id": client_id,
                    "client_secret": client_secret,
                    "environment": environment,
                }
            )
        )
        api_path = utils.format_url("/v1/{mount_point}/config", mount_point=mount_point)
        return self._adapter.post(
            url=api_path,
            json=params,
        )

    def read_config(self, mount_point=DEFAULT_MOUNT_POINT):
        """Read the stored configuration, omitting client_secret.

        Supported methods:
            GET: /{mount_point}/config. Produces: 200 application/json


        :param mount_point: The "path" the method/backend was mounted on.
        :type mount_point: str | unicode
        :return: The data key from the JSON response of the request.
        :rtype: dict
        """
        api_path = utils.format_url("/v1/{mount_point}/config", mount_point=mount_point)
        response = self._adapter.get(
            url=api_path,
        )
        return response.get("data")

    def delete_config(self, mount_point=DEFAULT_MOUNT_POINT):
        """Delete the stored Azure configuration and credentials.

        Supported methods:
            DELETE: /auth/{mount_point}/config. Produces: 204 (empty body)


        :param mount_point: The "path" the method/backend was mounted on.
        :type mount_point: str | unicode
        :return: The response of the request.
        :rtype: requests.Response
        """
        api_path = utils.format_url("/v1/{mount_point}/config", mount_point=mount_point)
        return self._adapter.delete(
            url=api_path,
        )

    def create_or_update_role(
        self, name, azure_roles, ttl=None, max_ttl=None, mount_point=DEFAULT_MOUNT_POINT
    ):
        """Create or update a Vault role.

        The provided Azure roles must exist for this call to succeed. See the Azure secrets roles docs for more
        information about roles.

        Supported methods:
            POST: /{mount_point}/roles/{name}. Produces: 204 (empty body)


        :param name: Name of the role.
        :type name: str | unicode
        :param azure_roles:  List of Azure roles to be assigned to the generated service principal.
        :type azure_roles: list(dict)
        :param ttl: Specifies the default TTL for service principals generated using this role. Accepts time suffixed
            strings ("1h") or an integer number of seconds. Defaults to the system/engine default TTL time.
        :type ttl: str | unicode
        :param max_ttl: Specifies the maximum TTL for service principals generated using this role. Accepts time
            suffixed strings ("1h") or an integer number of seconds. Defaults to the system/engine max TTL time.
        :type max_ttl: str | unicode
        :param mount_point: The "path" the method/backend was mounted on.
        :type mount_point: str | unicode
        :return: The response of the request.
        :rtype: requests.Response
        """
        params = {
            "azure_roles": json.dumps(azure_roles),
        }
        params.update(
            utils.remove_nones(
                {
                    "ttl": ttl,
                    "max_ttl": max_ttl,
                }
            )
        )
        api_path = utils.format_url(
            "/v1/{mount_point}/roles/{name}",
            mount_point=mount_point,
            name=name,
        )
        return self._adapter.post(
            url=api_path,
            json=params,
        )

    def list_roles(self, mount_point=DEFAULT_MOUNT_POINT):
        """List all of the roles that are registered with the plugin.

        Supported methods:
            LIST: /{mount_point}/roles. Produces: 200 application/json


        :param mount_point: The "path" the method/backend was mounted on.
        :type mount_point: str | unicode
        :return: The data key from the JSON response of the request.
        :rtype: dict
        """
        api_path = utils.format_url("/v1/{mount_point}/roles", mount_point=mount_point)
        response = self._adapter.list(
            url=api_path,
        )
        return response.get("data")

    def generate_credentials(self, name, mount_point=DEFAULT_MOUNT_POINT):
        """Generate a new service principal based on the named role.

        Supported methods:
            GET: /{mount_point}/creds/{name}. Produces: 200 application/json


        :param name: Specifies the name of the role to create credentials against.
        :type name: str | unicode
        :param mount_point: The "path" the method/backend was mounted on.
        :type mount_point: str | unicode
        :return: The data key from the JSON response of the request.
        :rtype: dict
        """
        api_path = utils.format_url(
            "/v1/{mount_point}/creds/{name}",
            mount_point=mount_point,
            name=name,
        )
        response = self._adapter.get(
            url=api_path,
        )
        return response.get("data")