File: test_policy.py

package info (click to toggle)
python-hvac 2.3.0-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,800 kB
  • sloc: python: 29,360; makefile: 42; sh: 14
file content (116 lines) | stat: -rw-r--r-- 3,441 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import json
import logging
from unittest import TestCase, skipIf

from parameterized import parameterized, param

from tests import utils
from tests.utils.hvac_integration_test_case import HvacIntegrationTestCase


@skipIf(
    utils.vault_version_lt("0.9.0"),
    "Policy class uses new parameters added >= Vault 0.9.0",
)
class TestPolicy(HvacIntegrationTestCase, TestCase):
    """Integration tests for the policy methods exposed on ``client.sys``.

    The whole class is skipped when ``utils.vault_version_lt("0.9.0")`` is
    true. ``self.client`` and ``self.prep_policy`` are not defined in this
    class; they are presumably supplied by ``HvacIntegrationTestCase``.
    """

    # Policy written by test_create_or_update_policy.
    TEST_POLICY_NAME = "test-policy-policy"
    # Policy written by the two *_policy_manipulation tests.
    MANIPULATION_POLICY_NAME = "test"

    def tearDown(self):
        """Delete every policy these tests may create, then run the base tearDown.

        Both names are removed unconditionally, so a test that fails before
        reaching its own ``delete_policy`` call cannot leave a policy behind
        for the next test. The delete is issued even when the policy was
        never created or was already removed by the test itself.
        """
        for policy_name in (self.TEST_POLICY_NAME, self.MANIPULATION_POLICY_NAME):
            self.client.sys.delete_policy(name=policy_name)
        super().tearDown()

    def _policy_names(self):
        """Return ``["data"]["policies"]`` from a fresh ``list_policies()`` call."""
        return self.client.sys.list_policies()["data"]["policies"]

    @parameterized.expand(
        [
            param(
                "success",
            ),
            param(
                "pretty print false",
                pretty_print=False,
            ),
        ]
    )
    @skipIf(
        utils.vault_version_eq("0.11.0"),
        "Policy parsing broken in Vault version 0.11.0",
    )
    def test_create_or_update_policy(self, label, pretty_print=True):
        """Write a dict policy and check that reading it back yields the same dict.

        :param label: Case label consumed by ``parameterized.expand``; unused here.
        :param pretty_print: Forwarded unchanged to ``create_or_update_policy``.
        """
        test_policy = {
            "path": {
                "test-path": {
                    "capabilities": ["read"],
                },
            },
        }
        create_policy_response = self.client.sys.create_or_update_policy(
            name=self.TEST_POLICY_NAME,
            policy=test_policy,
            pretty_print=pretty_print,
        )
        # Lazy %-args: the message is only formatted if DEBUG is enabled.
        logging.debug("create_policy_response: %s", create_policy_response)
        self.assertTrue(create_policy_response)

        read_policy_response = self.client.sys.read_policy(
            name=self.TEST_POLICY_NAME,
        )
        logging.debug("read_policy_response: %s", read_policy_response)
        # The stored rules are decoded with json.loads before comparing with
        # the dict that was submitted.
        self.assertDictEqual(
            d1=json.loads(read_policy_response["data"]["rules"]),
            d2=test_policy,
        )

    def test_policy_manipulation(self):
        """Create a policy, read it back raw and parsed, then delete it."""
        name = self.MANIPULATION_POLICY_NAME

        self.assertIn(member="root", container=self._policy_names())
        self.assertIsNone(self.client.get_policy(name))

        # prep_policy is defined outside this class; the assertions below
        # expect it to have stored the policy under `name` and to return the
        # raw rules plus their parsed form.
        policy, parsed_policy = self.prep_policy(name)
        self.assertIn(member=name, container=self._policy_names())
        self.assertEqual(policy, self.client.sys.read_policy(name)["data"]["rules"])
        self.assertEqual(parsed_policy, self.client.get_policy(name, parse=True))

        self.client.sys.delete_policy(name=name)
        self.assertNotIn(member=name, container=self._policy_names())

    def test_json_policy_manipulation(self):
        """Create a policy from a raw string document and delete it again.

        NOTE(review): despite the test name, the document submitted below is
        written in a block syntax, not JSON.
        """
        name = self.MANIPULATION_POLICY_NAME

        self.assertIn(member="root", container=self._policy_names())

        policy = """
            path "sys" {
                policy = "deny"
            }
            path "secret" {
                policy = "write"
            }
        """
        self.client.sys.create_or_update_policy(
            name=name,
            policy=policy,
        )
        self.assertIn(member=name, container=self._policy_names())

        self.client.sys.delete_policy(name=name)
        self.assertNotIn(member=name, container=self._policy_names())