1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
|
# Copyright 2013 The Distro Tracker Developers
# See the COPYRIGHT file at http://deb.li/DTAuthors
#
# Adapted to work directly with the django user model
# instead of DistroTracker UserEmail
# 2016 Neil Williams <neil.williams@linaro.org>
#
# SPDX-License-Identifier: GPL-2.0-or-later
import ldap
from django.contrib import auth
from django.contrib.auth.backends import RemoteUserBackend
from django.contrib.auth.middleware import RemoteUserMiddleware
from django.contrib.auth.models import User
from django.core.exceptions import ImproperlyConfigured
class DebianSsoUserMiddleware(RemoteUserMiddleware):
    """
    Middleware that initiates user authentication based on the REMOTE_USER
    field provided by Debian's SSO system, or based on the SSL_CLIENT_S_DN_CN
    field provided by the validation of the SSL client certificate generated
    by sso.debian.org.

    If the currently logged in user is a DD (as identified by having a
    @debian.org address), they are forcefully logged out if neither header
    is present any more.

    To enable, set "AUTH_DEBIAN_SSO" = true in /etc/lava-server/settings.conf
    (JSON syntax). There is no deduplication of users in lava-server, it is
    NOT supported to have Debian SSO and LDAP configured on the same instance.
    """

    # request.META keys inspected by process_request(); the certificate
    # header takes precedence over the DACS header when both are present.
    dacs_header = "REMOTE_USER"
    cert_header = "SSL_CLIENT_S_DN_CN"

    @staticmethod
    def dacs_user_to_email(username):
        """
        Convert a DACS identity string into an email address.

        The value is split on ":" (empty components are dropped) and is
        expected to look like ``DEBIANORG:DEBIAN:<user>``.

        :param username: raw value of the DACS header.
        :return: the email address, or ``None`` when the value does not name
            a user in the DEBIANORG/DEBIAN federation and jurisdiction.
        """
        parts = [part for part in username.split(":") if part]
        # Federation, jurisdiction and a user component are all required.
        # Without this check a short value raises ValueError on unpacking,
        # and a value without a user component would turn the jurisdiction
        # itself into an address ("DEBIAN@debian.org").
        if len(parts) < 3:
            return None
        federation, jurisdiction = parts[:2]
        if (federation, jurisdiction) != ("DEBIANORG", "DEBIAN"):
            return None
        username = parts[-1]
        if "@" in username:
            return username  # Full email already
        return username + "@debian.org"

    @staticmethod
    def is_debian_member(user):
        """Return True when the user's email address ends in @debian.org."""
        return user.email.endswith("@debian.org")

    def process_request(self, request):
        """
        Log in the user named by the SSO headers of ``request``.

        :param request: the incoming request; ``request.user`` must already
            have been set by the authentication middleware.
        :raises ImproperlyConfigured: if ``request.user`` is missing.
        """
        # AuthenticationMiddleware is required so that request.user exists.
        if not hasattr(request, "user"):
            raise ImproperlyConfigured(
                "The Django remote user auth middleware requires the"
                " authentication middleware to be installed. Edit your"
                " MIDDLEWARE setting to insert"
                " 'django.contrib.auth.middleware.AuthenticationMiddleware'"
                " before the DebianSsoUserMiddleware class."
            )

        dacs_user = request.META.get(self.dacs_header)
        cert_user = request.META.get(self.cert_header)
        if cert_user is not None:
            # The certificate header value is used as the email as-is.
            remote_user = cert_user
        elif dacs_user is not None:
            remote_user = self.dacs_user_to_email(dacs_user)
        else:
            # Debian developers can only authenticate via SSO/SSL certs
            # so log them out now if they no longer have the proper META
            # variable
            if request.user.is_authenticated and self.is_debian_member(
                request.user
            ):
                auth.logout(request)
            return

        # Compare the session user against the identity in the headers.
        # Looking the session user's own email up in the database is true
        # for every logged-in user and would never notice that the headers
        # now name somebody else.
        if request.user.is_authenticated and request.user.email == remote_user:
            # The currently logged in user matches the one given by the
            # headers.
            return

        # This will create the user if it doesn't exist
        user = auth.authenticate(remote_user=remote_user)
        if user:
            # User is valid. Set request.user and persist user in the session
            # by logging the user in.
            request.user = user
            auth.login(request, user)
class DebianSsoUserBackend(RemoteUserBackend):
    """
    The authentication backend which authenticates the provided remote user
    (identified by their @debian.org email) in Django. If a matching User
    model instance does not exist, one is automatically created. In that case
    the DD's first and last name are pulled from Debian's LDAP.
    """

    def generate_unique_username(self, count, slug):
        """
        Return the first ``<slug><n>`` (n >= ``count``) not used as a username.

        :param count: first numeric suffix to try.
        :param slug: username prefix.
        """
        # Iterative on purpose: one recursion level per collision could hit
        # the interpreter's recursion limit for a heavily reused prefix.
        while True:
            username = "%s%d" % (slug, count)
            if not User.objects.filter(username=username).exists():
                return username
            count += 1

    def ensure_unique_username(self, username):
        """
        Return ``username`` if it is free, otherwise a numbered variant of it
        (``<username>0``, ``<username>1``, ...).
        """
        if not User.objects.filter(username=username).exists():
            return username
        return self.generate_unique_username(0, username)

    def authenticate(self, request=None, remote_user=None):
        """
        Return the User whose email is ``remote_user``, creating it if needed.

        :param request: the request being authenticated (unused). Accepted
            because ``django.contrib.auth.authenticate()`` passes it as the
            first positional argument and skips backends whose signature
            does not accept it.
        :param remote_user: email address identifying the user.
        :return: a User instance, or ``None`` when ``remote_user`` is empty.
        """
        # Backward compatibility with the former ``authenticate(remote_user)``
        # signature: a lone positional string is the remote user.
        if remote_user is None and isinstance(request, str):
            request, remote_user = None, request

        if not remote_user:
            return None

        existing = User.objects.filter(email=remote_user).first()
        if existing is not None:
            return existing

        kwargs = {}
        names = self.get_user_details(remote_user)
        if names:
            kwargs.update(names)

        # Use the local part of the address as the username; fall back to a
        # generic name when there is no "@" or the local part is empty.
        local_part, separator, _ = remote_user.partition("@")
        username = local_part if separator and local_part else "sso-user"
        username = self.ensure_unique_username(username)

        return User.objects.create_user(
            username=username, email=remote_user, **kwargs
        )

    @staticmethod
    def get_uid(remote_user):
        """
        Strip a trailing ``@debian.org`` from ``remote_user``, leaving the uid.

        Values without that suffix are returned unchanged.
        """
        suffix = "@debian.org"
        if remote_user.endswith(suffix):
            return remote_user[: -len(suffix)]
        return remote_user

    @staticmethod
    def _first_ldap_value(attributes, name):
        """Return the first value of LDAP attribute ``name`` decoded as UTF-8,
        or an empty string when the attribute is absent or has no values."""
        values = attributes.get(name)
        if not values:
            return ""
        return values[0].decode("utf-8")

    def get_user_details(self, remote_user):
        """
        Gets the details of the given user from the Debian LDAP.

        Errors raised by the LDAP library are not caught here.

        :param remote_user: email address of the user.
        :return: Dict with the keys ``first_name``, ``last_name``
            ``None`` if the LDAP lookup did not return anything.
        """
        if ldap is None:
            return None
        if not remote_user.endswith("@debian.org"):
            # We only know how to extract data for DD via LDAP
            return None

        # Imported here so the block does not rely on ``import ldap`` having
        # loaded the ``ldap.filter`` submodule.
        from ldap.filter import escape_filter_chars

        service = ldap.initialize("ldap://db.debian.org")
        try:
            result_set = service.search_s(
                "dc=debian,dc=org",
                ldap.SCOPE_SUBTREE,
                # The uid comes from a request header; escape it so it cannot
                # alter the structure of the search filter.
                f"uid={escape_filter_chars(self.get_uid(remote_user))}",
                None,
            )
        finally:
            # Always release the connection, even when the search fails.
            service.unbind_s()

        if not result_set:
            return None
        # Each result is a (dn, attributes) pair; only the first is used.
        attributes = result_set[0][1]
        return {
            "first_name": self._first_ldap_value(attributes, "cn"),
            "last_name": self._first_ldap_value(attributes, "sn"),
        }
|