File: authkit.py

package info (click to toggle)
django-allauth 65.0.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 9,672 kB
  • sloc: python: 34,411; javascript: 3,070; xml: 849; makefile: 235; sh: 8
file content (85 lines) | stat: -rw-r--r-- 2,937 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
from contextlib import contextmanager
from typing import Any, Dict, Optional

from django.utils.functional import SimpleLazyObject, empty

from allauth import app_settings as allauth_settings
from allauth.account.internal.stagekit import get_pending_stage
from allauth.core.exceptions import ImmediateHttpResponse
from allauth.headless import app_settings
from allauth.headless.constants import Client
from allauth.headless.internal import sessionkit


class AuthenticationStatus:
    def __init__(self, request):
        self.request = request

    @property
    def is_authenticated(self):
        return self.request.user.is_authenticated

    def get_pending_stage(self):
        return get_pending_stage(self.request)

    @property
    def has_pending_signup(self):
        if not allauth_settings.SOCIALACCOUNT_ENABLED:
            return False
        from allauth.socialaccount.internal import flows

        return bool(flows.signup.get_pending_signup(self.request))


def purge_request_user_cache(request):
    for attr in ["_cached_user", "_acached_user"]:
        if hasattr(request, attr):
            delattr(request, attr)
    if isinstance(request.user, SimpleLazyObject):
        request.user._wrapped = empty


@contextmanager
def authentication_context(request):
    from allauth.headless.base.response import UnauthorizedResponse

    old_user = request.user
    old_session = request.session
    try:
        request.session = sessionkit.new_session()
        purge_request_user_cache(request)

        strategy = app_settings.TOKEN_STRATEGY
        session_token = strategy.get_session_token(request)
        if session_token:
            session = strategy.lookup_session(session_token)
            if not session:
                raise ImmediateHttpResponse(UnauthorizedResponse(request, status=410))
            request.session = session
            purge_request_user_cache(request)
        request.allauth.headless._pre_user = request.user
        # request.user is lazy -- force evaluation
        request.allauth.headless._pre_user.pk
        yield
    finally:
        if request.session.modified and not request.session.is_empty():
            request.session.save()
        request.user = old_user
        request.session = old_session
        # e.g. logging in calls csrf `rotate_token()` -- this prevents setting a new CSRF cookie.
        request.META["CSRF_COOKIE_NEEDS_UPDATE"] = False


def expose_access_token(request) -> Optional[Dict[str, Any]]:
    """
    Determines if a new access token needs to be exposed.
    """
    if request.allauth.headless.client != Client.APP:
        return None
    if not request.user.is_authenticated:
        return None
    pre_user = request.allauth.headless._pre_user
    if pre_user.is_authenticated and pre_user.pk == request.user.pk:
        return None
    strategy = app_settings.TOKEN_STRATEGY
    return strategy.create_access_token_payload(request)