File: aws.py

package info (click to toggle)
python-hvac 2.3.0-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,800 kB
  • sloc: python: 29,360; makefile: 42; sh: 14
file content (407 lines) | stat: -rw-r--r-- 17,328 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
#!/usr/bin/env python
"""Aws methods module."""
import json

from hvac import exceptions, utils
from hvac.api.vault_api_base import VaultApiBase
from hvac.constants.aws import (
    DEFAULT_MOUNT_POINT,
    ALLOWED_CREDS_ENDPOINTS,
    ALLOWED_CREDS_TYPES,
)


class Aws(VaultApiBase):
    """AWS Secrets Engine (API).

    Reference: https://www.vaultproject.io/api/secret/aws/index.html
    """

    def configure_root_iam_credentials(
        self,
        access_key,
        secret_key,
        region=None,
        iam_endpoint=None,
        sts_endpoint=None,
        max_retries=None,
        mount_point=DEFAULT_MOUNT_POINT,
    ):
        """Configure the root IAM credentials to communicate with AWS.

        There are multiple ways to pass root IAM credentials to the Vault server, specified below with the highest
        precedence first. If credentials already exist, this will overwrite them.

        The official AWS SDK is used for sourcing credentials from env vars, shared files, or IAM/ECS instances.

            * Static credentials provided to the API as a payload
            * Credentials in the AWS_ACCESS_KEY, AWS_SECRET_KEY, and AWS_REGION environment variables on the server
            * Shared credentials files
            * Assigned IAM role or ECS task role credentials

        At present, this endpoint does not confirm that the provided AWS credentials are valid AWS credentials with
        proper permissions.

        Supported methods:
            POST: /{mount_point}/config/root. Produces: 204 (empty body)

        :param access_key: Specifies the AWS access key ID.
        :type access_key: str | unicode
        :param secret_key: Specifies the AWS secret access key.
        :type secret_key: str | unicode
        :param region: Specifies the AWS region. If not set it will use the AWS_REGION env var, AWS_DEFAULT_REGION env
            var, or us-east-1 in that order. Omitted from the request when None.
        :type region: str | unicode
        :param iam_endpoint: Specifies a custom HTTP IAM endpoint to use. Omitted from the request when None.
        :type iam_endpoint: str | unicode
        :param sts_endpoint: Specifies a custom HTTP STS endpoint to use. Omitted from the request when None.
        :type sts_endpoint: str | unicode
        :param max_retries: Number of max retries the client should use for recoverable errors. The default (-1) falls
            back to the AWS SDK's default behavior. Omitted from the request when None, so that no explicit null is
            sent in place of a number.
        :type max_retries: int
        :param mount_point: The "path" the method/backend was mounted on.
        :type mount_point: str | unicode
        :return: The response of the request.
        :rtype: requests.Response
        """
        # The two credential fields are always part of the payload.
        params = {
            "access_key": access_key,
            "secret_key": secret_key,
        }
        # Every optional setting, max_retries included, is only sent when the
        # caller supplied a value; unset ones are filtered out rather than
        # posted as JSON null.
        params.update(
            utils.remove_nones(
                {
                    "region": region,
                    "iam_endpoint": iam_endpoint,
                    "sts_endpoint": sts_endpoint,
                    "max_retries": max_retries,
                }
            )
        )
        api_path = utils.format_url(
            "/v1/{mount_point}/config/root", mount_point=mount_point
        )
        return self._adapter.post(
            url=api_path,
            json=params,
        )

    def rotate_root_iam_credentials(self, mount_point=DEFAULT_MOUNT_POINT):
        """Rotate static root IAM credentials.

        When Vault has been configured with static credentials, this endpoint makes Vault rotate the access key it
        used. Because of AWS eventual consistency, calls from Vault to AWS made shortly after this one may fail for a
        few seconds until AWS becomes consistent again.

        Vault's AWS access key MUST be the only access key on the IAM user when this endpoint is called; otherwise,
        generation of a new access key will fail. Once this method is called, Vault will be the only entity that
        knows the AWS secret key used to access AWS.

        Supported methods:
            POST: /{mount_point}/config/rotate-root. Produces: 200 application/json

        :param mount_point: The "path" the method/backend was mounted on.
        :type mount_point: str | unicode
        :return: The JSON response of the request.
        :rtype: dict
        """
        rotate_root_url = utils.format_url(
            "/v1/{mount_point}/config/rotate-root",
            mount_point=mount_point,
        )
        # The request carries no body; only the URL is passed to the adapter.
        return self._adapter.post(url=rotate_root_url)

    def configure_lease(self, lease, lease_max, mount_point=DEFAULT_MOUNT_POINT):
        """Set the lease settings of the AWS secrets engine.

        Calling this is optional, as there are default values for lease and lease_max.

        Supported methods:
            POST: /{mount_point}/config/lease. Produces: 204 (empty body)

        :param lease: The lease value, given as a string duration with time suffix. "h" (hour) is the largest
            suffix.
        :type lease: str | unicode
        :param lease_max: The maximum lease value, given as a string duration with time suffix. "h" (hour) is the
            largest suffix.
        :type lease_max: str | unicode
        :param mount_point: The "path" the method/backend was mounted on.
        :type mount_point: str | unicode
        :return: The response of the request.
        :rtype: requests.Response
        """
        lease_url = utils.format_url(
            "/v1/{mount_point}/config/lease",
            mount_point=mount_point,
        )
        # Both values are always sent, exactly as supplied by the caller.
        lease_settings = dict(lease=lease, lease_max=lease_max)
        return self._adapter.post(url=lease_url, json=lease_settings)

    def read_lease_config(self, mount_point=DEFAULT_MOUNT_POINT):
        """Fetch the lease settings currently configured for the AWS secrets engine.

        Supported methods:
            GET: /{mount_point}/config/lease. Produces: 200 application/json

        :param mount_point: The "path" the method/backend was mounted on.
        :type mount_point: str | unicode
        :return: The JSON response of the request.
        :rtype: dict
        """
        lease_url = utils.format_url(
            "/v1/{mount_point}/config/lease",
            mount_point=mount_point,
        )
        return self._adapter.get(url=lease_url)

    def create_or_update_role(
        self,
        name,
        credential_type,
        policy_document=None,
        default_sts_ttl=None,
        max_sts_ttl=None,
        role_arns=None,
        policy_arns=None,
        legacy_params=False,
        iam_tags=None,
        mount_point=DEFAULT_MOUNT_POINT,
    ):
        """Create or update the role with the given name.

        If a role with the name does not exist, it will be created. If the role exists, it will be updated with the new
        attributes.

        Supported methods:
            POST: /{mount_point}/roles/{name}. Produces: 204 (empty body)

        :param name: Specifies the name of the role to create. This is part of the request URL.
        :type name: str | unicode
        :param credential_type: Specifies the type of credential to be used when retrieving credentials from the role.
            Must be one of iam_user, assumed_role, or federation_token. It is validated even when legacy_params is
            True, although it is not sent in that case.
        :type credential_type: str | unicode
        :param policy_document: The IAM policy document for the role. The behavior depends on the credential type. With
            iam_user, the policy document will be attached to the IAM user generated and augment the permissions the IAM
            user has. With assumed_role and federation_token, the policy document will act as a filter on what the
            credentials can do. A dict is serialized to a JSON string before being sent.
        :type policy_document: dict | str | unicode
        :param default_sts_ttl: The default TTL for STS credentials. When a TTL is not specified when STS credentials
            are requested, and a default TTL is specified on the role, then this default TTL will be used. Valid only
            when credential_type is one of assumed_role or federation_token.
        :type default_sts_ttl: str | unicode
        :param max_sts_ttl: The max allowed TTL for STS credentials (credentials TTL are capped to max_sts_ttl). Valid
            only when credential_type is one of assumed_role or federation_token.
        :type max_sts_ttl: str | unicode
        :param role_arns: Specifies the ARNs of the AWS roles this Vault role is allowed to assume. Required when
            credential_type is assumed_role and prohibited otherwise. This is a comma-separated string or JSON array.
            String types supported for Vault legacy parameters.
        :type role_arns: list | str | unicode
        :param policy_arns: Specifies the ARNs of the AWS managed policies to be attached to IAM users when they are
            requested. Valid only when credential_type is iam_user. When credential_type is iam_user, at least one of
            policy_arns or policy_document must be specified. This is a comma-separated string or JSON array. With
            legacy_params, only the first element of a list is sent; an empty list is treated like None.
        :type policy_arns: list
        :param legacy_params: Flag to send legacy (Vault versions < 0.11.0) parameters in the request. When this is set
            to True, policy_document and policy_arns are the only parameters used from this method.
        :type legacy_params: bool
        :param iam_tags: A list of strings representing a key/value pair to be used for any IAM user that is created by
            this role. Format is a key and value separated by an =.
        :type iam_tags: list
        :param mount_point: The "path" the method/backend was mounted on.
        :type mount_point: str | unicode
        :return: The response of the request.
        :rtype: requests.Response
        :raises hvac.exceptions.ParamValidationError: If credential_type is not one of the allowed types.
        """
        if credential_type not in ALLOWED_CREDS_TYPES:
            error_msg = 'invalid credential_type argument provided "{arg}", supported types: "{allowed_types}"'
            raise exceptions.ParamValidationError(
                error_msg.format(
                    arg=credential_type,
                    allowed_types=", ".join(ALLOWED_CREDS_TYPES),
                )
            )
        # The policy document is sent as a JSON string, so a dict is serialized here.
        if isinstance(policy_document, dict):
            policy_document = json.dumps(policy_document, indent=4, sort_keys=True)

        if legacy_params:
            # Support for Vault <0.11.0, whose payload carries a single "arn".
            if isinstance(policy_arns, list):
                # Only the first ARN is sent. An empty list has no first element,
                # so it maps to None instead of raising IndexError.
                legacy_arn = policy_arns[0] if policy_arns else None
            else:
                legacy_arn = policy_arns
            params = {
                "policy": policy_document,
                "arn": legacy_arn,
            }
        else:
            params = {
                "credential_type": credential_type,
            }
            # Optional settings are only included when the caller supplied them.
            params.update(
                utils.remove_nones(
                    {
                        "policy_document": policy_document,
                        "default_sts_ttl": default_sts_ttl,
                        "max_sts_ttl": max_sts_ttl,
                        "role_arns": role_arns,
                        "policy_arns": policy_arns,
                        "iam_tags": iam_tags,
                    }
                )
            )
        api_path = utils.format_url(
            "/v1/{mount_point}/roles/{name}",
            mount_point=mount_point,
            name=name,
        )
        return self._adapter.post(
            url=api_path,
            json=params,
        )

    def read_role(self, name, mount_point=DEFAULT_MOUNT_POINT):
        """Look up an existing role by name.

        If the role does not exist, a 404 is returned.

        Supported methods:
            GET: /{mount_point}/roles/{name}. Produces: 200 application/json

        :param name: The name of the role to read. This is part of the request URL.
        :type name: str | unicode
        :param mount_point: The "path" the method/backend was mounted on.
        :type mount_point: str | unicode
        :return: The JSON response of the request.
        :rtype: dict
        """
        role_url = utils.format_url(
            "/v1/{mount_point}/roles/{name}",
            mount_point=mount_point,
            name=name,
        )
        return self._adapter.get(url=role_url)

    def list_roles(self, mount_point=DEFAULT_MOUNT_POINT):
        """Enumerate every role that exists in the secrets engine.

        Supported methods:
            LIST: /{mount_point}/roles. Produces: 200 application/json

        :param mount_point: The "path" the method/backend was mounted on.
        :type mount_point: str | unicode
        :return: The JSON response of the request.
        :rtype: dict
        """
        roles_url = utils.format_url(
            "/v1/{mount_point}/roles",
            mount_point=mount_point,
        )
        return self._adapter.list(url=roles_url)

    def delete_role(self, name, mount_point=DEFAULT_MOUNT_POINT):
        """Remove an existing role by name.

        If the role does not exist, a 404 is returned.

        Supported methods:
            DELETE: /{mount_point}/roles/{name}. Produces: 204 (empty body)

        :param name: The name of the role to delete. This is part of the request URL.
        :type name: str | unicode
        :param mount_point: The "path" the method/backend was mounted on.
        :type mount_point: str | unicode
        :return: The response of the request.
        :rtype: requests.Response
        """
        role_url = utils.format_url(
            "/v1/{mount_point}/roles/{name}",
            mount_point=mount_point,
            name=name,
        )
        return self._adapter.delete(url=role_url)

    def generate_credentials(
        self,
        name,
        role_arn=None,
        ttl=None,
        endpoint="creds",
        mount_point=DEFAULT_MOUNT_POINT,
        role_session_name=None,
    ):
        """Generate credentials based on the named role.

        The role must have been created before it is queried.

        The ``/aws/creds`` and ``/aws/sts`` endpoints are almost identical. The exception is when retrieving
        credentials for a role that was specified with the legacy arn or policy parameter. In this case, credentials
        retrieved through ``/aws/sts`` must be of either the ``assumed_role`` or ``federation_token`` types, and
        credentials retrieved through ``/aws/creds`` must be of the ``iam_user`` type.

        :param name: Specifies the name of the role to generate credentials against. This is part of the request URL.
        :type name: str | unicode
        :param role_arn: The ARN of the role to assume if ``credential_type`` on the Vault role is assumed_role. Must
            match one of the allowed role ARNs in the Vault role. Optional if the Vault role only allows a single AWS
            role ARN; required otherwise.
        :type role_arn: str | unicode
        :param ttl: Specifies the TTL for the use of the STS token. This is specified as a string with a duration
            suffix. Valid only when ``credential_type`` is ``assumed_role`` or ``federation_token``. When not
            specified, the default sts_ttl set for the role will be used. If that is also not set, then the default
            value of ``3600s`` will be used. AWS places limits on the maximum TTL allowed. See the AWS documentation
            on the ``DurationSeconds`` parameter for AssumeRole (for ``assumed_role`` credential types) and
            GetFederationToken (for ``federation_token`` credential types) for more details.
        :type ttl: str | unicode
        :param endpoint: Supported endpoints are ``creds`` and ``sts``:
            GET: ``/{mount_point}/creds/{name}``. Produces: 200 application/json
            POST: ``/{mount_point}/sts/{name}``. Produces: 200 application/json
        :type endpoint: str | unicode
        :param mount_point: The "path" the method/backend was mounted on.
        :type mount_point: str | unicode
        :param role_session_name: The role session name to attach to the assumed role ARN.
            ``role_session_name`` is limited to 64 characters; if exceeded, the ``role_session_name`` in the assumed
            role ARN will be truncated to 64 characters. If ``role_session_name`` is not provided, then it will be
            generated dynamically by default.
        :type role_session_name: str | unicode

        :return: The JSON response of the request.
        :rtype: dict
        """
        # Reject unsupported endpoints before building any request data.
        if endpoint not in ALLOWED_CREDS_ENDPOINTS:
            message_template = 'invalid endpoint argument provided "{arg}", supported types: "{allowed_endpoints}"'
            message = message_template.format(
                arg=endpoint,
                allowed_endpoints=", ".join(ALLOWED_CREDS_ENDPOINTS),
            )
            raise exceptions.ParamValidationError(message)

        # Only the optional fields that survive the None filter are sent.
        optional_fields = {
            "role_arn": role_arn,
            "role_session_name": role_session_name,
            "ttl": ttl,
        }
        params = dict(utils.remove_nones(optional_fields))

        credentials_url = utils.format_url(
            "/v1/{mount_point}/{endpoint}/{name}",
            mount_point=mount_point,
            endpoint=endpoint,
            name=name,
        )

        # Any endpoint other than "sts" is read with GET, passing the fields as
        # request params; "sts" is POSTed with the fields as a JSON body.
        if endpoint != "sts":
            return self._adapter.get(url=credentials_url, params=params)
        return self._adapter.post(url=credentials_url, json=params)