1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457
|
#!/usr/bin/env python
"""JWT/OIDC methods module."""
from hvac import utils
from hvac.api.vault_api_base import VaultApiBase
class JWT(VaultApiBase):
    """JWT auth method which can be used to authenticate with Vault by providing a JWT.

    The OIDC method allows authentication via a configured OIDC provider using the user's web browser.
    This method may be initiated from the Vault UI or the command line. Alternatively, a JWT can be provided directly.
    The JWT is cryptographically verified using locally-provided keys, or, if configured, an OIDC Discovery service can
    be used to fetch the appropriate keys. The choice of method is configured per role.

    Every method builds a URL below ``/v1/auth/{path}`` and delegates the HTTP call to ``self._adapter``;
    no argument validation is performed client-side.

    Reference: https://www.vaultproject.io/api/auth/jwt
    """

    # Mount point used whenever a caller does not pass an explicit ``path``.
    DEFAULT_PATH = "jwt"

    def resolve_path(self, path):
        """Return the class's default path if no explicit path is specified.

        Only ``None`` triggers the fallback; any other value (including an empty string) is returned unchanged.

        :param path: The "path" the method/backend was mounted on.
        :type path: str | unicode
        :return: ``path`` when it is not ``None``, otherwise :attr:`DEFAULT_PATH`.
        :rtype: str
        """
        return path if path is not None else self.DEFAULT_PATH

    def configure(
        self,
        oidc_discovery_url=None,
        oidc_discovery_ca_pem=None,
        oidc_client_id=None,
        oidc_client_secret=None,
        oidc_response_mode=None,
        oidc_response_types=None,
        jwks_url=None,
        jwks_ca_pem=None,
        jwt_validation_pubkeys=None,
        bound_issuer=None,
        jwt_supported_algs=None,
        default_role=None,
        provider_config=None,
        path=None,
        namespace_in_state=None,
    ):
        """Configure the validation information to be used globally across all roles.

        ``oidc_discovery_url``, ``jwks_url`` and ``jwt_validation_pubkeys`` are mutually exclusive key sources;
        exactly one of them must be set. This is enforced by Vault, not by this client. Arguments left as
        ``None`` are omitted from the request body.

        Supported methods:
            POST: /auth/{path}/config.

        :param oidc_discovery_url: The OIDC Discovery URL, without any .well-known component (base path). Cannot be
            used with "jwks_url" or "jwt_validation_pubkeys".
        :type oidc_discovery_url: str | unicode
        :param oidc_discovery_ca_pem: The CA certificate or chain of certificates, in PEM format, to use to validate
            connections to the OIDC Discovery URL. If not set, system certificates are used.
        :type oidc_discovery_ca_pem: str | unicode
        :param oidc_client_id: The OAuth Client ID from the provider for OIDC roles.
        :type oidc_client_id: str | unicode
        :param oidc_client_secret: The OAuth Client Secret from the provider for OIDC roles.
        :type oidc_client_secret: str | unicode
        :param oidc_response_mode: The response mode to be used in the OAuth2 request. Allowed values are "query" and
            "form_post". Defaults to "query".
        :type oidc_response_mode: str | unicode
        :param oidc_response_types: The response types to request. Allowed values are "code" and "id_token". Defaults
            to "code". Note: "id_token" may only be used if "oidc_response_mode" is set to "form_post".
        :type oidc_response_types: list | str | unicode
        :param jwks_url: JWKS URL to use to authenticate signatures. Cannot be used with "oidc_discovery_url" or
            "jwt_validation_pubkeys".
        :type jwks_url: str | unicode
        :param jwks_ca_pem: The CA certificate or chain of certificates, in PEM format, to use to validate connections
            to the JWKS URL. If not set, system certificates are used.
        :type jwks_ca_pem: str | unicode
        :param jwt_validation_pubkeys: A list of PEM-encoded public keys to use to authenticate signatures locally.
            Cannot be used with "jwks_url" or "oidc_discovery_url".
        :type jwt_validation_pubkeys: list | str | unicode
        :param bound_issuer: The value against which to match the ``iss`` claim in a JWT.
        :type bound_issuer: str | unicode
        :param jwt_supported_algs: A list of supported signing algorithms. Defaults to [RS256].
        :type jwt_supported_algs: list | str | unicode
        :param default_role: The default role to use if none is provided during login.
        :type default_role: str | unicode
        :param provider_config: Configuration options for provider-specific handling. See the provider sections of
            the Vault OIDC provider setup documentation for the supported keys.
        :type provider_config: dict
        :param path: The "path" the method/backend was mounted on.
        :type path: str | unicode
        :param namespace_in_state: Pass the namespace in the OIDC state parameter instead of as a separate query
            parameter. With this setting, the allowed redirect URL(s) in Vault and on the provider side
            should not contain a namespace query parameter.
        :type namespace_in_state: bool
        :return: The response of the configure request.
        :rtype: requests.Response
        """
        # Drop unset options so that Vault applies its own defaults for them.
        params = utils.remove_nones(
            {
                "oidc_discovery_url": oidc_discovery_url,
                "oidc_discovery_ca_pem": oidc_discovery_ca_pem,
                "oidc_client_id": oidc_client_id,
                "oidc_client_secret": oidc_client_secret,
                "oidc_response_mode": oidc_response_mode,
                "oidc_response_types": oidc_response_types,
                "jwks_url": jwks_url,
                "jwks_ca_pem": jwks_ca_pem,
                "jwt_validation_pubkeys": jwt_validation_pubkeys,
                "bound_issuer": bound_issuer,
                "jwt_supported_algs": jwt_supported_algs,
                "default_role": default_role,
                "provider_config": provider_config,
                "namespace_in_state": namespace_in_state,
            }
        )
        api_path = utils.format_url(
            "/v1/auth/{path}/config",
            path=self.resolve_path(path),
        )
        return self._adapter.post(
            url=api_path,
            json=params,
        )

    def read_config(self, path=None):
        """Read the previously configured config.

        Supported methods:
            GET: /auth/{path}/config.

        :param path: The "path" the method/backend was mounted on.
        :type path: str | unicode
        :return: The response of the read_config request.
        :rtype: dict
        """
        api_path = utils.format_url(
            "/v1/auth/{path}/config",
            path=self.resolve_path(path),
        )
        return self._adapter.get(
            url=api_path,
        )

    def create_role(
        self,
        name,
        user_claim,
        allowed_redirect_uris,
        role_type="jwt",
        bound_audiences=None,
        clock_skew_leeway=None,
        expiration_leeway=None,
        not_before_leeway=None,
        bound_subject=None,
        bound_claims=None,
        groups_claim=None,
        claim_mappings=None,
        oidc_scopes=None,
        bound_claims_type="string",
        verbose_oidc_logging=False,
        token_ttl=None,
        token_max_ttl=None,
        token_policies=None,
        token_bound_cidrs=None,
        token_explicit_max_ttl=None,
        token_no_default_policy=None,
        token_num_uses=None,
        token_period=None,
        token_type=None,
        path=None,
        user_claim_json_pointer=None,
    ):
        """Register a role in the JWT method.

        Role types have specific entities that can perform login operations against this endpoint. Constraints
        specific to the role type must be set on the role. These are applied to the authenticated entities
        attempting to login. At least one of the bound values must be set.

        Arguments left as ``None`` are omitted from the request body. ``role_type``, ``bound_claims_type`` and
        ``verbose_oidc_logging`` have non-``None`` defaults and are therefore always sent.

        Supported methods:
            POST: /auth/{path}/role/:name.

        :param name: Name of the role. Used in the request URL and also sent in the request body.
        :type name: str | unicode
        :param user_claim: The claim to use to uniquely identify the user; this will be used as the name for the
            Identity entity alias created due to a successful login. The interpretation of the user claim
            is configured with ``user_claim_json_pointer``. If set to ``True``, ``user_claim`` supports JSON pointer
            syntax for referencing a claim. The claim value must be a string.
        :type user_claim: str | unicode
        :param allowed_redirect_uris: The list of allowed values for redirect_uri
            during OIDC logins.
        :type allowed_redirect_uris: list
        :param role_type: Type of role, either "oidc" or "jwt" (default).
        :type role_type: str | unicode
        :param bound_audiences: List of aud claims to match against. Any match is sufficient.
            Required for "jwt" roles, optional for "oidc" roles.
        :type bound_audiences: list
        :param clock_skew_leeway: Leeway, in seconds, added to all claims to account for clock skew.
            Only applicable with "jwt" roles.
        :type clock_skew_leeway: int
        :param expiration_leeway: Leeway, in seconds, added to expiration (``exp``) claims to account for clock skew.
            Only applicable with "jwt" roles.
        :type expiration_leeway: int
        :param not_before_leeway: Leeway, in seconds, added to not-before (``nbf``) claims to account for clock skew.
            Only applicable with "jwt" roles.
        :type not_before_leeway: int
        :param bound_subject: If set, requires that the sub claim matches this value.
        :type bound_subject: str | unicode
        :param bound_claims: If set, a dict of claims (keys) to match against respective claim values (values).
            The expected value may be a single string or a list of strings. The interpretation of the bound claim
            values is configured with bound_claims_type. Keys support JSON pointer syntax for referencing claims.
        :type bound_claims: dict
        :param groups_claim: The claim to use to uniquely identify the set of groups to which the user belongs; this
            will be used as the names for the Identity group aliases created due to a successful login. The claim value
            must be a list of strings. Supports JSON pointer syntax for referencing claims.
        :type groups_claim: str | unicode
        :param claim_mappings: If set, a map of claims (keys) to be copied to specified metadata fields (values). Keys
            support JSON pointer syntax for referencing claims.
        :type claim_mappings: dict
        :param oidc_scopes: If set, a list of OIDC scopes to be used with an OIDC role.
            The standard scope "openid" is automatically included and need not be specified.
        :type oidc_scopes: list
        :param bound_claims_type: Configures the interpretation of the bound_claims values. If "string" (the default),
            the values will be treated as string literals and must match exactly. If set to "glob", the values will be
            interpreted as globs, with * matching any number of characters.
        :type bound_claims_type: str | unicode
        :param verbose_oidc_logging: Log received OIDC tokens and claims when debug-level
            logging is active. Not recommended in production since sensitive information may be present
            in OIDC responses.
        :type verbose_oidc_logging: bool
        :param token_ttl: The incremental lifetime for generated tokens. The current value of this will be referenced
            at renewal time.
        :type token_ttl: int | str
        :param token_max_ttl: The maximum lifetime for generated tokens. The current value of this will be referenced
            at renewal time.
        :type token_max_ttl: int | str
        :param token_policies: List of policies to encode onto generated tokens. Depending on the auth method, this
            list may be supplemented by user/group/other values.
        :type token_policies: list[str]
        :param token_bound_cidrs: List of CIDR blocks; if set, specifies blocks of IP addresses which can authenticate
            successfully, and ties the resulting token to these blocks as well.
        :type token_bound_cidrs: list[str]
        :param token_explicit_max_ttl: If set, will encode an explicit max TTL onto the token. This is a hard cap
            even if token_ttl and token_max_ttl would otherwise allow a renewal.
        :type token_explicit_max_ttl: int | str
        :param token_no_default_policy: If set, the default policy will not be set on generated tokens; otherwise it
            will be added to the policies set in token_policies.
        :type token_no_default_policy: bool
        :param token_num_uses: The maximum number of times a generated token may be used (within its lifetime); 0 means
            unlimited. If you require the token to have the ability to create child tokens, you will need to set this
            value to 0.
        :type token_num_uses: int
        :param token_period: The period, if any, to set on the token.
        :type token_period: int | str
        :param token_type: The type of token that should be generated. Can be service, batch, or default.
        :type token_type: str
        :param path: The "path" the method/backend was mounted on.
        :type path: str | unicode
        :param user_claim_json_pointer: Specifies if the ``user_claim`` value uses JSON pointer syntax for referencing
            claims. By default, the ``user_claim`` value will not use JSON pointer.
        :type user_claim_json_pointer: bool
        :return: The response of the create_role request.
        :rtype: dict
        """
        # Drop unset options so that Vault applies its own defaults for them.
        params = utils.remove_nones(
            {
                "name": name,
                "role_type": role_type,
                "bound_audiences": bound_audiences,
                "user_claim": user_claim,
                "clock_skew_leeway": clock_skew_leeway,
                "expiration_leeway": expiration_leeway,
                "not_before_leeway": not_before_leeway,
                "bound_subject": bound_subject,
                "bound_claims": bound_claims,
                "groups_claim": groups_claim,
                "claim_mappings": claim_mappings,
                "oidc_scopes": oidc_scopes,
                "allowed_redirect_uris": allowed_redirect_uris,
                "bound_claims_type": bound_claims_type,
                "verbose_oidc_logging": verbose_oidc_logging,
                "token_ttl": token_ttl,
                "token_max_ttl": token_max_ttl,
                "token_policies": token_policies,
                "token_bound_cidrs": token_bound_cidrs,
                "token_explicit_max_ttl": token_explicit_max_ttl,
                "token_no_default_policy": token_no_default_policy,
                "token_num_uses": token_num_uses,
                "token_period": token_period,
                "token_type": token_type,
                "user_claim_json_pointer": user_claim_json_pointer,
            }
        )
        api_path = utils.format_url(
            "/v1/auth/{path}/role/{name}",
            path=self.resolve_path(path),
            name=name,
        )
        return self._adapter.post(
            url=api_path,
            json=params,
        )

    def read_role(self, name, path=None):
        """Read the previously registered role configuration.

        Supported methods:
            GET: /auth/{path}/role/:name.

        :param name: Name of the role.
        :type name: str | unicode
        :param path: The "path" the method/backend was mounted on.
        :type path: str | unicode
        :return: The response of the read_role request.
        :rtype: dict
        """
        api_path = utils.format_url(
            "/v1/auth/{path}/role/{name}",
            path=self.resolve_path(path),
            name=name,
        )
        return self._adapter.get(
            url=api_path,
        )

    def list_roles(self, path=None):
        """List all the roles that are registered with the plugin.

        Supported methods:
            LIST: /auth/{path}/role.

        :param path: The "path" the method/backend was mounted on.
        :type path: str | unicode
        :return: The response of the list_roles request.
        :rtype: dict
        """
        api_path = utils.format_url(
            "/v1/auth/{path}/role",
            path=self.resolve_path(path),
        )
        return self._adapter.list(
            url=api_path,
        )

    def delete_role(self, name, path=None):
        """Delete the previously registered role.

        Supported methods:
            DELETE: /auth/{path}/role/:name.

        :param name: Name of the role.
        :type name: str | unicode
        :param path: The "path" the method/backend was mounted on.
        :type path: str | unicode
        :return: The response of the delete_role request.
        :rtype: requests.Response
        """
        api_path = utils.format_url(
            "/v1/auth/{path}/role/{name}",
            path=self.resolve_path(path),
            name=name,
        )
        return self._adapter.delete(
            url=api_path,
        )

    def oidc_authorization_url_request(self, role, redirect_uri, path=None):
        """Obtain an authorization URL from Vault to start an OIDC login flow.

        Supported methods:
            POST: /auth/{path}/oidc/auth_url.

        :param role: The role against which the login is being attempted. The Vault API documents a fallback to
            the configured ``default_role`` when no role is provided.
        :type role: str | unicode
        :param redirect_uri: Path to the callback to complete the login, of the form
            "https://.../oidc/callback". It must be configured both with Vault (see the role's
            ``allowed_redirect_uris``) and with the provider.
        :type redirect_uri: str | unicode
        :param path: The "path" the method/backend was mounted on.
        :type path: str | unicode
        :return: The response of the oidc_authorization_url_request request.
        :rtype: requests.Response
        """
        # Both values are always sent, even when ``None`` (no remove_nones filtering here).
        params = {
            "role": role,
            "redirect_uri": redirect_uri,
        }
        api_path = utils.format_url(
            "/v1/auth/{path}/oidc/auth_url",
            path=self.resolve_path(path),
        )
        return self._adapter.post(
            url=api_path,
            json=params,
        )

    def oidc_callback(self, state, nonce, code, path=None):
        """Exchange an authorization code for an OIDC ID Token.

        The ID token will be further validated against any bound claims, and if valid a Vault token will be returned.

        Supported methods:
            GET: /auth/{path}/oidc/callback.

        :param state: Opaque state ID that is part of the Authorization URL and will
            be included in the redirect following successful authentication on the provider.
        :type state: str | unicode
        :param nonce: Opaque nonce that is part of the Authorization URL and will
            be included in the redirect following successful authentication on the provider.
        :type nonce: str | unicode
        :param code: Provider-generated authorization code that Vault will exchange for
            an ID token.
        :type code: str | unicode
        :param path: The "path" the method/backend was mounted on.
        :type path: str | unicode
        :return: The response of the oidc_callback request.
        :rtype: requests.Response
        """
        params = {
            "state": state,
            "nonce": nonce,
            "code": code,
        }
        # The three values are placed in the query string via format_url and are
        # additionally handed to the adapter as the ``json`` payload.
        api_path = utils.format_url(
            "/v1/auth/{path}/oidc/callback?state={state}&nonce={nonce}&code={code}",
            path=self.resolve_path(path),
            state=state,
            nonce=nonce,
            code=code,
        )
        return self._adapter.get(
            url=api_path,
            json=params,
        )

    def jwt_login(self, role, jwt, use_token=True, path=None):
        """Fetch a token.

        This endpoint takes a signed JSON Web Token (JWT) and a role name for some entity.
        It verifies the JWT signature to authenticate that entity and then authorizes the
        entity for the given role.

        Supported methods:
            POST: /auth/{path}/login.

        :param role: The role against which the login is being attempted. The Vault API documents a fallback to
            the configured ``default_role`` when no role is provided.
        :type role: str | unicode
        :param jwt: Signed JSON Web Token (JWT).
        :type jwt: str | unicode
        :param use_token: Forwarded unchanged to the adapter's ``login`` call; presumably controls whether the
            token from the response is stored on the adapter for subsequent requests (confirm against the
            adapter implementation).
        :type use_token: bool
        :param path: The "path" the method/backend was mounted on.
        :type path: str | unicode
        :return: The response of the jwt_login request.
        :rtype: requests.Response
        """
        # Both values are always sent, even when ``None`` (no remove_nones filtering here).
        params = {
            "role": role,
            "jwt": jwt,
        }
        api_path = utils.format_url(
            "/v1/auth/{path}/login",
            path=self.resolve_path(path),
        )
        return self._adapter.login(
            url=api_path,
            use_token=use_token,
            json=params,
        )
|