File: flows.py

package info (click to toggle)
django-allauth 65.0.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 9,672 kB
  • sloc: python: 34,411; javascript: 3,070; xml: 849; makefile: 235; sh: 8
file content (58 lines) | stat: -rw-r--r-- 1,870 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
from typing import Callable, Optional

from allauth.account.adapter import get_adapter as get_account_adapter
from allauth.account.internal.flows.login import record_authentication
from allauth.core import context, ratelimit
from allauth.mfa import signals
from allauth.mfa.models import Authenticator


def delete_dangling_recovery_codes(user) -> Optional[Authenticator]:
    """Remove a user's authenticators when only recovery codes are left.

    If ``user`` still owns at least one authenticator whose type is not
    ``Authenticator.Type.RECOVERY_CODES``, nothing is deleted and ``None``
    is returned.  Otherwise every authenticator of the user (which can then
    only be of the recovery-codes type) is deleted.

    Returns:
        The first record of the deleted queryset, fetched just before the
        delete, or ``None`` when nothing was deleted or the user had no
        authenticators at all.
    """
    owned = Authenticator.objects.filter(user=user)
    non_recovery = owned.exclude(type=Authenticator.Type.RECOVERY_CODES)
    if non_recovery.exists():
        # A non-recovery-codes authenticator remains, so nothing is dangling.
        return None
    # Grab an instance before deleting so the caller has something to report.
    removed = owned.first()
    owned.delete()
    return removed


def delete_and_cleanup(request, authenticator) -> None:
    """Delete ``authenticator`` and announce every resulting removal.

    The given authenticator is deleted first.  Then
    ``delete_dangling_recovery_codes`` is called for ``authenticator.user``,
    which may delete and return a further authenticator.  Finally the
    ``authenticator_removed`` signal is sent for the given authenticator and
    then for the one returned by the cleanup, skipping any that is falsy.

    Note that the ``user`` passed to the signal is ``request.user``, whereas
    the dangling-codes cleanup is keyed on ``authenticator.user``.
    """
    authenticator.delete()
    dangling_codes = delete_dangling_recovery_codes(authenticator.user)
    # filter(None, ...) lazily drops falsy entries (e.g. no dangling codes).
    for removed in filter(None, (authenticator, dangling_codes)):
        signals.authenticator_removed.send(
            sender=Authenticator,
            request=request,
            user=request.user,
            authenticator=removed,
        )


def post_authentication(
    request,
    authenticator: Authenticator,
    reauthenticated: bool = False,
    passwordless: bool = False,
) -> None:
    """Record that ``authenticator`` was just used to authenticate.

    Calls ``authenticator.record_usage()`` and then ``record_authentication``
    with the method ``"mfa"`` and keyword details describing the
    authenticator: always ``id`` (its primary key) and ``type``, plus
    ``reauthenticated=True`` and/or ``passwordless=True`` when the
    corresponding argument is truthy.  Flags that are falsy are omitted
    entirely rather than passed as ``False``.
    """
    authenticator.record_usage()
    details = dict(id=authenticator.pk, type=authenticator.type)
    optional_flags = (
        ("reauthenticated", reauthenticated),
        ("passwordless", passwordless),
    )
    for flag_name, is_set in optional_flags:
        if is_set:
            # Only ever stored as the literal True, whatever truthy value
            # the caller supplied.
            details[flag_name] = True
    record_authentication(request, "mfa", **details)


def check_rate_limit(user) -> Callable[[], None]:
    """Consume one ``login_failed`` rate-limit slot for ``user``.

    The rate-limit key is derived from the user's primary key.  When
    ``ratelimit.consume`` returns a falsy value (presumably meaning the
    limit has been exceeded -- confirm against ``allauth.core.ratelimit``),
    the account adapter's ``too_many_login_attempts`` validation error is
    raised.

    Returns:
        A zero-argument callable that clears the ``login_failed`` rate limit
        for the same key.  It reads ``context.request`` when it is invoked,
        not when this function returns.

    Raises:
        The exception produced by
        ``get_account_adapter().validation_error("too_many_login_attempts")``.
    """
    rl_key = f"mfa-auth-user-{user.pk!s}"
    allowed = ratelimit.consume(
        context.request,
        action="login_failed",
        key=rl_key,
    )
    if not allowed:
        raise get_account_adapter().validation_error("too_many_login_attempts")

    def clear():
        # Look up context.request lazily so the request active at call time
        # is the one used.
        return ratelimit.clear(
            context.request, action="login_failed", key=rl_key
        )

    return clear