1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
|
from importlib import import_module
from django.conf import settings
from django.contrib.auth import get_user
from django.core.exceptions import ImproperlyConfigured
from django.db import models, transaction
from django.http import HttpRequest
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from allauth import app_settings as allauth_settings
from allauth.account.adapter import get_adapter
from allauth.core import context
# Fail fast at import time: loading these models while the usersessions
# feature is switched off is treated as a configuration error.
if not allauth_settings.USERSESSIONS_ENABLED:
    raise ImproperlyConfigured(
        "allauth.usersessions not installed, yet its models are imported."
    )
class UserSessionManager(models.Manager):
    """Manager with helpers to prune and to record ``UserSession`` rows."""

    def purge_and_list(self, user):
        """Purge the stale sessions of ``user`` and return the remaining ones.

        ``purge()`` is called on every ``UserSession`` of ``user``; the
        instances for which it reports that nothing was deleted are returned
        as a list, in iteration order.
        """
        candidates = UserSession.objects.filter(user=user).iterator()
        return [candidate for candidate in candidates if not candidate.purge()]

    def create_from_request(self, request):
        """Create or refresh the ``UserSession`` for the session of ``request``.

        A row keyed on ``request.session.session_key`` is fetched or created.
        An already existing row is updated with the current user, client IP,
        user agent and a fresh ``last_seen_at``. If the IP or user agent of
        such an existing row changed, ``session_client_changed`` is sent with
        an unsaved snapshot of the previous values as ``from_session``.

        Raises:
            ValueError: if ``request.user`` is not authenticated.
        """
        if not request.user.is_authenticated:
            raise ValueError()
        if not request.session.session_key:
            # No key yet: save the session first, the key is needed below.
            request.session.save()

        # Clip the user agent so that it fits the ``user_agent`` column.
        raw_user_agent = request.META.get("HTTP_USER_AGENT", "")
        max_length = UserSession._meta.get_field("user_agent").max_length
        defaults = {
            "user": request.user,
            "ip": get_adapter().get_client_ip(request),
            "user_agent": raw_user_agent[0:max_length],
        }

        snapshot_fields = (
            "session_key",
            "user",
            "ip",
            "user_agent",
            "data",
            "created_at",
            "last_seen_at",
        )
        from_session = None
        with transaction.atomic():
            from allauth.usersessions.signals import session_client_changed

            session, created = UserSession.objects.get_or_create(
                session_key=request.session.session_key, defaults=defaults
            )
            if not created:
                # Keep an unsaved copy of the stored values so that the
                # signal receivers can compare old against new.
                from_session = UserSession(
                    **{name: getattr(session, name) for name in snapshot_fields}
                )
                # Update session
                for name, value in defaults.items():
                    setattr(session, name, value)
                session.last_seen_at = timezone.now()
                session.save()

        if not from_session:
            # Freshly created row: there is no previous client to compare to.
            return
        client_changed = (
            from_session.ip != session.ip
            or from_session.user_agent != session.user_agent
        )
        if client_changed:
            session_client_changed.send(
                sender=UserSession,
                request=request,
                from_session=from_session,
                to_session=session,
            )
class UserSession(models.Model):
    """Record tying a user to one session key plus the client that used it."""

    objects = UserSessionManager()

    user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
    created_at = models.DateTimeField(default=timezone.now)
    ip = models.GenericIPAddressField()
    last_seen_at = models.DateTimeField(default=timezone.now)
    session_key = models.CharField(
        _("session key"), max_length=40, unique=True, editable=False
    )
    user_agent = models.CharField(max_length=200)
    data = models.JSONField(default=dict)

    def __str__(self):
        return f"{self.ip} ({self.user_agent})"

    def _session_store(self, *args):
        """Instantiate the ``SessionStore`` of ``settings.SESSION_ENGINE``.

        Any positional arguments are passed through to ``SessionStore``.
        """
        engine = import_module(settings.SESSION_ENGINE)
        return engine.SessionStore(*args)

    def exists(self):
        """Return whether the session store still knows ``session_key``."""
        return self._session_store().exists(self.session_key)

    def purge(self):
        """Delete this record if its session is gone or no longer has a user.

        Returns:
            ``True`` if the record was deleted, ``False`` if it was kept.
        """
        stale = not self.exists()
        if not stale:
            # Even if the session still exists, it might be the case that the
            # user session hash is out of sync. So, let's see if
            # `django.contrib.auth` can find a user...
            request = HttpRequest()
            request.session = self._session_store(self.session_key)
            user = get_user(request)
            stale = not user or user.is_anonymous
        if stale:
            self.delete()
            return True
        return False

    def is_current(self):
        """Return whether this record belongs to the session of the request
        exposed by ``allauth.core.context``."""
        return self.session_key == context.request.session.session_key

    def end(self):
        """Delete the session from the session store, then delete this record."""
        # Use the shared helper rather than re-importing the session engine,
        # so that all store access in this class goes through one place.
        self._session_store().delete(self.session_key)
        self.delete()
|