File: test_samples_keys_async.py

package info (click to toggle)
python-azure 20201208%2Bgit-6
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 1,437,920 kB
  • sloc: python: 4,287,452; javascript: 269; makefile: 198; sh: 187; xml: 106
file content (216 lines) | stat: -rw-r--r-- 7,355 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import asyncio
import functools

from azure.keyvault.keys.aio import KeyClient
from devtools_testutils import ResourceGroupPreparer, KeyVaultPreparer
from _shared.preparer_async import KeyVaultClientPreparer as _KeyVaultClientPreparer
from _shared.test_case_async import KeyVaultTestCase


# Bind KeyClient as the preparer's leading positional argument (client_cls) once,
# so the decorators below can be written as a bare @KeyVaultClientPreparer()
# instead of passing the client class explicitly on every test.
KeyVaultClientPreparer = functools.partial(_KeyVaultClientPreparer, KeyClient)


def print(*args):
    """Module-level replacement for the builtin ``print`` used by the samples.

    Nothing is written to stdout. Instead every positional argument is
    checked to be something other than ``None``, so each ``print(...)`` line
    in the sample snippets also acts as an assertion that the printed value
    was populated.

    :raises AssertionError: if any argument is ``None``.
    """
    for value in args:
        assert value is not None


def test_create_key_client():
    """Sample: build an async ``KeyClient`` from a vault URL and a credential.

    The client is only constructed here, from a placeholder URL string; no
    method is called on it inside this function.
    """
    # NOTE(review): the [START ...]/[END ...] comments presumably delimit a
    # snippet that is extracted into published documentation -- confirm before
    # changing any text between them.
    vault_url = "vault_url"
    # pylint:disable=unused-variable
    # [START create_key_client]
    from azure.identity.aio import DefaultAzureCredential
    from azure.keyvault.keys.aio import KeyClient

    # Create a KeyClient using default Azure credentials
    credential = DefaultAzureCredential()
    key_client = KeyClient(vault_url, credential)
    # [END create_key_client]


class TestExamplesKeyVault(KeyVaultTestCase):
    """Async ``KeyClient`` usage samples, executed as tests.

    Each test receives a ready-to-use ``client`` from the preparer decorators.
    The module-level ``print`` used throughout asserts that its arguments are
    not ``None``, so every ``print(...)`` line also verifies that the value it
    shows was actually populated.

    NOTE(review): the ``[START ...]``/``[END ...]`` comments presumably mark
    snippets extracted into documentation -- keep explanatory comments that are
    not meant for readers of the docs outside those markers.
    """

    @ResourceGroupPreparer(random_name_enabled=True)
    @KeyVaultPreparer()
    @KeyVaultClientPreparer()
    async def test_example_key_crud_operations(self, client, **kwargs):
        """Create (generic, RSA, EC), get, update and delete a key named "key-name"."""
        key_client = client
        # [START create_key]
        from dateutil import parser as date_parse

        key_size = 2048
        key_ops = ["encrypt", "decrypt", "sign", "verify", "wrapKey", "unwrapKey"]
        expires_on = date_parse.parse("2050-02-02T08:00:00.000Z")

        # create a key with optional arguments
        key = await key_client.create_key(
            "key-name", "RSA", size=key_size, key_operations=key_ops, expires_on=expires_on
        )

        print(key.id)
        print(key.name)
        print(key.key_type)
        print(key.properties.enabled)
        print(key.properties.expires_on)
        # [END create_key]

        # [START create_rsa_key]
        # create an rsa key in a hardware security module
        key = await key_client.create_rsa_key("key-name", hardware_protected=True, size=2048)

        print(key.id)
        print(key.name)
        print(key.key_type)
        # [END create_rsa_key]

        # [START create_ec_key]
        # create an elliptic curve (ec) key
        key_curve = "P-256"
        ec_key = await key_client.create_ec_key("key-name", curve=key_curve)

        print(ec_key.id)
        print(ec_key.name)
        print(ec_key.key_type)
        print(ec_key.key.crv)
        # [END create_ec_key]

        # [START get_key]
        # get the latest version of a key
        key = await key_client.get_key("key-name")

        # alternatively, specify a version
        key_version = key.properties.version
        key = await key_client.get_key("key-name", key_version)

        print(key.id)
        print(key.name)
        print(key.properties.version)
        print(key.key_type)
        print(key.properties.vault_url)
        # [END get_key]

        # [START update_key]
        # update attributes of an existing key
        expires_on = date_parse.parse("2050-01-02T08:00:00.000Z")
        tags = {"foo": "updated tag"}
        updated_key = await key_client.update_key_properties(key.name, expires_on=expires_on, tags=tags)

        print(updated_key.properties.version)
        print(updated_key.properties.updated_on)
        print(updated_key.properties.expires_on)
        print(updated_key.properties.tags)
        print(updated_key.key_type)
        # [END update_key]

        # [START delete_key]
        # delete a key
        deleted_key = await key_client.delete_key("key-name")

        print(deleted_key.name)

        # if the vault has soft-delete enabled, the key's
        # scheduled purge date, deleted_date and recovery id are set
        print(deleted_key.deleted_date)
        print(deleted_key.scheduled_purge_date)
        print(deleted_key.recovery_id)
        # [END delete_key]

    @ResourceGroupPreparer(random_name_enabled=True)
    @KeyVaultPreparer()
    @KeyVaultClientPreparer()
    async def test_example_key_list_operations(self, client, **kwargs):
        """List key properties, the versions of one key, and deleted keys.

        Keys "key0".."key3" are created first (as EC, then again as RSA) so
        the vault has content to list. The items yielded by
        ``list_properties_of_key_versions`` are read as key-properties objects,
        so the version is taken directly from ``key.version`` rather than from
        a nested ``properties`` attribute.
        """
        key_client = client

        for i in range(4):
            await key_client.create_ec_key("key{}".format(i))
        for i in range(4):
            await key_client.create_rsa_key("key{}".format(i))

        # [START list_keys]
        # list keys
        keys = key_client.list_properties_of_keys()

        async for key in keys:
            print(key.id)
            print(key.created_on)
            print(key.name)
            print(key.updated_on)
            print(key.enabled)
        # [END list_keys]

        # [START list_properties_of_key_versions]
        # get an iterator of all versions of a key
        key_versions = key_client.list_properties_of_key_versions("key-name")

        async for key in key_versions:
            print(key.id)
            print(key.updated_on)
            print(key.version)
            print(key.expires_on)
        # [END list_properties_of_key_versions]

        # [START list_deleted_keys]
        # get an iterator of deleted keys (requires soft-delete enabled for the vault)
        deleted_keys = key_client.list_deleted_keys()

        async for key in deleted_keys:
            print(key.id)
            print(key.name)
            print(key.scheduled_purge_date)
            print(key.recovery_id)
            print(key.deleted_date)
        # [END list_deleted_keys]

    @ResourceGroupPreparer(random_name_enabled=True)
    @KeyVaultPreparer()
    @KeyVaultClientPreparer()
    async def test_example_keys_backup_restore(self, client, **kwargs):
        """Back up a key, delete and purge it, then restore it from the backup."""
        key_client = client
        key_name = "test-key"
        await key_client.create_key(key_name, "RSA")
        # [START backup_key]
        # backup key
        key_backup = await key_client.backup_key(key_name)

        # returns the raw bytes of the backup
        print(key_backup)
        # [END backup_key]

        await key_client.delete_key(key_name)
        await key_client.purge_deleted_key(key_name)

        # Only wait when self.is_live is set; otherwise the 60 second pause is skipped.
        # NOTE(review): presumably this gives the service time to finish the purge
        # before the restore below -- confirm.
        if self.is_live:
            await asyncio.sleep(60)

        # [START restore_key_backup]
        # restores a backup
        restored_key = await key_client.restore_key_backup(key_backup)
        print(restored_key.id)
        print(restored_key.name)
        print(restored_key.properties.version)
        # [END restore_key_backup]

    @ResourceGroupPreparer(random_name_enabled=True)
    @KeyVaultPreparer()
    @KeyVaultClientPreparer()
    async def test_example_keys_recover(self, client, **kwargs):
        """Delete a key, fetch it as a deleted key, then recover it."""
        key_client = client
        created_key = await key_client.create_key("key-name", "RSA")

        await key_client.delete_key(created_key.name)

        # [START get_deleted_key]
        # get a deleted key (requires soft-delete enabled for the vault)
        deleted_key = await key_client.get_deleted_key("key-name")
        print(deleted_key.name)
        # [END get_deleted_key]

        # [START recover_deleted_key]
        # recover deleted key to its latest version (requires soft-delete enabled for the vault)
        recovered_key = await key_client.recover_deleted_key("key-name")
        print(recovered_key.id)
        print(recovered_key.name)
        # [END recover_deleted_key]