1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425
|
import json
from urllib.parse import urlparse
from django.contrib.auth import logout
from django.contrib.auth.models import AnonymousUser
from django.http import HttpResponse, JsonResponse
from django.urls import reverse
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from django.views.generic import FormView, View
from jwcrypto import jwt
from jwcrypto.common import JWException
from jwcrypto.jws import InvalidJWSObject
from jwcrypto.jwt import JWTExpired
from oauthlib.common import add_params_to_uri
from ..compat import login_not_required
from ..exceptions import (
ClientIdMissmatch,
InvalidIDTokenError,
InvalidOIDCClientError,
InvalidOIDCRedirectURIError,
LogoutDenied,
OIDCError,
)
from ..forms import ConfirmLogoutForm
from ..http import OAuth2ResponseRedirect
from ..models import (
AbstractGrant,
get_access_token_model,
get_application_model,
get_id_token_model,
get_refresh_token_model,
)
from ..settings import oauth2_settings
from ..utils import jwk_from_pem
from .mixins import OAuthLibMixin, OIDCLogoutOnlyMixin, OIDCOnlyMixin
Application = get_application_model()
@method_decorator(login_not_required, name="dispatch")
class ConnectDiscoveryInfoView(OIDCOnlyMixin, View):
"""
View used to show oidc provider configuration information per
`OpenID Provider Metadata <https://openid.net/specs/openid-connect-discovery-1_0.html#ProviderMetadata>`_
"""
def get(self, request, *args, **kwargs):
issuer_url = oauth2_settings.OIDC_ISS_ENDPOINT
if not issuer_url:
issuer_url = oauth2_settings.oidc_issuer(request)
authorization_endpoint = request.build_absolute_uri(reverse("oauth2_provider:authorize"))
token_endpoint = request.build_absolute_uri(reverse("oauth2_provider:token"))
userinfo_endpoint = oauth2_settings.OIDC_USERINFO_ENDPOINT or request.build_absolute_uri(
reverse("oauth2_provider:user-info")
)
jwks_uri = request.build_absolute_uri(reverse("oauth2_provider:jwks-info"))
if oauth2_settings.OIDC_RP_INITIATED_LOGOUT_ENABLED:
end_session_endpoint = request.build_absolute_uri(
reverse("oauth2_provider:rp-initiated-logout")
)
else:
parsed_url = urlparse(oauth2_settings.OIDC_ISS_ENDPOINT)
host = parsed_url.scheme + "://" + parsed_url.netloc
authorization_endpoint = "{}{}".format(host, reverse("oauth2_provider:authorize"))
token_endpoint = "{}{}".format(host, reverse("oauth2_provider:token"))
userinfo_endpoint = oauth2_settings.OIDC_USERINFO_ENDPOINT or "{}{}".format(
host, reverse("oauth2_provider:user-info")
)
jwks_uri = "{}{}".format(host, reverse("oauth2_provider:jwks-info"))
if oauth2_settings.OIDC_RP_INITIATED_LOGOUT_ENABLED:
end_session_endpoint = "{}{}".format(host, reverse("oauth2_provider:rp-initiated-logout"))
signing_algorithms = [Application.HS256_ALGORITHM]
if oauth2_settings.OIDC_RSA_PRIVATE_KEY:
signing_algorithms = [Application.RS256_ALGORITHM, Application.HS256_ALGORITHM]
validator_class = oauth2_settings.OAUTH2_VALIDATOR_CLASS
validator = validator_class()
oidc_claims = list(set(validator.get_discovery_claims(request)))
scopes_class = oauth2_settings.SCOPES_BACKEND_CLASS
scopes = scopes_class()
scopes_supported = [scope for scope in scopes.get_available_scopes()]
data = {
"issuer": issuer_url,
"authorization_endpoint": authorization_endpoint,
"token_endpoint": token_endpoint,
"userinfo_endpoint": userinfo_endpoint,
"jwks_uri": jwks_uri,
"scopes_supported": scopes_supported,
"response_types_supported": oauth2_settings.OIDC_RESPONSE_TYPES_SUPPORTED,
"subject_types_supported": oauth2_settings.OIDC_SUBJECT_TYPES_SUPPORTED,
"id_token_signing_alg_values_supported": signing_algorithms,
"token_endpoint_auth_methods_supported": (
oauth2_settings.OIDC_TOKEN_ENDPOINT_AUTH_METHODS_SUPPORTED
),
"code_challenge_methods_supported": [key for key, _ in AbstractGrant.CODE_CHALLENGE_METHODS],
"claims_supported": oidc_claims,
}
if oauth2_settings.OIDC_RP_INITIATED_LOGOUT_ENABLED:
data["end_session_endpoint"] = end_session_endpoint
response = JsonResponse(data)
response["Access-Control-Allow-Origin"] = "*"
return response
@method_decorator(login_not_required, name="dispatch")
class JwksInfoView(OIDCOnlyMixin, View):
"""
View used to show oidc json web key set document
"""
def get(self, request, *args, **kwargs):
keys = []
if oauth2_settings.OIDC_RSA_PRIVATE_KEY:
for pem in [
oauth2_settings.OIDC_RSA_PRIVATE_KEY,
*oauth2_settings.OIDC_RSA_PRIVATE_KEYS_INACTIVE,
]:
key = jwk_from_pem(pem)
data = {"alg": "RS256", "use": "sig", "kid": key.thumbprint()}
data.update(json.loads(key.export_public()))
keys.append(data)
response = JsonResponse({"keys": keys})
response["Access-Control-Allow-Origin"] = "*"
response["Cache-Control"] = (
"Cache-Control: public, "
+ f"max-age={oauth2_settings.OIDC_JWKS_MAX_AGE_SECONDS}, "
+ f"stale-while-revalidate={oauth2_settings.OIDC_JWKS_MAX_AGE_SECONDS}, "
+ f"stale-if-error={oauth2_settings.OIDC_JWKS_MAX_AGE_SECONDS}"
)
return response
@method_decorator(csrf_exempt, name="dispatch")
@method_decorator(login_not_required, name="dispatch")
class UserInfoView(OIDCOnlyMixin, OAuthLibMixin, View):
"""
View used to show Claims about the authenticated End-User
"""
def get(self, request, *args, **kwargs):
return self._create_userinfo_response(request)
def post(self, request, *args, **kwargs):
return self._create_userinfo_response(request)
def _create_userinfo_response(self, request):
url, headers, body, status = self.create_userinfo_response(request)
response = HttpResponse(content=body or "", status=status)
for k, v in headers.items():
response[k] = v
return response
def _load_id_token(token):
"""
Loads an IDToken given its string representation for use with RP-Initiated Logout.
A tuple (IDToken, claims) is returned. Depending on the configuration expired tokens may be loaded.
If loading failed (None, None) is returned.
"""
IDToken = get_id_token_model()
validator = oauth2_settings.OAUTH2_VALIDATOR_CLASS()
try:
key = validator._get_key_for_token(token)
except InvalidJWSObject:
# Failed to deserialize the key.
return None, None
# Could not identify key from the ID Token.
if not key:
return None, None
try:
if oauth2_settings.OIDC_RP_INITIATED_LOGOUT_ACCEPT_EXPIRED_TOKENS:
# Only check the following while loading the JWT
# - claims are dict
# - the Claims defined in RFC7519 if present have the correct type (string, integer, etc.)
# The claim contents are not validated. `exp` and `nbf` in particular are not validated.
check_claims = {}
else:
# Also validate the `exp` (expiration time) and `nbf` (not before) claims.
check_claims = None
jwt_token = jwt.JWT(key=key, jwt=token, check_claims=check_claims)
claims = json.loads(jwt_token.claims)
# Assumption: the `sub` claim and `user` property of the corresponding IDToken Object point to the
# same user.
# To verify that the IDToken was intended for the user it is therefore sufficient to check the `user`
# attribute on the IDToken Object later on.
return IDToken.objects.get(jti=claims["jti"]), claims
except (JWException, JWTExpired, IDToken.DoesNotExist):
return None, None
def _validate_claims(request, claims):
"""
Validates the claims of an IDToken for use with OIDC RP-Initiated Logout.
"""
validator = oauth2_settings.OAUTH2_VALIDATOR_CLASS()
# Verification of `iss` claim is mandated by OIDC RP-Initiated Logout specs.
if "iss" not in claims or claims["iss"] != validator.get_oidc_issuer_endpoint(request):
# IDToken was not issued by this OP, or it can not be verified.
return False
return True
@method_decorator(login_not_required, name="dispatch")
class RPInitiatedLogoutView(OIDCLogoutOnlyMixin, FormView):
template_name = "oauth2_provider/logout_confirm.html"
form_class = ConfirmLogoutForm
# Only delete tokens for Application whose client type and authorization
# grant type are in the respective lists.
token_deletion_client_types = [
Application.CLIENT_PUBLIC,
Application.CLIENT_CONFIDENTIAL,
]
token_deletion_grant_types = [
Application.GRANT_AUTHORIZATION_CODE,
Application.GRANT_IMPLICIT,
Application.GRANT_PASSWORD,
Application.GRANT_CLIENT_CREDENTIALS,
Application.GRANT_OPENID_HYBRID,
]
def get_initial(self):
return {
"id_token_hint": self.oidc_data.get("id_token_hint", None),
"logout_hint": self.oidc_data.get("logout_hint", None),
"client_id": self.oidc_data.get("client_id", None),
"post_logout_redirect_uri": self.oidc_data.get("post_logout_redirect_uri", None),
"state": self.oidc_data.get("state", None),
"ui_locales": self.oidc_data.get("ui_locales", None),
}
def dispatch(self, request, *args, **kwargs):
self.oidc_data = {}
return super().dispatch(request, *args, **kwargs)
def get(self, request, *args, **kwargs):
id_token_hint = request.GET.get("id_token_hint")
client_id = request.GET.get("client_id")
post_logout_redirect_uri = request.GET.get("post_logout_redirect_uri")
state = request.GET.get("state")
try:
application, token_user = self.validate_logout_request(
id_token_hint=id_token_hint,
client_id=client_id,
post_logout_redirect_uri=post_logout_redirect_uri,
)
except OIDCError as error:
return self.error_response(error)
if not self.must_prompt(token_user):
return self.do_logout(application, post_logout_redirect_uri, state, token_user)
self.oidc_data = {
"id_token_hint": id_token_hint,
"client_id": client_id,
"post_logout_redirect_uri": post_logout_redirect_uri,
"state": state,
}
form = self.get_form(self.get_form_class())
kwargs["form"] = form
if application:
kwargs["application"] = application
return self.render_to_response(self.get_context_data(**kwargs))
def form_valid(self, form):
id_token_hint = form.cleaned_data.get("id_token_hint")
client_id = form.cleaned_data.get("client_id")
post_logout_redirect_uri = form.cleaned_data.get("post_logout_redirect_uri")
state = form.cleaned_data.get("state")
try:
application, token_user = self.validate_logout_request(
id_token_hint=id_token_hint,
client_id=client_id,
post_logout_redirect_uri=post_logout_redirect_uri,
)
if not self.must_prompt(token_user) or form.cleaned_data.get("allow"):
return self.do_logout(application, post_logout_redirect_uri, state, token_user)
else:
raise LogoutDenied()
except OIDCError as error:
return self.error_response(error)
def validate_post_logout_redirect_uri(self, application, post_logout_redirect_uri):
"""
Validate the OIDC RP-Initiated Logout Request post_logout_redirect_uri parameter
"""
if not post_logout_redirect_uri:
return
if not application:
raise InvalidOIDCClientError()
scheme = urlparse(post_logout_redirect_uri)[0]
if not scheme:
raise InvalidOIDCRedirectURIError("A Scheme is required for the redirect URI.")
if oauth2_settings.OIDC_RP_INITIATED_LOGOUT_STRICT_REDIRECT_URIS and (
scheme == "http" and application.client_type != "confidential"
):
raise InvalidOIDCRedirectURIError("http is only allowed with confidential clients.")
if scheme not in application.get_allowed_schemes():
raise InvalidOIDCRedirectURIError(f'Redirect to scheme "{scheme}" is not permitted.')
if not application.post_logout_redirect_uri_allowed(post_logout_redirect_uri):
raise InvalidOIDCRedirectURIError("This client does not have this redirect uri registered.")
def validate_logout_request_user(self, id_token_hint, client_id):
"""
Validate the an OIDC RP-Initiated Logout Request user
"""
if not id_token_hint:
return
# Only basic validation has been done on the IDToken at this point.
id_token, claims = _load_id_token(id_token_hint)
if not id_token or not _validate_claims(self.request, claims):
raise InvalidIDTokenError()
# If both id_token_hint and client_id are given it must be verified that they match.
if client_id:
if id_token.application.client_id != client_id:
raise ClientIdMissmatch()
return id_token
def get_request_application(self, id_token, client_id):
if client_id:
return get_application_model().objects.get(client_id=client_id)
if id_token:
return id_token.application
def validate_logout_request(self, id_token_hint, client_id, post_logout_redirect_uri):
"""
Validate an OIDC RP-Initiated Logout Request.
`(application, token_user)` is returned.
If it is set, `application` is the Application that is requesting the logout.
`token_user` is the id_token user, which will used to revoke the tokens if found.
The `id_token_hint` will be validated if given. If both `client_id` and `id_token_hint` are given they
will be validated against each other.
"""
id_token = self.validate_logout_request_user(id_token_hint, client_id)
application = self.get_request_application(id_token, client_id)
self.validate_post_logout_redirect_uri(application, post_logout_redirect_uri)
return application, id_token.user if id_token else None
def must_prompt(self, token_user):
"""Indicate whether the logout has to be confirmed by the user. This happens if the
specifications force a confirmation, or it is enabled by `OIDC_RP_INITIATED_LOGOUT_ALWAYS_PROMPT`.
A logout without user interaction (i.e. no prompt) is only allowed
if an ID Token is provided that matches the current user.
"""
return (
oauth2_settings.OIDC_RP_INITIATED_LOGOUT_ALWAYS_PROMPT
or token_user is None
or token_user != self.request.user
)
def do_logout(self, application=None, post_logout_redirect_uri=None, state=None, token_user=None):
user = token_user or self.request.user
# Delete Access Tokens if a user was found
if oauth2_settings.OIDC_RP_INITIATED_LOGOUT_DELETE_TOKENS and not isinstance(user, AnonymousUser):
AccessToken = get_access_token_model()
RefreshToken = get_refresh_token_model()
access_tokens_to_delete = AccessToken.objects.filter(
user=user,
application__client_type__in=self.token_deletion_client_types,
application__authorization_grant_type__in=self.token_deletion_grant_types,
)
# This queryset has to be evaluated eagerly. The queryset would be empty with lazy evaluation
# because `access_tokens_to_delete` represents an empty queryset once `refresh_tokens_to_delete`
# is evaluated as all AccessTokens have been deleted.
refresh_tokens_to_delete = list(
RefreshToken.objects.filter(access_token__in=access_tokens_to_delete)
)
for token in access_tokens_to_delete:
# Delete the token and its corresponding refresh and IDTokens.
if token.id_token:
token.id_token.revoke()
token.revoke()
for refresh_token in refresh_tokens_to_delete:
refresh_token.revoke()
# Logout in Django
logout(self.request)
# Redirect
if post_logout_redirect_uri:
if state:
return OAuth2ResponseRedirect(
add_params_to_uri(post_logout_redirect_uri, [("state", state)]),
application.get_allowed_schemes(),
)
else:
return OAuth2ResponseRedirect(post_logout_redirect_uri, application.get_allowed_schemes())
else:
return OAuth2ResponseRedirect(
self.request.build_absolute_uri("/"),
oauth2_settings.ALLOWED_REDIRECT_URI_SCHEMES,
)
def error_response(self, error):
error_response = {"error": error}
return self.render_to_response(error_response, status=error.status_code)
|