File: test_key.py

package info (click to toggle)
python-hvac 2.3.0-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,800 kB
  • sloc: python: 29,360; makefile: 42; sh: 14
file content (126 lines) | stat: -rw-r--r-- 4,757 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import logging
from unittest import TestCase

from hvac import exceptions
from tests import utils
from tests.utils.hvac_integration_test_case import HvacIntegrationTestCase


class TestKey(HvacIntegrationTestCase, TestCase):
    """Integration tests for the key-related calls made through ``self.client.sys``.

    Covers root token generation, encryption key rotation, rekeying (with and
    without verification) and reading backed-up keys.

    NOTE(review): ``self.client`` and ``self.manager`` are not defined in this
    class; presumably they are provided by ``HvacIntegrationTestCase``.
    """

    def test_start_generate_root_with_completion(self):
        """Run a root token generation to completion and authenticate with the result."""
        test_otp = utils.get_generate_root_otp()

        self.assertFalse(self.client.sys.read_root_generation_progress()["started"])
        start_generate_root_response = self.client.sys.start_root_token_generation(
            otp=test_otp,
        )
        logging.debug("generate_root_response: %s", start_generate_root_response)
        self.assertTrue(self.client.sys.read_root_generation_progress()["started"])

        nonce = start_generate_root_response["nonce"]

        # Submit the first three manager keys; the response to the last one is
        # the one that is expected to carry the encoded root token.
        last_generate_root_response = {}
        for key in self.manager.keys[0:3]:
            last_generate_root_response = self.client.sys.generate_root(
                key=key,
                nonce=nonce,
            )
        logging.debug("last_generate_root_response: %s", last_generate_root_response)
        self.assertFalse(self.client.sys.read_root_generation_progress()["started"])

        new_root_token = utils.decode_generated_root_token(
            encoded_token=last_generate_root_response["encoded_root_token"],
            otp=test_otp,
            url=self.client.url,
        )
        logging.debug("new_root_token: %s", new_root_token)
        token_lookup_resp = self.client.lookup_token(token=new_root_token)
        logging.debug("token_lookup_resp: %s", token_lookup_resp)

        # Assert our new root token is properly formed and authenticated
        self.client.token = new_root_token
        if self.client.is_authenticated():
            self.manager.root_token = new_root_token
        else:
            # If our new token was unable to authenticate, set the test client's token back to the original value
            self.client.token = self.manager.root_token
            self.fail("Unable to authenticate with the newly generated root token.")

    def test_start_generate_root_then_cancel(self):
        """Start a root token generation, cancel it, and check the progress flag each time."""
        test_otp = utils.get_generate_root_otp()

        self.assertFalse(self.client.sys.read_root_generation_progress()["started"])
        self.client.sys.start_root_token_generation(
            otp=test_otp,
        )
        self.assertTrue(self.client.sys.read_root_generation_progress()["started"])

        self.client.sys.cancel_root_generation()
        self.assertFalse(self.client.sys.read_root_generation_progress()["started"])

    def test_rotate(self):
        """Rotate the encryption key and check that the reported ``term`` increased."""
        status = self.client.key_status

        self.client.sys.rotate_encryption_key()

        self.assertGreater(
            self.client.key_status["term"],
            status["term"],
        )

    def test_rekey_multi(self):
        """Start and cancel a rekey, then complete a second rekey with the manager keys."""
        cls = type(self)

        self.assertFalse(self.client.rekey_status["started"])

        self.client.sys.start_rekey()
        self.assertTrue(self.client.rekey_status["started"])

        self.client.sys.cancel_rekey()
        self.assertFalse(self.client.rekey_status["started"])

        result = self.client.sys.start_rekey()

        keys = cls.manager.keys

        result = self.client.sys.rekey_multi(keys, nonce=result["nonce"])
        self.assertTrue(result["complete"])

        # Store the new keys on the manager before asking it to unseal.
        cls.manager.keys = result["keys"]
        cls.manager.unseal()

    def test_rekey_verify_multi(self):
        """Complete a rekey that requires verification, restarting verification once."""
        cls = type(self)

        # Start rekey process with verification required and use operator keys
        self.assertFalse(self.client.sys.read_rekey_progress()["started"])
        result = self.client.sys.start_rekey(require_verification=True)
        result = self.client.sys.rekey_multi(cls.manager.keys, nonce=result["nonce"])
        self.assertTrue(result["complete"])
        cls.manager.keys = result["keys"]

        # get the initial verification nonce
        result = self.client.sys.read_rekey_verify_progress()
        first_nonce = result["nonce"]

        # now cancel the process and verify we have a new verification nonce
        result = self.client.sys.cancel_rekey_verify()
        second_nonce = result["nonce"]
        self.assertNotEqual(first_nonce, second_nonce)

        # finally complete the verification process
        result = self.client.sys.rekey_verify_multi(
            cls.manager.keys, nonce=result["nonce"]
        )
        self.assertTrue(result["complete"])

        # now we unseal
        cls.manager.unseal()

    def test_get_backed_up_keys(self):
        """Reading backup keys must raise ``InvalidRequest`` mentioning the missing backup."""
        with self.assertRaises(exceptions.InvalidRequest) as cm:
            self.client.sys.read_backup_keys()

        # The message check has to live outside the ``with`` block: anything
        # placed after the raising call inside the block is never executed.
        # A substring match is used because the exception text may carry extra
        # context beyond the server message (NOTE(review): confirm the exact
        # format produced by hvac's exception classes).
        self.assertIn("no backed-up keys found", str(cm.exception))