1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
|
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import asyncio
import os
from azure.identity.aio import DefaultAzureCredential
from azure.keyvault.keys import KeyRotationLifetimeAction, KeyRotationPolicy, KeyRotationPolicyAction
from azure.keyvault.keys.aio import KeyClient
# ----------------------------------------------------------------------------------------------------------
# Prerequisites:
# 1. An Azure Key Vault (https://learn.microsoft.com/azure/key-vault/quick-create-cli)
#
# 2. azure-keyvault-keys and azure-identity libraries (pip install these)
#
# 3. Set environment variable VAULT_URL with the URL of your key vault
#
# 4. Set up your environment to use azure-identity's DefaultAzureCredential. For more information about how to configure
# the DefaultAzureCredential, refer to https://aka.ms/azsdk/python/identity/docs#azure.identity.DefaultAzureCredential
#
# 5. Key rotation permissions for your service principal in your vault
#
# ----------------------------------------------------------------------------------------------------------
# Sample - creates and updates a key's automated rotation policy, and rotates a key on-demand
#
# 1. Create a new key rotation policy (update_key_rotation_policy)
#
# 2. Get a key's current rotation policy (get_key_rotation_policy)
#
# 3. Update a key's rotation policy (update_key_rotation_policy)
#
# 4. Rotate a key on-demand (rotate_key)
#
# 5. Delete a key (delete_key)
# ----------------------------------------------------------------------------------------------------------
async def run_sample():
# Instantiate a key client that will be used to call the service.
# Here we use the DefaultAzureCredential, but any azure-identity credential can be used.
VAULT_URL = os.environ["VAULT_URL"]
credential = DefaultAzureCredential()
client = KeyClient(vault_url=VAULT_URL, credential=credential)
# First, create a key
key_name = "rotation-sample-key-async"
key = await client.create_rsa_key(key_name)
print(f"\nCreated a key; new version is {key.properties.version}")
# Set the key's automated rotation policy to rotate the key two months after the key was created.
# If you pass an empty KeyRotationPolicy() as the `policy` parameter, the rotation policy will be set to the
# default policy. Any keyword arguments will update specified properties of the policy.
actions = [KeyRotationLifetimeAction(KeyRotationPolicyAction.rotate, time_after_create="P2M")]
updated_policy = await client.update_key_rotation_policy(
key_name, KeyRotationPolicy(), expires_in="P90D", lifetime_actions=actions
)
assert updated_policy.expires_in == "P90D"
# The updated policy should have the specified lifetime action
policy_action = None
for i in range(len(updated_policy.lifetime_actions)):
if updated_policy.lifetime_actions[i].action == KeyRotationPolicyAction.rotate:
policy_action = updated_policy.lifetime_actions[i]
assert policy_action, "The specified action should exist in the key rotation policy"
assert policy_action.time_after_create == "P2M", "The action should have the specified time_after_create"
assert policy_action.time_before_expiry is None, "The action shouldn't have a time_before_expiry"
print(f"\nCreated a new key rotation policy: {policy_action.action} after {policy_action.time_after_create}")
# Get the key's current rotation policy
current_policy = await client.get_key_rotation_policy(key_name)
policy_action = None
for i in range(len(current_policy.lifetime_actions)):
if current_policy.lifetime_actions[i].action == KeyRotationPolicyAction.rotate:
policy_action = current_policy.lifetime_actions[i]
assert policy_action
print(f"\nCurrent rotation policy: {policy_action.action} after {policy_action.time_after_create}")
# Update the key's automated rotation policy to notify 10 days before the key expires
new_actions = [KeyRotationLifetimeAction(KeyRotationPolicyAction.notify, time_before_expiry="P10D")]
# To preserve an existing rotation policy, pass in the existing policy as the `policy` parameter.
# Any property specified as a keyword argument will be overridden completely by the provided value.
# In this case, the rotate action we created earlier will be removed from the policy.
new_policy = await client.update_key_rotation_policy(key_name, current_policy, lifetime_actions=new_actions)
assert new_policy.expires_in == "P90D", "The key's expiry time should have been preserved"
# The updated policy should include the new notify action
notify_action = None
for i in range(len(new_policy.lifetime_actions)):
if new_policy.lifetime_actions[i].action == KeyRotationPolicyAction.notify:
notify_action = new_policy.lifetime_actions[i]
assert notify_action, "The specified action should exist in the key rotation policy"
assert notify_action.time_after_create is None, "The action shouldn't have a time_after_create"
assert notify_action.time_before_expiry == "P10D", "The action should have the specified time_before_expiry"
print(f"\nNew policy action: {notify_action.action} {notify_action.time_before_expiry} before expiry")
# Finally, you can rotate a key on-demand by creating a new version of the key
rotated_key = await client.rotate_key(key_name)
print(f"\nRotated the key on-demand; new version is {rotated_key.properties.version}")
# To clean up, delete the key
await client.delete_key(key_name)
print("\nDeleted the key")
await credential.close()
await client.close()
if __name__ == "__main__":
asyncio.run(run_sample())
|