File: keyvault_preparer.py

package info (click to toggle)
python-azure 20201208%2Bgit-6
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 1,437,920 kB
  • sloc: python: 4,287,452; javascript: 269; makefile: 198; sh: 187; xml: 106
file content (148 lines) | stat: -rw-r--r-- 6,034 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import time
import random

from string import ascii_letters

from azure.mgmt.keyvault import KeyVaultManagementClient
from azure.mgmt.keyvault.models import (
    SecretPermissions,
    KeyPermissions,
    CertificatePermissions,
    StoragePermissions,
    Permissions,
    Sku,
    SkuName,
    AccessPolicyEntry,
    VaultProperties,
    VaultCreateOrUpdateParameters,
)

try:
    from azure.mgmt.keyvault.models import (
        SkuFamily
    )
except ImportError:
    # SkuFamily may be missing from the installed azure-mgmt-keyvault package.
    # Bind the name to None so that truthiness checks such as ``if SkuFamily``
    # can detect its absence instead of failing with NameError.
    SkuFamily = None

from azure_devtools.scenario_tests.exceptions import(
    AzureTestError, NameInUseError, ReservedResourceNameError
)

from . import AzureMgmtPreparer, ResourceGroupPreparer
from .resource_testcase import RESOURCE_GROUP_PARAM


def _permission_values(permission_enum):
    """Collect the ``value`` of each member yielded by iterating *permission_enum*."""
    return [member.value for member in permission_enum]


# Placeholder object id; KeyVaultPreparer registers it as the "CLIENT_OID" value to scrub.
CLIENT_OID = "00000000-0000-0000-0000-000000000000"

# SKU used when a preparer is created without an explicit ``sku``.
DEFAULT_SKU = SkuName.premium.value

# Default access-policy permissions: every key, secret, certificate and storage
# permission value exposed by the management models.
DEFAULT_PERMISSIONS = Permissions(
    keys=_permission_values(KeyPermissions),
    secrets=_permission_values(SecretPermissions),
    certificates=_permission_values(CertificatePermissions),
    storage=_permission_values(StoragePermissions),
)

class KeyVaultPreparer(AzureMgmtPreparer):
    """Test preparer that creates an Azure Key Vault for a test and removes it afterwards.

    In live mode the vault is created with ``KeyVaultManagementClient`` inside the
    resource group found in the keyword arguments (normally supplied by a
    ``ResourceGroupPreparer`` decorator applied before this one). Outside live
    mode no service call is made and a vault URI is built from the resource name.
    """

    def __init__(
        self,
        name_prefix=random.choice(ascii_letters).lower(),
        sku=DEFAULT_SKU,
        permissions=DEFAULT_PERMISSIONS,
        enabled_for_deployment=True,
        enabled_for_disk_encryption=True,
        enabled_for_template_deployment=True,
        enable_soft_delete=None,
        location='westus',
        parameter_name='vault_uri',
        resource_group_parameter_name=RESOURCE_GROUP_PARAM,
        disable_recording=True,
        playback_fake_resource=None,
        client_kwargs=None,
        random_name_enabled=True
    ):
        """Record the settings used later to create and remove the vault.

        NOTE: the default ``name_prefix`` is evaluated once, when the class is
        defined, so every preparer relying on the default within one process
        shares the same random lowercase letter.

        :param name_prefix: Prefix handed to the base preparer for name generation.
        :param sku: SKU name passed to the ``Sku`` model of the vault.
        :param permissions: ``Permissions`` granted in the vault's access policy.
        :param enabled_for_deployment: Forwarded to ``VaultProperties``.
        :param enabled_for_disk_encryption: Forwarded to ``VaultProperties``.
        :param enabled_for_template_deployment: Forwarded to ``VaultProperties``.
        :param enable_soft_delete: Forwarded to ``VaultProperties``; when truthy the
            vault is also purged after deletion in ``remove_resource``.
        :param location: Location used to create (and purge) the vault.
        :param parameter_name: Key under which ``create_resource`` returns the vault URI.
        :param resource_group_parameter_name: Keyword argument holding the resource group.
        :param disable_recording: Forwarded to the base preparer.
        :param playback_fake_resource: Forwarded to the base preparer.
        :param client_kwargs: Forwarded to the base preparer.
        :param random_name_enabled: Forwarded to the base preparer; when true the
            resource moniker is set to ``"vaultname"``.
        """
        # 24 is the second positional argument of the base preparer -- presumably
        # the maximum length of the generated name; TODO confirm against
        # AzureMgmtPreparer.
        super(KeyVaultPreparer, self).__init__(
            name_prefix,
            24,
            disable_recording=disable_recording,
            playback_fake_resource=playback_fake_resource,
            client_kwargs=client_kwargs,
            random_name_enabled=random_name_enabled,
        )
        self.location = location
        self.sku = sku
        self.permissions = permissions
        self.enabled_for_deployment = enabled_for_deployment
        self.enabled_for_disk_encryption = enabled_for_disk_encryption
        self.enabled_for_template_deployment = enabled_for_template_deployment
        self.enable_soft_delete = enable_soft_delete
        self.resource_group_parameter_name = resource_group_parameter_name
        self.parameter_name = parameter_name
        if random_name_enabled:
            self.resource_moniker = "vaultname"
        # Set in create_resource once the test class instance is available.
        self.client_oid = None

    def create_resource(self, name, **kwargs):
        """Create the vault (live mode only) and return its URI.

        :param name: Name of the vault to create.
        :param kwargs: Must contain the resource group under
            ``resource_group_parameter_name`` when running live.
        :returns: ``{parameter_name: vault_uri}``.
        :raises NameInUseError: The service reported ``VaultAlreadyExists``.
        :raises ReservedResourceNameError: The service reported ``ReservedResourceName``.
        :raises AzureTestError: No resource group was supplied (live mode).
        """
        self.client_oid = self.test_class_instance.set_value_to_scrub("CLIENT_OID", CLIENT_OID)
        if self.is_live:
            group = self._get_resource_group(**kwargs).name
            tenant_id = self.test_class_instance.get_settings_value("TENANT_ID")
            access_policies = [
                AccessPolicyEntry(
                    tenant_id=tenant_id,
                    object_id=self.client_oid,
                    permissions=self.permissions,
                )
            ]
            # SkuFamily is imported conditionally at the top of the file, so the
            # family is only passed when that model is available.
            properties = VaultProperties(
                tenant_id=tenant_id,
                sku=Sku(name=self.sku, family=SkuFamily.A) if SkuFamily else Sku(name=self.sku),
                access_policies=access_policies,
                vault_uri=None,
                enabled_for_deployment=self.enabled_for_deployment,
                enabled_for_disk_encryption=self.enabled_for_disk_encryption,
                enabled_for_template_deployment=self.enabled_for_template_deployment,
                enable_soft_delete=self.enable_soft_delete,
                enable_purge_protection=None,
            )
            parameters = VaultCreateOrUpdateParameters(location=self.location, properties=properties)
            self.client = self.create_mgmt_client(KeyVaultManagementClient)

            # ARM may return not found at first even though the resource group has been created
            retries = 4
            for attempt in range(retries):
                try:
                    vault = self.client.vaults.begin_create_or_update(group, name, parameters).result()
                    break
                except Exception as ex:
                    message = str(ex)
                    if "VaultAlreadyExists" in message:
                        raise NameInUseError(name)
                    if "ReservedResourceName" in message:
                        raise ReservedResourceNameError(name)
                    # Only a missing resource group is worth retrying, and only
                    # while attempts remain; anything else propagates unchanged.
                    if "ResourceGroupNotFound" not in message or attempt == retries - 1:
                        raise
                    time.sleep(3)
            self.test_class_instance.scrubber.register_name_pair(
                name,
                self.resource_moniker
            )
            vault_uri = vault.properties.vault_uri
        else:
            # playback => we need only the uri used in the recording
            vault_uri = "https://{}.vault.azure.net/".format(name)
        return {self.parameter_name: vault_uri}

    def remove_resource(self, name, **kwargs):
        """Delete the vault (live mode only) and purge it when soft delete is enabled.

        :param name: Name of the vault to remove.
        :param kwargs: Must contain the resource group under
            ``resource_group_parameter_name`` when running live.
        :raises AzureTestError: No resource group was supplied (live mode).
        """
        if self.is_live:
            group = self._get_resource_group(**kwargs).name
            vaults = self.client.vaults
            vaults.delete(group, name)
            if self.enable_soft_delete:
                # create_resource uses the ``begin_*`` long-running-operation API;
                # in that SDK generation the purge operation is named
                # ``begin_purge_deleted``. Fall back to the older ``purge_deleted``
                # name when the new one is not present.
                purge = getattr(vaults, "begin_purge_deleted", None)
                if purge is None:
                    purge = vaults.purge_deleted
                purge(name, self.location).wait()

    def _get_resource_group(self, **kwargs):
        """Return the resource group stored under ``resource_group_parameter_name``.

        :raises AzureTestError: The keyword arguments do not contain a resource group.
        """
        try:
            return kwargs[self.resource_group_parameter_name]
        except KeyError:
            template = (
                "To create a key vault a resource group is required. Please add "
                "decorator @{} in front of this key vault preparer."
            )
            raise AzureTestError(template.format(ResourceGroupPreparer.__name__))