File: response.py

package info (click to toggle)
django-allauth 65.0.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 9,672 kB
  • sloc: python: 34,411; javascript: 3,070; xml: 849; makefile: 235; sh: 8
file content (155 lines) | stat: -rw-r--r-- 5,757 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
from allauth import app_settings as allauth_settings
from allauth.account import app_settings as account_settings
from allauth.account.adapter import get_adapter as get_account_adapter
from allauth.account.authentication import get_authentication_records
from allauth.account.internal import flows
from allauth.account.internal.stagekit import LOGIN_SESSION_KEY
from allauth.headless.adapter import get_adapter
from allauth.headless.constants import Flow
from allauth.headless.internal import authkit
from allauth.headless.internal.restkit.response import APIResponse
from allauth.mfa import app_settings as mfa_settings


class BaseAuthenticationResponse(APIResponse):
    """API response describing the authentication state of a request.

    For an authenticated user the payload carries the serialized user and the
    authentication records of the request. Whenever the resulting status is
    not 200, the payload additionally lists the available flows.
    """

    def __init__(self, request, user=None, status=None):
        """Assemble ``data`` and ``meta`` and initialise the API response.

        Args:
            request: The current HTTP request.
            user: The user to report on. ``None`` is handled the same way as
                a user that is not authenticated.
            status: Optional HTTP status override. When falsy, defaults to
                200 for an authenticated user and 401 otherwise.
        """
        # Coerce to a real boolean: ``user and user.is_authenticated``
        # evaluates to ``None`` when no user is passed, and that value would
        # otherwise end up in ``meta`` instead of ``False``.
        is_authenticated = bool(user and user.is_authenticated)
        data = {}
        if is_authenticated:
            data["user"] = get_adapter().serialize_user(user)
            data["methods"] = get_authentication_records(request)
            status = status or 200
        else:
            status = status or 401
        if status != 200:
            # Anything other than a plain 200 tells the client which flows
            # it can use next.
            data["flows"] = self._get_flows(request, user)
        super().__init__(
            request,
            data=data,
            meta={"is_authenticated": is_authenticated},
            status=status,
        )

    def _get_flows(self, request, user):
        """Return the list of flow descriptors applicable to this request.

        Each descriptor is a dict with at least an ``"id"`` key. An
        authenticated user gets the reauthentication flows; otherwise the
        list is built from the enabled login/signup/social/MFA settings. A
        pending stage (or a login session key stored as a string) is merged
        in as a flow flagged with ``"is_pending": True``.
        """
        auth_status = authkit.AuthenticationStatus(request)
        ret = []
        if user and user.is_authenticated:
            ret.extend(flows.reauthentication.get_reauthentication_flows(user))
        else:
            if not allauth_settings.SOCIALACCOUNT_ONLY:
                ret.append({"id": Flow.LOGIN})
            if account_settings.LOGIN_BY_CODE_ENABLED:
                ret.append({"id": Flow.LOGIN_BY_CODE})
            if (
                get_account_adapter().is_open_for_signup(request)
                and not allauth_settings.SOCIALACCOUNT_ONLY
            ):
                ret.append({"id": Flow.SIGNUP})
            if allauth_settings.SOCIALACCOUNT_ENABLED:
                # Imported only when the social account app is enabled.
                from allauth.headless.socialaccount.response import (
                    provider_flows,
                )

                ret.extend(provider_flows(request))
            if allauth_settings.MFA_ENABLED:
                if mfa_settings.PASSKEY_LOGIN_ENABLED:
                    ret.append({"id": Flow.MFA_LOGIN_WEBAUTHN})

        # Determine the pending flow: prefer the pending stage, and fall back
        # to the session's login key when that key is a plain string.
        stage_key = None
        stage = auth_status.get_pending_stage()
        if stage:
            stage_key = stage.key
        else:
            lsk = request.session.get(LOGIN_SESSION_KEY)
            if isinstance(lsk, str):
                stage_key = lsk
        if stage_key:
            pending_flow = {"id": stage_key, "is_pending": True}
            if stage and stage_key == Flow.MFA_AUTHENTICATE:
                self._enrich_mfa_flow(stage, pending_flow)
            self._upsert_pending_flow(ret, pending_flow)
        return ret

    def _upsert_pending_flow(self, flows, pending_flow):
        """Merge ``pending_flow`` into ``flows`` in place.

        The first flow with the same ``"id"`` is updated with the pending
        flow's keys; if no such flow exists, the pending flow is appended.
        """
        flow = next((flow for flow in flows if flow["id"] == pending_flow["id"]), None)
        if flow:
            flow.update(pending_flow)
        else:
            flows.append(pending_flow)

    def _enrich_mfa_flow(self, stage, flow: dict) -> None:
        """Set ``flow["types"]`` to the authenticator types enabled for the
        user of the login attached to ``stage``.
        """
        from allauth.mfa.adapter import get_adapter as get_mfa_adapter
        from allauth.mfa.models import Authenticator

        adapter = get_mfa_adapter()
        # Query each authenticator type individually and keep the enabled ones.
        flow["types"] = [
            typ
            for typ in Authenticator.Type
            if adapter.is_mfa_enabled(stage.login.user, types=[typ])
        ]


class AuthenticationResponse(BaseAuthenticationResponse):
    """Authentication response for the user attached to the request."""

    def __init__(self, request):
        # No status override: the base class derives it from the user.
        current_user = request.user
        super().__init__(request, user=current_user)


class ReauthenticationResponse(BaseAuthenticationResponse):
    """Authentication response for the request's user, always sent as 401."""

    def __init__(self, request):
        # The status is forced to 401 regardless of the user's state.
        current_user = request.user
        super().__init__(request, user=current_user, status=401)


class UnauthorizedResponse(BaseAuthenticationResponse):
    """Authentication response built without a user.

    The status defaults to 401 but may be overridden by the caller.
    """

    def __init__(self, request, status=401):
        # Explicitly pass no user, whatever the request itself carries.
        super().__init__(
            request,
            user=None,
            status=status,
        )


class ForbiddenResponse(APIResponse):
    """API response with HTTP status 403 and no further payload arguments."""

    def __init__(self, request):
        status_code = 403
        super().__init__(request, status=status_code)


class ConflictResponse(APIResponse):
    """API response with HTTP status 409 and no further payload arguments."""

    def __init__(self, request):
        status_code = 409
        super().__init__(request, status=status_code)


def get_config_data(request):
    """Return the account configuration nested under the ``"account"`` key.

    The values are read from the account app settings, except for
    ``is_open_for_signup``, which is asked of the account adapter for the
    given request.
    """
    account = {}
    account["authentication_method"] = account_settings.AUTHENTICATION_METHOD
    account["is_open_for_signup"] = get_account_adapter().is_open_for_signup(
        request
    )
    account["email_verification_by_code_enabled"] = (
        account_settings.EMAIL_VERIFICATION_BY_CODE_ENABLED
    )
    account["login_by_code_enabled"] = account_settings.LOGIN_BY_CODE_ENABLED
    return {"account": account}


class ConfigResponse(APIResponse):
    """API response exposing the configuration of the enabled apps."""

    def __init__(self, request):
        """Collect the account configuration, then merge in the configuration
        of each optional app (social account, MFA, user sessions) that is
        enabled in the settings.
        """
        data = get_config_data(request)
        # Each optional app's response module is imported only when that app
        # is enabled.
        if allauth_settings.SOCIALACCOUNT_ENABLED:
            from allauth.headless.socialaccount.response import (
                get_config_data as get_socialaccount_config_data,
            )

            data.update(get_socialaccount_config_data(request))
        if allauth_settings.MFA_ENABLED:
            from allauth.headless.mfa.response import (
                get_config_data as get_mfa_config_data,
            )

            data.update(get_mfa_config_data(request))
        if allauth_settings.USERSESSIONS_ENABLED:
            from allauth.headless.usersessions.response import (
                get_config_data as get_usersessions_config_data,
            )

            data.update(get_usersessions_config_data(request))
        # ``__init__`` must not return a value, so the parent initialiser is
        # called as a plain statement.
        super().__init__(request, data=data)


class RateLimitResponse(APIResponse):
    """API response with HTTP status 429 and no further payload arguments."""

    def __init__(self, request):
        status_code = 429
        super().__init__(request, status=status_code)