File: test_storage.py

package info (click to toggle)
python-azure 20181112%2Bgit-2
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 407,300 kB
  • sloc: python: 717,190; makefile: 201; sh: 76
file content (192 lines) | stat: -rw-r--r-- 11,982 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
import uuid
from devtools_testutils import AzureMgmtTestCase, ResourceGroupPreparer, StorageAccountPreparer
from keyvault_preparer import KeyVaultPreparer
from keyvault_testcase import KeyvaultTestCase
from azure.mgmt.authorization import AuthorizationManagementClient
from azure.mgmt.authorization.models import RoleAssignmentCreateParameters
from azure.keyvault.models import StorageAccountAttributes


class KeyVaultSecretTest(KeyvaultTestCase):

    @ResourceGroupPreparer()
    @StorageAccountPreparer(name_prefix='kvsa1')
    @KeyVaultPreparer()
    def test_e2e(self, vault, storage_account, resource_group, **kwargs):
        """
        Runs the managed storage account scenario end to end against the vault.

        The storage account is added to the vault, its active key is switched to key2, key1 is
        regenerated, an account and a blob sas definition are created and listed, and finally the
        storage account is removed from the vault again.
        """
        if self.is_live:
            sa_id = storage_account.id

            # Only on live runs: look up the "Storage Account Key Operator Service Role" definition
            # and assign it, scoped to the storage account, to the Azure Key Vault service id.
            auth_client = self.create_mgmt_client(AuthorizationManagementClient)
            role_filter = "roleName eq 'Storage Account Key Operator Service Role'"
            matching_roles = list(auth_client.role_definitions.list(scope='/', filter=role_filter))
            operator_role_id = matching_roles[0].id

            # the Azure Key Vault service id
            key_vault_service_id = '93c27d83-f79b-4cb2-8dd4-4aa716542e74'
            assignment = RoleAssignmentCreateParameters(role_definition_id=operator_role_id,
                                                        principal_id=key_vault_service_id)
            auth_client.role_assignments.create(scope=sa_id,
                                                role_assignment_name='d7607bd3-a467-4a14-ab5f-f4b016ffbfff',
                                                parameters=assignment)
        else:
            # Not live: compose the storage account resource id from the resource group id and the
            # account name instead of reading it from the storage account object.
            sa_id = '{}/providers/Microsoft.Storage/storageAccounts/{}'.format(resource_group.id,
                                                                               storage_account.name)

        vault_uri = vault.properties.vault_uri
        account_name = storage_account.name

        # add the storage account to the vault using the users KeyVaultClient
        self.client.set_storage_account(vault_base_url=vault_uri,
                                        storage_account_name=account_name,
                                        resource_id=sa_id,
                                        active_key_name='key1',
                                        auto_regenerate_key=True,
                                        regeneration_period='P30D',
                                        storage_account_attributes=StorageAccountAttributes(enabled=True))

        # switch the active key of the managed storage account to key2 ...
        self.client.update_storage_account(vault_base_url=vault_uri,
                                           storage_account_name=account_name,
                                           active_key_name='key2')

        # ... and regenerate key1
        self.client.regenerate_storage_account_key(vault_base_url=vault_uri,
                                                   storage_account_name=account_name,
                                                   key_name='key1')

        # sas definition steps, run in this order: account sas, blob sas, then list/get
        sas_steps = (self.create_account_sas_definition,
                     self.create_blob_sas_defintion,
                     self.get_sas_definitions)
        for run_step in sas_steps:
            run_step(account_name, vault_uri)

        # finally remove the storage account from the vault
        self.client.delete_storage_account(vault_base_url=vault_uri,
                                           storage_account_name=account_name)


    def create_account_sas_definition(self, storage_account_name, vault_url):
        """
        Creates an account sas definition in the vault, then uses a sas token issued from that
        definition to create a container and a blob in the storage account.
        """
        from azure.keyvault import SecretId
        from azure.keyvault.models import SasDefinitionAttributes, SasTokenType
        from azure.storage.common import CloudStorageAccount, SharedAccessSignature

        container = 'blobcontainer'

        # An account sas definition needs a template first. The template_uri is the intended account
        # sas token signed with an arbitrary key, so the signer is given the placeholder key 00000000
        # rather than the real storage account key.
        template_signer = SharedAccessSignature(account_name=storage_account_name,
                                                account_key='00000000')
        template_token = template_signer.generate_account(
            # all services: blob, file, queue and table
            services='bfqt',
            # all resource types: service, container and object
            resource_types='sco',
            # all permissions: add, create, delete, list, process, read, update and write
            permission='acdlpruw',
            # expiry will be ignored and validity period will determine token expiry
            expiry='2020-01-01')

        # create the sas definition in the vault from the template
        definition = self.client.set_sas_definition(
            vault_base_url=vault_url,
            storage_account_name=storage_account_name,
            sas_definition_name='acctall',
            template_uri=template_token,
            sas_type=SasTokenType.account,
            validity_period='PT2H',
            sas_definition_attributes=SasDefinitionAttributes(enabled=True))

        # Creating the sas definition also creates a corresponding managed secret in the vault.
        # That secret is used to provision sas tokens according to the sas definition; users
        # retrieve a sas token through get_secret.

        # parse the secret id carried by the returned SasDefinitionBundle
        secret_ref = SecretId(uri=definition.secret_id)
        # the value of the SecretBundle returned by get_secret is a newly issued sas token
        secret_bundle = self.client.get_secret(vault_base_url=secret_ref.vault,
                                               secret_name=secret_ref.name,
                                               secret_version=secret_ref.version)
        issued_token = secret_bundle.value

        # build a cloud storage account object that authenticates with the issued token
        account = CloudStorageAccount(account_name=storage_account_name,
                                      sas_token=issued_token)

        # use the account sas token to create a container and a blob in it
        blob_client = account.create_block_blob_service()
        blob_client.create_container(container)
        blob_client.create_blob_from_text(container_name=container,
                                          blob_name='blob1',
                                          text=u'test blob1 data')

    def create_blob_sas_defintion(self, storage_account_name, vault_url):
        """
        Creates a service sas definition for the 'blobcontainer' container, then uses a sas token
        issued from that definition to write a blob, list the container and delete every listed blob.
        """
        from azure.keyvault import SecretId
        from azure.keyvault.models import SasDefinitionAttributes, SasTokenType
        from azure.storage.blob import BlockBlobService, ContainerPermissions

        container = 'blobcontainer'

        # The template uri of a service sas definition is the url of the storage entity together
        # with a template token. A blob container is used here; a similar approach can be used for
        # the other storage services, i.e. File, Queue, Table.

        # Sign the template token with the placeholder key 00000000, not the storage account key.
        template_service = BlockBlobService(account_name=storage_account_name,
                                            account_key='00000000')
        container_access = ContainerPermissions(read=True, write=True, delete=True, list=True)
        template_token = template_service.generate_container_shared_access_signature(
            container_name=container,
            permission=container_access,
            expiry='2020-01-01')

        # let the BlockBlobService assemble the container url carrying the template token
        template_uri = template_service.make_container_url(container_name=container,
                                                           protocol='https',
                                                           sas_token=template_token)

        # create the sas definition in the vault
        definition = self.client.set_sas_definition(
            vault_base_url=vault_url,
            storage_account_name=storage_account_name,
            sas_definition_name='blobcontall',
            template_uri=template_uri,
            sas_type=SasTokenType.service,
            validity_period='PT2H',
            sas_definition_attributes=SasDefinitionAttributes(enabled=True))

        # The sas definition's secret id points at a vault secret; reading that secret yields a
        # sas token for the container.
        secret_ref = SecretId(uri=definition.secret_id)
        secret_bundle = self.client.get_secret(vault_base_url=secret_ref.vault,
                                               secret_name=secret_ref.name,
                                               secret_version=secret_ref.version)
        issued_token = secret_bundle.value

        # a second BlockBlobService, this time authenticating with the issued sas token
        sas_service = BlockBlobService(account_name=storage_account_name,
                                       sas_token=issued_token)
        sas_service.create_blob_from_text(container_name=container,
                                          blob_name='blob2',
                                          text=u'test blob2 data')

        # take the full listing before deleting anything, then delete each listed blob
        listed = list(sas_service.list_blobs(container_name=container))
        for entry in listed:
            sas_service.delete_blob(container_name=container,
                                    blob_name=entry.name)

    def get_sas_definitions(self, storage_account_name, vault_url):
        """
        Lists the sas definitions of the storage account (requesting maxresults=5), then gets each
        one by its parsed id and prints its name and template uri.
        """
        from azure.keyvault import StorageSasDefinitionId

        print('list and get sas definitions for the managed storage account')

        # list the sas definitions of the storage account
        listing = self.client.get_sas_definitions(vault_base_url=vault_url,
                                                  storage_account_name=storage_account_name,
                                                  maxresults=5)
        definition_items = list(listing)

        for item in definition_items:
            # the item id is parsed into vault, account name and sas definition name, which are
            # then used to fetch the SasDefinitionBundle
            parsed_id = StorageSasDefinitionId(uri=item.id)
            bundle = self.client.get_sas_definition(vault_base_url=parsed_id.vault,
                                                    storage_account_name=parsed_id.account_name,
                                                    sas_definition_name=parsed_id.sas_definition)
            print(parsed_id.sas_definition, bundle.template_uri)