File: test_audit.py

package info (click to toggle)
python-hvac 2.3.0-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,800 kB
  • sloc: python: 29,360; makefile: 42; sh: 14
file content (86 lines) | stat: -rw-r--r-- 2,705 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import logging
from unittest import TestCase

from parameterized import parameterized, param

from hvac import exceptions
from tests.utils.hvac_integration_test_case import HvacIntegrationTestCase


class TestAudit(HvacIntegrationTestCase, TestCase):
    """Integration tests for the audit-device calls exposed on ``client.sys``.

    Tests enable a "file" audit device at ``TEST_AUDIT_DEVICE_PATH`` as
    needed; ``tearDown`` disables that path again after every test.
    """

    TEST_AUDIT_DEVICE_PATH = "test-tempfile"
    # Value passed as the "path" option when enabling the "file" audit device.
    TEST_AUDIT_LOG_FILE = "/tmp/vault.audit.log"

    def tearDown(self):
        """Disable the test audit device, then run the inherited tearDown.

        Chaining to ``super()`` keeps cleanup defined by the base classes from
        being skipped by this override; ``finally`` makes sure it still runs
        when disabling the device raises.
        """
        try:
            self.client.sys.disable_audit_device(path=self.TEST_AUDIT_DEVICE_PATH)
        finally:
            super().tearDown()

    def _enable_test_audit_device(self):
        """Enable a "file" audit device at ``TEST_AUDIT_DEVICE_PATH``."""
        self.client.sys.enable_audit_device(
            device_type="file",
            options={"path": self.TEST_AUDIT_LOG_FILE},
            path=self.TEST_AUDIT_DEVICE_PATH,
        )

    def test_audit_backend_manipulation(self):
        # The listing is checked for the device path with a trailing slash:
        # present after enabling, absent after disabling.
        device_key = "%s/" % self.TEST_AUDIT_DEVICE_PATH

        self._enable_test_audit_device()
        self.assertIn(
            member=device_key,
            container=self.client.sys.list_enabled_audit_devices()["data"],
        )

        self.client.sys.disable_audit_device(
            path=self.TEST_AUDIT_DEVICE_PATH,
        )
        self.assertNotIn(
            member=device_key,
            container=self.client.sys.list_enabled_audit_devices()["data"],
        )

    @parameterized.expand(
        [
            param(
                "hash returned",
            ),
            param(
                "audit backend not enabled",
                enable_first=False,
                raises=exceptions.InvalidRequest,
                exception_message="unknown audit backend",
            ),
        ]
    )
    def test_audit_hash(
        self,
        label,
        enable_first=True,
        test_input="hvac-rox",
        raises=None,
        exception_message="",
    ):
        # Parameters:
        #   label: human-readable case name consumed by parameterized.expand.
        #   enable_first: enable the audit device before requesting the hash.
        #   test_input: string submitted as ``input_to_hash``.
        #   raises: exception type the call is expected to raise, or None
        #       when the call is expected to succeed.
        #   exception_message: substring expected in the raised exception's
        #       text; pass None to skip the message check.
        if enable_first:
            self._enable_test_audit_device()

        if raises:
            with self.assertRaises(raises) as cm:
                self.client.sys.calculate_hash(
                    path=self.TEST_AUDIT_DEVICE_PATH,
                    input_to_hash=test_input,
                )
            if exception_message is not None:
                self.assertIn(
                    member=exception_message,
                    container=str(cm.exception),
                )
            return

        audit_hash_response = self.client.sys.calculate_hash(
            path=self.TEST_AUDIT_DEVICE_PATH,
            input_to_hash=test_input,
        )
        # Pass the response as a logging argument so the message is only
        # formatted when debug logging is actually enabled.
        logging.debug("audit_hash_response: %s", audit_hash_response)
        self.assertIn(
            member="hmac-sha256:",
            container=audit_hash_response["data"]["hash"],
        )