1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
|
import logging
from unittest import TestCase
from hvac import exceptions
from tests import utils
from tests.utils.hvac_integration_test_case import HvacIntegrationTestCase
class TestLease(HvacIntegrationTestCase, TestCase):
def setUp(self):
super().setUp()
# Set up a test pki backend and issue a cert against some role so we.
utils.configure_pki(client=self.client)
def tearDown(self):
# Reset integration test state.
utils.disable_pki(client=self.client)
super().tearDown()
def test_read_lease(self):
pki_issue_response = self.client.write(
path="pki/issue/my-role",
common_name="test.hvac.com",
)
# Read the lease of our test cert that was just issued.
read_lease_response = self.client.sys.read_lease(
lease_id=pki_issue_response["lease_id"],
)
logging.debug("read_lease_response: %s" % read_lease_response)
# Validate we received the expected lease ID back in our response.
self.assertEqual(
first=pki_issue_response["lease_id"],
second=read_lease_response["data"]["id"],
)
def test_list_leases(self):
self.client.write(
path="pki/issue/my-role",
common_name="test.hvac.com",
)
# List the lease of our test cert that was just issued.
list_leases_response = self.client.sys.list_leases(
prefix="pki",
)
logging.debug("list_leases_response: %s" % list_leases_response)
self.assertIn(
member="issue/",
container=list_leases_response["data"]["keys"],
)
def test_revoke_lease(self):
pki_issue_response = self.client.write(
path="pki/issue/my-role",
common_name="test.hvac.com",
)
# Revoke the lease of our test cert that was just issued.
revoke_lease_response = self.client.sys.revoke_lease(
lease_id=pki_issue_response["lease_id"],
)
logging.debug("revoke_lease_response: %s" % revoke_lease_response)
self.assertEqual(
first=bool(revoke_lease_response),
second=True,
)
with self.assertRaises(exceptions.InvalidPath):
self.client.sys.list_leases(
prefix="pki",
)
def test_revoke_prefix(self):
pki_issue_response = self.client.write(
path="pki/issue/my-role",
common_name="test.hvac.com",
)
# Revoke the lease prefix of our test cert that was just issued.
revoke_prefix_response = self.client.sys.revoke_prefix(
prefix=pki_issue_response["lease_id"],
)
logging.debug("revoke_prefix_response: %s" % revoke_prefix_response)
self.assertEqual(
first=bool(revoke_prefix_response),
second=True,
)
def test_revoke_force(self):
pki_issue_response = self.client.write(
path="pki/issue/my-role",
common_name="test.hvac.com",
)
# Force revoke the lease of our test cert that was just issued.
revoke_force_response = self.client.sys.revoke_force(
pki_issue_response["lease_id"]
)
logging.debug("revoke_force_response: %s" % revoke_force_response)
self.assertEqual(
first=bool(revoke_force_response),
second=True,
)
|