1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
|
from __future__ import annotations
import json
import typing
from consul.callback import CB
if typing.TYPE_CHECKING:
import builtins
class Token:
    """
    Client-side wrapper for the ACL token endpoints (``/v1/acl/token*``).

    Each method builds its request headers with ``agent.prepare_headers`` and
    sends the request through ``agent.http``.
    """

    def __init__(self, agent) -> None:
        # The agent supplies both the HTTP client and the header preparation.
        self.agent = agent

    def list(self, token: str | None = None):
        """
        Return all active ACL tokens.

        This is a privileged endpoint and requires a management token.
        Requires a token with acl:read capability; ACLPermissionDenied is
        raised otherwise.

        :param token: when given, overrides this client's default token
        :return: the JSON-decoded list of tokens
        """
        request_headers = self.agent.prepare_headers(token)
        return self.agent.http.get(
            CB.json(),
            "/v1/acl/tokens",
            headers=request_headers,
        )

    def read(self, accessor_id: str, token: str | None = None):
        """
        Fetch the token identified by *accessor_id*.

        Requires a token with acl:read capability.

        :param accessor_id: The accessor ID of the token to read
        :param token: token with acl:read capability
        :return: selected token information
        """
        request_headers = self.agent.prepare_headers(token)
        return self.agent.http.get(
            CB.json(),
            f"/v1/acl/token/{accessor_id}",
            headers=request_headers,
        )

    def delete(self, accessor_id: str, token: str | None = None):
        """
        Remove the token identified by *accessor_id*.

        This is a privileged endpoint and requires a token with acl:write.

        :param accessor_id: The accessor ID of the token to delete
        :param token: token with acl:write capability
        :return: True if the token was deleted
        """
        request_headers = self.agent.prepare_headers(token)
        return self.agent.http.delete(
            CB.boolean(),
            f"/v1/acl/token/{accessor_id}",
            headers=request_headers,
        )

    def clone(self, accessor_id: str, token: str | None = None, description: str = ""):
        """
        Clone the token identified by *accessor_id*.

        This is a privileged endpoint and requires a token with acl:write.

        :param accessor_id: The accessor ID of the token to clone
        :param token: token with acl:write capability
        :param description: Optional new token description
        :return: The cloned token information
        """
        # The description is always sent, even when it is the empty default.
        payload = dict(Description=description)
        request_headers = self.agent.prepare_headers(token)
        return self.agent.http.put(
            CB.json(),
            f"/v1/acl/token/{accessor_id}/clone",
            headers=request_headers,
            data=json.dumps(payload),
        )

    def create(
        self,
        token: str | None = None,
        accessor_id: str | None = None,
        secret_id: str | None = None,
        policies_id: builtins.list[str] | None = None,
        description: str = "",
    ):
        """
        Create a new token, optionally with a caller-chosen *accessor_id*
        and *secret_id*.

        This is a privileged endpoint and requires a token with acl:write.

        :param token: token with acl:write capability
        :param accessor_id: The accessor ID of the token to create
        :param secret_id: The secret ID of the token to create
        :param policies_id: Optional list of policies id
        :param description: Optional new token description
        :return: The created token information
        """
        # Only truthy values are sent; key order matches the request body
        # layout: AccessorID, SecretID, Description, then Policies.
        optional_fields = (
            ("AccessorID", accessor_id),
            ("SecretID", secret_id),
            ("Description", description),
        )
        payload: dict[str, typing.Any] = {name: value for name, value in optional_fields if value}
        if policies_id:
            # Each policy is referenced by its ID only.
            payload["Policies"] = [{"ID": policy_id} for policy_id in policies_id]

        request_headers = self.agent.prepare_headers(token)
        return self.agent.http.put(
            CB.json(),
            "/v1/acl/token",
            headers=request_headers,
            data=json.dumps(payload),
        )

    def update(self, accessor_id: str, token: str | None = None, secret_id: str | None = None, description: str = ""):
        """
        Update the token identified by *accessor_id*.

        This is a privileged endpoint and requires a token with acl:write.

        :param accessor_id: The accessor ID of the token to update
        :param token: token with acl:write capability
        :param secret_id: Optional secret ID of the token to update
        :param description: Optional new token description
        :return: The updated token information
        """
        # The accessor ID goes in both the URL and the body; the remaining
        # fields are included only when they are truthy.
        payload = {"AccessorID": accessor_id}
        for name, value in (("SecretID", secret_id), ("Description", description)):
            if value:
                payload[name] = value

        request_headers = self.agent.prepare_headers(token)
        return self.agent.http.put(
            CB.json(),
            f"/v1/acl/token/{accessor_id}",
            headers=request_headers,
            data=json.dumps(payload),
        )
|