File: test_samples_keys.py

package info (click to toggle)
python-azure 20250603%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (250 lines) | stat: -rw-r--r-- 8,870 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import os
import time

import pytest
from azure.keyvault.keys import KeyType
from devtools_testutils import recorded_by_proxy

from _shared.test_case import KeyVaultTestCase
from _test_case import KeysClientPreparer, get_decorator
from _keys_test_case import KeysTestCase

# Argument values for the ("api_version", "is_hsm") parametrization applied to
# the test methods in this module; both are produced by get_decorator.
# NOTE(review): presumably only_vault / only_hsm restrict the generated
# combinations to vault-only or HSM-only runs -- confirm against get_decorator.
all_api_versions = get_decorator(only_vault=True)
only_hsm = get_decorator(only_hsm=True)


def print(*args):
    """Stand-in for the builtin ``print`` that checks its arguments.

    Deliberately shadows the builtin inside this module: nothing is written
    out; instead every positional argument is asserted to be not ``None``.
    """
    for value in args:
        assert value is not None


@pytest.mark.playback_test_only("Can't run in live pipelines, and there's no reason to.")
def test_create_key_client():
    """Exercise the ``create_key_client`` sample snippet.

    Builds a ``DefaultAzureCredential`` and a ``KeyClient`` from a placeholder
    vault URL; nothing is called on the client afterwards.

    NOTE(review): the text between the START/END markers is presumably
    extracted verbatim as sample code, which would explain the function-local
    imports -- confirm before restructuring it.
    """
    # Placeholder string passed as the vault URL argument of the snippet below.
    vault_url = "vault_url"
    # The snippet assigns key_client without using it, hence the pylint waiver.
    # pylint:disable=unused-variable
    # [START create_key_client]
    from azure.identity import DefaultAzureCredential
    from azure.keyvault.keys import KeyClient

    # Create a KeyClient using default Azure credentials
    credential = DefaultAzureCredential()
    key_client = KeyClient(vault_url, credential)
    # [END create_key_client]


class TestExamplesKeyVault(KeyVaultTestCase, KeysTestCase):
    """Recorded tests that run the ``KeyClient`` sample snippets.

    Code between ``# [START name]`` and ``# [END name]`` markers is sample code
    (presumably extracted verbatim elsewhere -- confirm before rewording it).
    This module redefines ``print`` to assert that none of its arguments is
    ``None``, so each ``print`` call inside a snippet doubles as a check that
    the printed attribute was populated.
    """

    @pytest.mark.parametrize("api_version,is_hsm", all_api_versions)
    @KeysClientPreparer()
    @recorded_by_proxy
    def test_example_key_crud_operations(self, key_client, **kwargs):
        """Run the create, get, update and delete key snippets on one key name."""
        # Only consulted in live runs. Using .get() means an unset KEYVAULT_SKU
        # leads to a skip (SKU unknown) instead of a KeyError.
        if self.is_live and os.environ.get("KEYVAULT_SKU") != "premium":
            pytest.skip("This test is not supported on standard SKU vaults. Follow up with service team")

        key_name = self.get_resource_name("key-name")

        # [START create_key]
        from dateutil import parser as date_parse

        expires_on = date_parse.parse("2050-02-02T08:00:00.000Z")

        # create a key with optional arguments
        key = key_client.create_key(key_name, KeyType.rsa_hsm, expires_on=expires_on)

        print(key.name)
        print(key.id)
        print(key.key_type)
        print(key.properties.expires_on)
        # [END create_key]

        # [START create_rsa_key]
        key_size = 2048
        key_ops = ["encrypt", "decrypt", "sign", "verify", "wrapKey", "unwrapKey"]

        # create an rsa key with size specification
        # RSA key can be created with default size of '2048'
        key = key_client.create_rsa_key(key_name, hardware_protected=True, size=key_size, key_operations=key_ops)

        print(key.id)
        print(key.name)
        print(key.key_type)
        print(key.key_operations)
        # [END create_rsa_key]

        # [START create_ec_key]
        key_curve = "P-256"

        # create an EC (Elliptic curve) key with curve specification
        # EC key can be created with default curve of 'P-256'
        ec_key = key_client.create_ec_key(key_name, curve=key_curve)

        print(ec_key.id)
        print(ec_key.properties.version)
        print(ec_key.key_type)
        print(ec_key.key.crv)
        # [END create_ec_key]

        # [START get_key]
        # get the latest version of a key
        key = key_client.get_key(key_name)

        # alternatively, specify a version
        key_version = key.properties.version
        key = key_client.get_key(key_name, key_version)

        print(key.id)
        print(key.name)
        print(key.properties.version)
        print(key.key_type)
        print(key.properties.vault_url)
        # [END get_key]

        # [START update_key]
        # update attributes of an existing key
        expires_on = date_parse.parse("2050-01-02T08:00:00.000Z")
        tags = {"foo": "updated tag"}
        updated_key = key_client.update_key_properties(key.name, expires_on=expires_on, tags=tags)

        print(updated_key.properties.version)
        print(updated_key.properties.updated_on)
        print(updated_key.properties.expires_on)
        print(updated_key.properties.tags)
        print(updated_key.key_type)
        # [END update_key]

        # [START delete_key]
        # delete a key
        deleted_key_poller = key_client.begin_delete_key(key_name)
        deleted_key = deleted_key_poller.result()

        print(deleted_key.name)

        # if the vault has soft-delete enabled, the key's deleted_date,
        # scheduled purge date and recovery id are set
        print(deleted_key.deleted_date)
        print(deleted_key.scheduled_purge_date)
        print(deleted_key.recovery_id)

        # if you want to block until deletion is complete, call wait() on the poller
        deleted_key_poller.wait()
        # [END delete_key]

    @pytest.mark.parametrize("api_version,is_hsm", only_hsm)
    @KeysClientPreparer()
    @recorded_by_proxy
    def test_example_create_oct_key(self, key_client, **kwargs):
        """Run the ``create_oct_key`` snippet (parametrized with ``only_hsm``)."""
        key_name = self.get_resource_name("key")

        # [START create_oct_key]
        key = key_client.create_oct_key(key_name, size=256, hardware_protected=True)

        print(key.id)
        print(key.name)
        print(key.key_type)
        # [END create_oct_key]

    @pytest.mark.parametrize("api_version,is_hsm", all_api_versions)
    @KeysClientPreparer()
    @recorded_by_proxy
    def test_example_key_list_operations(self, key_client, **kwargs):
        """Run the list-keys, list-key-versions and list-deleted-keys snippets."""
        # Seed data for the listing snippets. Both loops pass the same
        # "key0".."key3" prefixes to get_resource_name, so presumably each RSA
        # key is stored as a new version of the EC key created just before.
        for i in range(4):
            key_name = self.get_resource_name(f"key{i}")
            key_client.create_ec_key(key_name)
        for i in range(4):
            key_name = self.get_resource_name(f"key{i}")
            key_client.create_rsa_key(key_name)

        # [START list_keys]
        # get an iterator of keys
        keys = key_client.list_properties_of_keys()

        for key in keys:
            print(key.id)
            print(key.name)
        # [END list_keys]

        # NOTE(review): the snippet below queries the literal name "key-name",
        # which does not look like one of the keys created above -- confirm this
        # is intended. Left unchanged so the recorded request stays the same.
        # [START list_properties_of_key_versions]
        # get an iterator of a key's versions
        key_versions = key_client.list_properties_of_key_versions("key-name")

        for key in key_versions:
            print(key.id)
            print(key.name)
        # [END list_properties_of_key_versions]

        # [START list_deleted_keys]
        # get an iterator of deleted keys (requires soft-delete enabled for the vault)
        deleted_keys = key_client.list_deleted_keys()

        for key in deleted_keys:
            print(key.id)
            print(key.name)
            print(key.scheduled_purge_date)
            print(key.recovery_id)
            print(key.deleted_date)
        # [END list_deleted_keys]

    @pytest.mark.parametrize("api_version,is_hsm", all_api_versions)
    @KeysClientPreparer()
    @recorded_by_proxy
    def test_example_keys_backup_restore(self, key_client, **kwargs):
        """Run the backup snippet, delete and purge the key, then run the restore snippet."""
        key_name = self.get_resource_name("keyrec")
        key_client.create_key(key_name, "RSA")
        # [START backup_key]
        # backup key
        key_backup = key_client.backup_key(key_name)

        # returns the raw bytes of the backed up key
        print(key_backup)
        # [END backup_key]

        # Remove the original key entirely before restoring from the backup.
        key_client.begin_delete_key(key_name).wait()
        key_client.purge_deleted_key(key_name)

        if self.is_live:
            # perform operations to prevent our connection from getting closed while waiting
            time.sleep(60)
            wait_key_name = self.get_resource_name("waitkey")
            key_client.create_key(wait_key_name, "RSA")
            time.sleep(60)
            key_client.get_key(wait_key_name)
            time.sleep(60)

        # [START restore_key_backup]
        # restore a key backup
        restored_key = key_client.restore_key_backup(key_backup)
        print(restored_key.id)
        print(restored_key.properties.version)
        # [END restore_key_backup]

    @pytest.mark.parametrize("api_version,is_hsm", all_api_versions)
    @KeysClientPreparer()
    @recorded_by_proxy
    def test_example_keys_recover(self, key_client, **kwargs):
        """Create and delete a key, then run the get-deleted and recover snippets."""
        key_name = self.get_resource_name("key-name")
        created_key = key_client.create_key(key_name, "RSA")
        # Wait for the delete poller before the snippets look up the deleted key.
        key_client.begin_delete_key(created_key.name).wait()
        # [START get_deleted_key]
        # get a deleted key (requires soft-delete enabled for the vault)
        deleted_key = key_client.get_deleted_key(key_name)
        print(deleted_key.name)

        # if the vault has soft-delete enabled, the key's deleted_date
        # scheduled purge date and recovery id are set
        print(deleted_key.deleted_date)
        print(deleted_key.scheduled_purge_date)
        print(deleted_key.recovery_id)
        # [END get_deleted_key]

        # [START recover_deleted_key]
        # recover a deleted key to its latest version (requires soft-delete enabled for the vault)
        recover_key_poller = key_client.begin_recover_deleted_key(key_name)
        recovered_key = recover_key_poller.result()
        print(recovered_key.id)
        print(recovered_key.name)

        # if you want to block until key is recovered server-side, call wait() on the poller
        recover_key_poller.wait()
        # [END recover_deleted_key]