1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
|
from allauth import app_settings as allauth_settings
from allauth.account import app_settings as account_settings
from allauth.account.adapter import get_adapter as get_account_adapter
from allauth.account.authentication import get_authentication_records
from allauth.account.internal import flows
from allauth.account.internal.stagekit import LOGIN_SESSION_KEY
from allauth.headless.adapter import get_adapter
from allauth.headless.constants import Flow
from allauth.headless.internal import authkit
from allauth.headless.internal.restkit.response import APIResponse
from allauth.mfa import app_settings as mfa_settings
class BaseAuthenticationResponse(APIResponse):
    """Common base for responses that report the authentication state.

    For an authenticated user the payload carries the serialized user and
    the authentication records.  Whenever the resulting status is not 200,
    the payload also lists the flows that are available to the client.
    """

    def __init__(self, request, user=None, status=None):
        """Assemble ``data`` and ``meta`` and hand them to ``APIResponse``.

        Args:
            request: The current request.
            user: The user to report on.  ``None`` is treated the same as an
                unauthenticated user.
            status: Optional HTTP status override.  When omitted, 200 is used
                for an authenticated user and 401 otherwise.
        """
        # ``user and user.is_authenticated`` evaluates to ``None`` when no
        # user is given; coerce it so ``meta`` always carries a real boolean.
        is_authenticated = bool(user and user.is_authenticated)
        data = {}
        if is_authenticated:
            adapter = get_adapter()
            data["user"] = adapter.serialize_user(user)
            data["methods"] = get_authentication_records(request)
            status = status or 200
        else:
            status = status or 401
        if status != 200:
            # An explicit non-200 status may be combined with an
            # authenticated user, hence the check on the status itself.
            data["flows"] = self._get_flows(request, user)
        super().__init__(
            request,
            data=data,
            meta={"is_authenticated": is_authenticated},
            status=status,
        )

    def _get_flows(self, request, user):
        """Return the list of flow dicts to advertise to the client.

        Authenticated users get the reauthentication flows.  Everyone else
        gets the login/signup/provider/passkey flows permitted by the
        settings.  On top of that, a pending stage (or a string stored under
        ``LOGIN_SESSION_KEY`` in the session) is merged in as a flow marked
        ``is_pending``.
        """
        auth_status = authkit.AuthenticationStatus(request)
        ret = []
        if user and user.is_authenticated:
            ret.extend(flows.reauthentication.get_reauthentication_flows(user))
        else:
            if not allauth_settings.SOCIALACCOUNT_ONLY:
                ret.append({"id": Flow.LOGIN})
            if account_settings.LOGIN_BY_CODE_ENABLED:
                ret.append({"id": Flow.LOGIN_BY_CODE})
            if (
                get_account_adapter().is_open_for_signup(request)
                and not allauth_settings.SOCIALACCOUNT_ONLY
            ):
                ret.append({"id": Flow.SIGNUP})
            if allauth_settings.SOCIALACCOUNT_ENABLED:
                # Imported here so the module is only loaded when the
                # socialaccount app is enabled.
                from allauth.headless.socialaccount.response import (
                    provider_flows,
                )

                ret.extend(provider_flows(request))
            if allauth_settings.MFA_ENABLED and mfa_settings.PASSKEY_LOGIN_ENABLED:
                ret.append({"id": Flow.MFA_LOGIN_WEBAUTHN})

        stage_key = None
        stage = auth_status.get_pending_stage()
        if stage:
            stage_key = stage.key
        else:
            # No pending stage: fall back to the session value, but only
            # when it is a plain string.
            lsk = request.session.get(LOGIN_SESSION_KEY)
            if isinstance(lsk, str):
                stage_key = lsk
        if stage_key:
            pending_flow = {"id": stage_key, "is_pending": True}
            if stage and stage_key == Flow.MFA_AUTHENTICATE:
                self._enrich_mfa_flow(stage, pending_flow)
            self._upsert_pending_flow(ret, pending_flow)
        return ret

    def _upsert_pending_flow(self, flows, pending_flow):
        """Merge ``pending_flow`` into ``flows`` in place.

        If a flow with the same ``id`` is already listed, it is updated with
        the keys of ``pending_flow``; otherwise ``pending_flow`` is appended.
        """
        existing = next(
            (flow for flow in flows if flow["id"] == pending_flow["id"]), None
        )
        if existing is None:
            flows.append(pending_flow)
        else:
            existing.update(pending_flow)

    def _enrich_mfa_flow(self, stage, flow: dict) -> None:
        """Store in ``flow["types"]`` the authenticator types enabled for
        the user of the login attached to ``stage``.
        """
        from allauth.mfa.adapter import get_adapter as get_mfa_adapter
        from allauth.mfa.models import Authenticator

        adapter = get_mfa_adapter()
        user = stage.login.user
        flow["types"] = [
            typ
            for typ in Authenticator.Type
            if adapter.is_mfa_enabled(user, types=[typ])
        ]
class AuthenticationResponse(BaseAuthenticationResponse):
    """Authentication response describing the user attached to the request."""

    def __init__(self, request):
        """Report on ``request.user``, leaving the status to the base class."""
        current_user = request.user
        super().__init__(request, user=current_user)
class ReauthenticationResponse(BaseAuthenticationResponse):
    """Authentication response for ``request.user`` that always uses 401."""

    def __init__(self, request):
        """Report on ``request.user`` with the status forced to 401."""
        current_user = request.user
        super().__init__(
            request,
            user=current_user,
            status=401,
        )
class UnauthorizedResponse(BaseAuthenticationResponse):
    """Authentication response that is built without any user."""

    def __init__(self, request, status=401):
        """Respond with ``status`` (401 unless overridden) and no user."""
        # Passing ``user=None`` makes the base class take its
        # unauthenticated branch.
        super().__init__(
            request,
            user=None,
            status=status,
        )
class ForbiddenResponse(APIResponse):
    """API response carrying HTTP status 403 and no explicit payload."""

    def __init__(self, request):
        """Initialize the response for ``request`` with status 403."""
        super().__init__(
            request,
            status=403,
        )
class ConflictResponse(APIResponse):
    """API response carrying HTTP status 409 and no explicit payload."""

    def __init__(self, request):
        """Initialize the response for ``request`` with status 409."""
        super().__init__(
            request,
            status=409,
        )
def get_config_data(request):
    """Return the account configuration, nested under the ``"account"`` key.

    The values are read from the account settings, except for
    ``is_open_for_signup`` which is obtained from the account adapter for
    the given ``request``.
    """
    account = {}
    account["authentication_method"] = account_settings.AUTHENTICATION_METHOD
    account["is_open_for_signup"] = get_account_adapter().is_open_for_signup(
        request
    )
    account["email_verification_by_code_enabled"] = (
        account_settings.EMAIL_VERIFICATION_BY_CODE_ENABLED
    )
    account["login_by_code_enabled"] = account_settings.LOGIN_BY_CODE_ENABLED
    return {"account": account}
class ConfigResponse(APIResponse):
    """API response exposing the configuration of the enabled apps.

    The payload starts from the account configuration and is extended with
    the socialaccount, MFA and usersessions configuration when the
    corresponding ``*_ENABLED`` setting is on.
    """

    def __init__(self, request):
        """Collect the configuration data and initialize the response."""
        data = get_config_data(request)
        # Each optional module is imported inside its own guard, so it is
        # only loaded when the matching app is enabled.
        if allauth_settings.SOCIALACCOUNT_ENABLED:
            from allauth.headless.socialaccount.response import (
                get_config_data as get_socialaccount_config_data,
            )

            data.update(get_socialaccount_config_data(request))
        if allauth_settings.MFA_ENABLED:
            from allauth.headless.mfa.response import (
                get_config_data as get_mfa_config_data,
            )

            data.update(get_mfa_config_data(request))
        if allauth_settings.USERSESSIONS_ENABLED:
            from allauth.headless.usersessions.response import (
                get_config_data as get_usersessions_config_data,
            )

            data.update(get_usersessions_config_data(request))
        # ``__init__`` must not return a value, so the parent initializer is
        # called as a plain statement.
        super().__init__(request, data=data)
class RateLimitResponse(APIResponse):
    """API response carrying HTTP status 429 and no explicit payload."""

    def __init__(self, request):
        """Initialize the response for ``request`` with status 429."""
        super().__init__(
            request,
            status=429,
        )
|