1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
|
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import base64
import os
import pytest
from azure.keyvault.certificates import CertificatePolicy, WellKnownIssuerNames
from devtools_testutils import set_bodiless_matcher
from devtools_testutils.aio import recorded_by_proxy_async
from OpenSSL import crypto
from _async_test_case import AsyncCertificatesClientPreparer, get_decorator
from _shared.test_case_async import KeyVaultTestCase
all_api_versions = get_decorator(is_async=True)
class TestMergeCertificate(KeyVaultTestCase):
@pytest.mark.asyncio
@pytest.mark.parametrize("api_version", all_api_versions)
@AsyncCertificatesClientPreparer(logging_enable=True)
@recorded_by_proxy_async
async def test_merge_certificate(self, client, **kwargs):
set_bodiless_matcher()
cert_name = self.get_resource_name("mergeCertificate")
cert_policy = CertificatePolicy(
issuer_name=WellKnownIssuerNames.unknown, subject="CN=MyCert", certificate_transparency=False
)
dirname = os.path.dirname(os.path.abspath(__file__))
with open(os.path.abspath(os.path.join(dirname, "ca.key")), "rt") as f:
pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, f.read())
with open(os.path.abspath(os.path.join(dirname, "ca.crt")), "rt") as f:
ca_cert = crypto.load_certificate(crypto.FILETYPE_PEM, f.read())
# the poller will stop immediately because the issuer is `Unknown`
await client.create_certificate(certificate_name=cert_name, policy=cert_policy)
certificate_operation = await client.get_certificate_operation(certificate_name=cert_name)
csr = (
"-----BEGIN CERTIFICATE REQUEST-----\n"
+ base64.b64encode(certificate_operation.csr).decode()
+ "\n-----END CERTIFICATE REQUEST-----"
)
req = crypto.load_certificate_request(crypto.FILETYPE_PEM, csr)
cert = crypto.X509()
cert.set_serial_number(1)
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(60) # Testing certificates need not be long lived
cert.set_issuer(ca_cert.get_subject())
cert.set_subject(req.get_subject())
cert.set_pubkey(req.get_pubkey())
cert.sign(pkey, "sha256")
signed_certificate_bytes = crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode().replace("\n", "")
signed_certificate_bytes = signed_certificate_bytes.lstrip("-----BEGIN CERTIFICATE-----")
signed_certificate_bytes = signed_certificate_bytes.rstrip("-----END CERTIFICATE-----")
await client.merge_certificate(cert_name, [signed_certificate_bytes.encode()])
|