1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
|
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import os
import pytest
from azure.keyvault.administration import ApiVersion
from azure.keyvault.administration._internal.client_base import DEFAULT_VERSION
from azure.identity import ManagedIdentityCredential
from devtools_testutils import AzureRecordedTestCase
class BaseClientPreparer(AzureRecordedTestCase):
def __init__(self, **kwargs) -> None:
hsm_playback_url = "https://managedhsmvaultname.managedhsm.azure.net"
container_playback_uri = "https://storagename.blob.core.windows.net/container"
playback_sas_token = "fake-sas"
if self.is_live:
hsm = os.environ.get("AZURE_MANAGEDHSM_URL")
self.managed_hsm_url = hsm if hsm else None
storage_url = os.environ.get("BLOB_STORAGE_URL")
container_name = os.environ.get("BLOB_CONTAINER_NAME")
self.container_uri = f"{storage_url}/{container_name}"
self.sas_token = os.environ.get("BLOB_STORAGE_SAS_TOKEN")
else:
self.managed_hsm_url = hsm_playback_url
self.container_uri = container_playback_uri
self.sas_token = playback_sas_token
self.managed_identity_client_id = os.environ.get("MANAGED_IDENTITY_CLIENT_ID")
use_pwsh = os.environ.get("AZURE_TEST_USE_PWSH_AUTH", "false")
use_cli = os.environ.get("AZURE_TEST_USE_CLI_AUTH", "false")
use_vscode = os.environ.get("AZURE_TEST_USE_VSCODE_AUTH", "false")
use_azd = os.environ.get("AZURE_TEST_USE_AZD_AUTH", "false")
# Only set service principal credentials if user-based auth is not requested
if use_pwsh == use_cli == use_vscode == use_azd == "false":
self._set_mgmt_settings_real_values()
def _skip_if_not_configured(self, api_version, **kwargs):
if self.is_live and api_version != DEFAULT_VERSION:
pytest.skip("This test only uses the default API version for live tests")
if self.is_live and self.managed_hsm_url is None:
pytest.skip("No HSM endpoint for live testing")
def _set_mgmt_settings_real_values(self):
if self.is_live:
os.environ["AZURE_TENANT_ID"] = os.getenv("KEYVAULT_TENANT_ID", "") # empty in pipelines
os.environ["AZURE_CLIENT_ID"] = os.getenv("KEYVAULT_CLIENT_ID", "") # empty in pipelines
os.environ["AZURE_CLIENT_SECRET"] = os.getenv("KEYVAULT_CLIENT_SECRET", "") # empty for user-based auth
class KeyVaultBackupClientPreparer(BaseClientPreparer):
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
def __call__(self, fn):
def _preparer(test_class, api_version, **kwargs):
self._skip_if_not_configured(api_version)
kwargs["container_uri"] = self.container_uri
kwargs["managed_hsm_url"] = self.managed_hsm_url
client = self.create_backup_client(self.managed_identity_client_id, api_version=api_version, **kwargs)
with client:
fn(test_class, client, **kwargs)
return _preparer
def create_backup_client(self, managed_identity_client_id, **kwargs):
from azure.keyvault.administration import KeyVaultBackupClient
if self.is_live:
credential = ManagedIdentityCredential(client_id=managed_identity_client_id)
else:
credential = self.get_credential(KeyVaultBackupClient)
return self.create_client_from_credential(
KeyVaultBackupClient, credential=credential, vault_url=self.managed_hsm_url, **kwargs
)
class KeyVaultBackupClientSasPreparer(BaseClientPreparer):
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
def __call__(self, fn):
def _preparer(test_class, api_version, **kwargs):
self._skip_if_not_configured(api_version)
kwargs["container_uri"] = self.container_uri
kwargs["sas_token"] = self.sas_token
kwargs["managed_hsm_url"] = self.managed_hsm_url
client = self.create_backup_client(api_version=api_version, **kwargs)
with client:
fn(test_class, client, **kwargs)
return _preparer
def create_backup_client(self, **kwargs):
from azure.keyvault.administration import KeyVaultBackupClient
credential = self.get_credential(KeyVaultBackupClient)
return self.create_client_from_credential(
KeyVaultBackupClient, credential=credential, vault_url=self.managed_hsm_url, **kwargs
)
class KeyVaultAccessControlClientPreparer(BaseClientPreparer):
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
def __call__(self, fn):
def _preparer(test_class, api_version, **kwargs):
self._skip_if_not_configured(api_version)
client = self.create_access_control_client(api_version=api_version, **kwargs)
with client:
fn(test_class, client, **kwargs)
return _preparer
def create_access_control_client(self, **kwargs):
from azure.keyvault.administration import KeyVaultAccessControlClient
credential = self.get_credential(KeyVaultAccessControlClient)
return self.create_client_from_credential(
KeyVaultAccessControlClient, credential=credential, vault_url=self.managed_hsm_url, **kwargs
)
class KeyVaultSettingsClientPreparer(BaseClientPreparer):
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
def __call__(self, fn):
def _preparer(test_class, api_version, **kwargs):
self._skip_if_not_configured(api_version)
client = self.create_settings_client(api_version=api_version, **kwargs)
with client:
fn(test_class, client, **kwargs)
return _preparer
def create_settings_client(self, **kwargs):
from azure.keyvault.administration import KeyVaultSettingsClient
credential = self.get_credential(KeyVaultSettingsClient)
return self.create_client_from_credential(
KeyVaultSettingsClient, credential=credential, vault_url=self.managed_hsm_url, **kwargs
)
def get_decorator(**kwargs):
"""returns a test decorator for test parameterization"""
versions = kwargs.pop("api_versions", None) or ApiVersion
params = [pytest.param(api_version) for api_version in versions]
return params
|