File: key_rotation_async.py

package info (click to toggle)
python-azure 20250603%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (111 lines) | stat: -rw-r--r-- 5,818 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import asyncio
import os

from azure.identity.aio import DefaultAzureCredential
from azure.keyvault.keys import KeyRotationLifetimeAction, KeyRotationPolicy, KeyRotationPolicyAction
from azure.keyvault.keys.aio import KeyClient

# ----------------------------------------------------------------------------------------------------------
# Prerequisites:
# 1. An Azure Key Vault (https://learn.microsoft.com/azure/key-vault/quick-create-cli)
#
# 2. azure-keyvault-keys and azure-identity libraries (pip install these)
#
# 3. Set environment variable VAULT_URL with the URL of your key vault
#
# 4. Set up your environment to use azure-identity's DefaultAzureCredential. For more information about how to configure
#    the DefaultAzureCredential, refer to https://aka.ms/azsdk/python/identity/docs#azure.identity.DefaultAzureCredential
#
# 5. Key rotation permissions for your service principal in your vault
#
# ----------------------------------------------------------------------------------------------------------
# Sample - creates and updates a key's automated rotation policy, and rotates a key on-demand
#
# 1. Create a new key rotation policy (update_key_rotation_policy)
#
# 2. Get a key's current rotation policy (get_key_rotation_policy)
#
# 3. Update a key's rotation policy (update_key_rotation_policy)
#
# 4. Rotate a key on-demand (rotate_key)
#
# 5. Delete a key (delete_key)
# ----------------------------------------------------------------------------------------------------------

def _find_lifetime_action(policy, action_type):
    """Return the lifetime action of ``policy`` whose ``action`` equals ``action_type``.

    If several actions match, the last one in ``policy.lifetime_actions`` is returned;
    if none match, ``None`` is returned.
    """
    found = None
    for lifetime_action in policy.lifetime_actions:
        if lifetime_action.action == action_type:
            found = lifetime_action
    return found


async def run_sample():
    """Create a key, set and update its rotation policy, rotate it on-demand, then delete it.

    The vault URL is read from the ``VAULT_URL`` environment variable; a ``KeyError`` is raised
    if it is not set. The credential and the client are used as async context managers so that
    both are closed even when a service call or one of the sample's assertions fails.
    """
    # Instantiate a key client that will be used to call the service.
    # Here we use the DefaultAzureCredential, but any azure-identity credential can be used.
    VAULT_URL = os.environ["VAULT_URL"]
    credential = DefaultAzureCredential()
    client = KeyClient(vault_url=VAULT_URL, credential=credential)

    # Context managers exit in reverse order: the client is closed first, then the credential it uses.
    async with credential, client:
        # First, create a key
        key_name = "rotation-sample-key-async"
        key = await client.create_rsa_key(key_name)
        print(f"\nCreated a key; new version is {key.properties.version}")

        # Set the key's automated rotation policy to rotate the key two months after the key was created.
        # If you pass an empty KeyRotationPolicy() as the `policy` parameter, the rotation policy will be set to the
        # default policy. Any keyword arguments will update specified properties of the policy.
        actions = [KeyRotationLifetimeAction(KeyRotationPolicyAction.rotate, time_after_create="P2M")]
        updated_policy = await client.update_key_rotation_policy(
            key_name, KeyRotationPolicy(), expires_in="P90D", lifetime_actions=actions
        )
        assert updated_policy.expires_in == "P90D"

        # The updated policy should have the specified lifetime action
        policy_action = _find_lifetime_action(updated_policy, KeyRotationPolicyAction.rotate)
        assert policy_action, "The specified action should exist in the key rotation policy"
        assert policy_action.time_after_create == "P2M", "The action should have the specified time_after_create"
        assert policy_action.time_before_expiry is None, "The action shouldn't have a time_before_expiry"
        print(f"\nCreated a new key rotation policy: {policy_action.action} after {policy_action.time_after_create}")

        # Get the key's current rotation policy
        current_policy = await client.get_key_rotation_policy(key_name)
        policy_action = _find_lifetime_action(current_policy, KeyRotationPolicyAction.rotate)
        assert policy_action
        print(f"\nCurrent rotation policy: {policy_action.action} after {policy_action.time_after_create}")

        # Update the key's automated rotation policy to notify 10 days before the key expires
        new_actions = [KeyRotationLifetimeAction(KeyRotationPolicyAction.notify, time_before_expiry="P10D")]
        # To preserve an existing rotation policy, pass in the existing policy as the `policy` parameter.
        # Any property specified as a keyword argument will be overridden completely by the provided value.
        # In this case, the rotate action we created earlier will be removed from the policy.
        new_policy = await client.update_key_rotation_policy(key_name, current_policy, lifetime_actions=new_actions)
        assert new_policy.expires_in == "P90D", "The key's expiry time should have been preserved"

        # The updated policy should include the new notify action
        notify_action = _find_lifetime_action(new_policy, KeyRotationPolicyAction.notify)
        assert notify_action, "The specified action should exist in the key rotation policy"
        assert notify_action.time_after_create is None, "The action shouldn't have a time_after_create"
        assert notify_action.time_before_expiry == "P10D", "The action should have the specified time_before_expiry"
        print(f"\nNew policy action: {notify_action.action} {notify_action.time_before_expiry} before expiry")

        # Finally, you can rotate a key on-demand by creating a new version of the key
        rotated_key = await client.rotate_key(key_name)
        print(f"\nRotated the key on-demand; new version is {rotated_key.properties.version}")

        # To clean up, delete the key
        await client.delete_key(key_name)
        print("\nDeleted the key")


if __name__ == "__main__":
    # Script entry point: build the sample coroutine and drive it to completion
    # on a fresh event loop.
    sample = run_sample()
    asyncio.run(sample)