File: models.py

package info (click to toggle)
django-allauth 65.0.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 9,672 kB
  • sloc: python: 34,411; javascript: 3,070; xml: 849; makefile: 235; sh: 8
file content (129 lines) | stat: -rw-r--r-- 4,368 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
from importlib import import_module

from django.conf import settings
from django.contrib.auth import get_user
from django.core.exceptions import ImproperlyConfigured
from django.db import models, transaction
from django.http import HttpRequest
from django.utils import timezone
from django.utils.translation import gettext_lazy as _

from allauth import app_settings as allauth_settings
from allauth.account.adapter import get_adapter
from allauth.core import context


if not allauth_settings.USERSESSIONS_ENABLED:
    raise ImproperlyConfigured(
        "allauth.usersessions not installed, yet its models are imported."
    )


class UserSessionManager(models.Manager):
    def purge_and_list(self, user):
        ret = []
        sessions = UserSession.objects.filter(user=user)
        for session in sessions.iterator():
            if not session.purge():
                ret.append(session)
        return ret

    def create_from_request(self, request):
        if not request.user.is_authenticated:
            raise ValueError()
        if not request.session.session_key:
            request.session.save()
        ua = request.META.get("HTTP_USER_AGENT", "")[
            0 : UserSession._meta.get_field("user_agent").max_length
        ]

        defaults = dict(
            user=request.user,
            ip=get_adapter().get_client_ip(request),
            user_agent=ua,
        )

        from_session = None
        with transaction.atomic():
            from allauth.usersessions.signals import session_client_changed

            session, created = UserSession.objects.get_or_create(
                session_key=request.session.session_key, defaults=defaults
            )

            if not created:
                from_session = UserSession(
                    session_key=session.session_key,
                    user=session.user,
                    ip=session.ip,
                    user_agent=session.user_agent,
                    data=session.data,
                    created_at=session.created_at,
                    last_seen_at=session.last_seen_at,
                )
                # Update session
                session.user = defaults["user"]
                session.ip = defaults["ip"]
                session.user_agent = defaults["user_agent"]
                session.last_seen_at = timezone.now()

                session.save()

        if from_session and (
            from_session.ip != session.ip
            or from_session.user_agent != session.user_agent
        ):
            session_client_changed.send(
                sender=UserSession,
                request=request,
                from_session=from_session,
                to_session=session,
            )


class UserSession(models.Model):
    objects = UserSessionManager()

    user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
    created_at = models.DateTimeField(default=timezone.now)
    ip = models.GenericIPAddressField()
    last_seen_at = models.DateTimeField(default=timezone.now)
    session_key = models.CharField(
        _("session key"), max_length=40, unique=True, editable=False
    )
    user_agent = models.CharField(max_length=200)
    data = models.JSONField(default=dict)

    def __str__(self):
        return f"{self.ip} ({self.user_agent})"

    def _session_store(self, *args):
        engine = import_module(settings.SESSION_ENGINE)
        return engine.SessionStore(*args)

    def exists(self):
        return self._session_store().exists(self.session_key)

    def purge(self):
        purge = not self.exists()
        if not purge:
            # Even if the session still exists, it might be the case that the
            # user session hash is out of sync. So, let's see if
            # `django.contrib.auth` can find a user...
            request = HttpRequest()
            request.session = self._session_store(self.session_key)
            user = get_user(request)
            purge = not user or user.is_anonymous
        if purge:
            self.delete()
            return True
        return False

    def is_current(self):
        return self.session_key == context.request.session.session_key

    def end(self):
        engine = import_module(settings.SESSION_ENGINE)
        store = engine.SessionStore()
        store.delete(self.session_key)
        self.delete()