1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
|
import time
from authlib.common.encoding import to_native
from authlib.common.urls import add_params_to_uri
from authlib.common.urls import quote_url
from authlib.jose import jwt
from authlib.oauth2.rfc6749 import InvalidRequestError
from authlib.oauth2.rfc6749 import scope_to_list
from ..errors import AccountSelectionRequiredError
from ..errors import ConsentRequiredError
from ..errors import LoginRequiredError
from ..util import create_half_hash
def is_openid_scope(scope):
scopes = scope_to_list(scope)
return scopes and "openid" in scopes
def validate_request_prompt(grant, redirect_uri, redirect_fragment=False):
prompt = grant.request.payload.data.get("prompt")
end_user = grant.request.user
if not prompt:
if not end_user:
grant.prompt = "login"
return grant
if prompt == "none" and not end_user:
raise LoginRequiredError(
redirect_uri=redirect_uri, redirect_fragment=redirect_fragment
)
prompts = prompt.split()
if "none" in prompts and len(prompts) > 1:
# If this parameter contains none with any other value,
# an error is returned
raise InvalidRequestError(
"Invalid 'prompt' parameter.",
redirect_uri=redirect_uri,
redirect_fragment=redirect_fragment,
)
prompt = _guess_prompt_value(
end_user, prompts, redirect_uri, redirect_fragment=redirect_fragment
)
if prompt:
grant.prompt = prompt
return grant
def validate_nonce(request, exists_nonce, required=False):
nonce = request.payload.data.get("nonce")
if not nonce:
if required:
raise InvalidRequestError("Missing 'nonce' in request.")
return True
if exists_nonce(nonce, request):
raise InvalidRequestError("Replay attack")
def generate_id_token(
token,
user_info,
key,
iss,
aud,
alg="RS256",
exp=3600,
nonce=None,
auth_time=None,
acr=None,
amr=None,
code=None,
kid=None,
):
now = int(time.time())
if auth_time is None:
auth_time = now
header = {"alg": alg}
if kid:
header["kid"] = kid
payload = {
"iss": iss,
"aud": aud,
"iat": now,
"exp": now + exp,
"auth_time": auth_time,
}
if nonce:
payload["nonce"] = nonce
if acr:
payload["acr"] = acr
if amr:
payload["amr"] = amr
if code:
payload["c_hash"] = to_native(create_half_hash(code, alg))
access_token = token.get("access_token")
if access_token:
payload["at_hash"] = to_native(create_half_hash(access_token, alg))
payload.update(user_info)
return to_native(jwt.encode(header, payload, key))
def create_response_mode_response(redirect_uri, params, response_mode):
if response_mode == "form_post":
tpl = (
"<html><head><title>Redirecting</title></head>"
'<body onload="javascript:document.forms[0].submit()">'
'<form method="post" action="{}">{}</form></body></html>'
)
inputs = "".join(
[
f'<input type="hidden" name="{quote_url(k)}" value="{quote_url(v)}"/>'
for k, v in params
]
)
body = tpl.format(quote_url(redirect_uri), inputs)
return 200, body, [("Content-Type", "text/html; charset=utf-8")]
if response_mode == "query":
uri = add_params_to_uri(redirect_uri, params, fragment=False)
elif response_mode == "fragment":
uri = add_params_to_uri(redirect_uri, params, fragment=True)
else:
raise InvalidRequestError('Invalid "response_mode" value')
return 302, "", [("Location", uri)]
def _guess_prompt_value(end_user, prompts, redirect_uri, redirect_fragment):
# http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
if not end_user or "login" in prompts:
return "login"
if "consent" in prompts:
if not end_user:
raise ConsentRequiredError(
redirect_uri=redirect_uri, redirect_fragment=redirect_fragment
)
return "consent"
elif "select_account" in prompts:
if not end_user:
raise AccountSelectionRequiredError(
redirect_uri=redirect_uri, redirect_fragment=redirect_fragment
)
return "select_account"
|