File: token.py

package info (click to toggle)
python-consul 1.6.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 484 kB
  • sloc: python: 2,858; makefile: 197
file content (123 lines) | stat: -rw-r--r-- 4,831 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
from __future__ import annotations

import json
import typing

from consul.callback import CB

if typing.TYPE_CHECKING:
    import builtins


class Token:
    def __init__(self, agent) -> None:
        self.agent = agent

    def list(self, token: str | None = None):
        """
        Lists all the active ACL tokens. This is a privileged endpoint, and
        requires a management token. *token* will override this client's
        default token.
        Requires a token with acl:read capability. ACLPermissionDenied raised otherwise
        """
        headers = self.agent.prepare_headers(token)
        return self.agent.http.get(CB.json(), "/v1/acl/tokens", headers=headers)

    def read(self, accessor_id: str, token: str | None = None):
        """
        Returns the token information for *accessor_id*. Requires a token with acl:read capability.
        :param accessor_id: The accessor ID of the token to read
        :param token: token with acl:read capability
        :return: selected token information
        """
        headers = self.agent.prepare_headers(token)
        return self.agent.http.get(CB.json(), f"/v1/acl/token/{accessor_id}", headers=headers)

    def delete(self, accessor_id: str, token: str | None = None):
        """
        Deletes the token with *accessor_id*. This is a privileged endpoint, and requires a token with acl:write.
        :param accessor_id: The accessor ID of the token to delete
        :param token: token with acl:write capability
        :return: True if the token was deleted
        """
        headers = self.agent.prepare_headers(token)
        return self.agent.http.delete(CB.boolean(), f"/v1/acl/token/{accessor_id}", headers=headers)

    def clone(self, accessor_id: str, token: str | None = None, description: str = ""):
        """
        Clones the token identified by *accessor_id*. This is a privileged endpoint, and requires a token with acl:write.
        :param accessor_id: The accessor ID of the token to clone
        :param token: token with acl:write capability
        :param description: Optional new token description
        :return: The cloned token information
        """

        json_data = {"Description": description}
        headers = self.agent.prepare_headers(token)
        return self.agent.http.put(
            CB.json(),
            f"/v1/acl/token/{accessor_id}/clone",
            headers=headers,
            data=json.dumps(json_data),
        )

    def create(
        self,
        token: str | None = None,
        accessor_id: str | None = None,
        secret_id: str | None = None,
        policies_id: builtins.list[str] | None = None,
        description: str = "",
    ):
        """
        Create a token (optionally identified by *secret_id* and *accessor_id*).
        This is a privileged endpoint, and requires a token with acl:write.
        :param token: token with acl:write capability
        :param accessor_id: The accessor ID of the token to create
        :param secret_id: The secret ID of the token to create
        :param description: Optional new token description
        :param policies_id: Optional list of policies id
        :return: The cloned token information
        """

        json_data: dict[str, typing.Any] = {}
        if accessor_id:
            json_data["AccessorID"] = accessor_id
        if secret_id:
            json_data["SecretID"] = secret_id
        if description:
            json_data["Description"] = description
        if policies_id:
            json_data["Policies"] = [{"ID": policy} for policy in policies_id]

        headers = self.agent.prepare_headers(token)
        return self.agent.http.put(
            CB.json(),
            "/v1/acl/token",
            headers=headers,
            data=json.dumps(json_data),
        )

    def update(self, accessor_id: str, token: str | None = None, secret_id: str | None = None, description: str = ""):
        """
        Update a token (optionally identified by *secret_id* and *accessor_id*).
        This is a privileged endpoint, and requires a token with acl:write.
        :param accessor_id: The accessor ID of the token to update
        :param token: token with acl:write capability
        :param secret_id: Optional secret ID of the token to update
        :param description: Optional new token description
        :return: The updated token information
        """

        json_data = {"AccessorID": accessor_id}
        if secret_id:
            json_data["SecretID"] = secret_id
        if description:
            json_data["Description"] = description
        headers = self.agent.prepare_headers(token)
        return self.agent.http.put(
            CB.json(),
            f"/v1/acl/token/{accessor_id}",
            headers=headers,
            data=json.dumps(json_data),
        )