File: _vault_signer.py

package info (click to toggle)
python-securesystemslib 1.3.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,316 kB
  • sloc: python: 5,319; sh: 38; makefile: 5
file content (137 lines) | stat: -rw-r--r-- 4,278 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
"""Signer implementation for HashiCorp Vault (Transit secrets engine)"""

from __future__ import annotations

from base64 import b64decode, b64encode
from urllib import parse

from securesystemslib.exceptions import UnsupportedLibraryError
from securesystemslib.signer._key import Key, SSlibKey
from securesystemslib.signer._signer import SecretsHandler, Signature, Signer

# hvac and cryptography are optional dependencies. Instead of failing when
# this module is imported, a failed import is recorded as an error message in
# VAULT_IMPORT_ERROR (None means both imports succeeded). VaultSigner checks
# this value and raises UnsupportedLibraryError only when it is actually used.
VAULT_IMPORT_ERROR = None
try:
    import hvac
    from cryptography.hazmat.primitives.asymmetric.ed25519 import (
        Ed25519PublicKey,
    )

except ImportError:
    VAULT_IMPORT_ERROR = "Signing with HashiCorp Vault requires hvac and cryptography."


class VaultSigner(Signer):
    """Signer for HashiCorp Vault Transit secrets engine

    The signer uses "ambient" credentials to connect to vault, most notably
    the environment variables ``VAULT_ADDR`` and ``VAULT_TOKEN`` must be set:
    https://developer.hashicorp.com/vault/docs/commands#environment-variables

    Priv key uri format is: ``hv:<KEY NAME>/<KEY VERSION>``.

    Arguments:
        hv_key_name: Name of vault key used for signing.
        public_key: Related public key instance.
        hv_key_version: Version of vault key used for signing.

    Raises:
        UnsupportedLibraryError: hvac or cryptography are not installed.
    """

    SCHEME = "hv"

    def __init__(self, hv_key_name: str, public_key: SSlibKey, hv_key_version: int):
        if VAULT_IMPORT_ERROR:
            raise UnsupportedLibraryError(VAULT_IMPORT_ERROR)

        self.hv_key_name = hv_key_name
        self._public_key = public_key
        self.hv_key_version = hv_key_version

        # Client caches ambient settings in __init__. This means settings are
        # stable for subsequent calls to sign, also if the environment changes.
        self._client = hvac.Client()

    def sign(self, payload: bytes) -> Signature:
        """Signs payload with HashiCorp Vault Transit secrets engine.

        Arguments:
            payload: bytes to be signed.

        Raises:
            Various errors from hvac.

        Returns:
            Signature.
        """
        # The payload is passed to the transit engine as base64 text.
        resp = self._client.secrets.transit.sign_data(
            self.hv_key_name,
            hash_input=b64encode(payload).decode(),
            key_version=self.hv_key_version,
        )

        # The returned signature string is colon-separated; the third field
        # carries the base64-encoded raw signature, which is stored hex-encoded.
        sig_b64 = resp["data"]["signature"].split(":")[2]
        sig = b64decode(sig_b64).hex()

        return Signature(self.public_key.keyid, sig)

    @property
    def public_key(self) -> SSlibKey:
        """Public key related to the vault key used for signing."""
        return self._public_key

    @classmethod
    def from_priv_key_uri(
        cls,
        priv_key_uri: str,
        public_key: Key,
        secrets_handler: SecretsHandler | None = None,
    ) -> VaultSigner:
        """Create a VaultSigner from a private key uri and a public key.

        Arguments:
            priv_key_uri: Uri of the form ``hv:<KEY NAME>/<KEY VERSION>``.
            public_key: Related public key, must be an SSlibKey.
            secrets_handler: Not used by this signer.

        Raises:
            ValueError: public_key is not an SSlibKey, the uri scheme is not
                supported, or the uri path is not ``<KEY NAME>/<KEY VERSION>``
                with an integer version.
            UnsupportedLibraryError: hvac or cryptography are not installed.

        Returns:
            VaultSigner.
        """
        if not isinstance(public_key, SSlibKey):
            raise ValueError(f"Expected SSlibKey for {priv_key_uri}")

        uri = parse.urlparse(priv_key_uri)

        if uri.scheme != cls.SCHEME:
            raise ValueError(f"VaultSigner does not support {priv_key_uri}")

        # Validate the path shape explicitly, so that a malformed uri yields a
        # descriptive error rather than an opaque tuple-unpacking failure.
        parts = uri.path.split("/")
        if len(parts) != 2:
            raise ValueError(
                f"Expected '{cls.SCHEME}:<KEY NAME>/<KEY VERSION>', "
                f"got {priv_key_uri}"
            )
        name, version = parts

        return cls(name, public_key, int(version))

    @classmethod
    def import_(cls, hv_key_name: str) -> tuple[str, SSlibKey]:
        """Load key and signer details from HashiCorp Vault.

        If multiple keys exist in the vault under the passed name, only the
        newest key is returned. Supported key type is: ed25519

        See class documentation for details about settings and uri format.

        Arguments:
            hv_key_name: Name of vault key to import.

        Raises:
            UnsupportedLibraryError: hvac or cryptography are not installed.
            Various errors from hvac.

        Returns:
            Private key uri and public key.

        """
        if VAULT_IMPORT_ERROR:
            raise UnsupportedLibraryError(VAULT_IMPORT_ERROR)

        client = hvac.Client()
        resp = client.secrets.transit.read_key(hv_key_name)

        # Pick key with highest version number. Version identifiers are
        # compared as integers: comparing them as strings would rank "9"
        # above "10" and return a stale key once ten or more versions exist.
        version, key_info = max(
            resp["data"]["keys"].items(),
            key=lambda item: int(item[0]),
        )

        crypto_key = Ed25519PublicKey.from_public_bytes(
            b64decode(key_info["public_key"])
        )

        key = SSlibKey.from_crypto(crypto_key)
        uri = f"{VaultSigner.SCHEME}:{hv_key_name}/{version}"

        return uri, key