File: ldap_clean_users.py

package info (click to toggle)
django-python3-ldap 0.15.8-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 224 kB
  • sloc: python: 975; sh: 6; makefile: 4
file content (112 lines) | stat: -rw-r--r-- 4,239 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from django.contrib.auth import get_user_model
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction
from django.db.models import ProtectedError

from django_python3_ldap import ldap
from django_python3_ldap.conf import settings
from django_python3_ldap.utils import group_lookup_args


class Command(BaseCommand):
    """
    Management command cleaning up local users that are gone from LDAP.

    Every selected local user is checked against the LDAP server using its
    ``USERNAME_FIELD`` value. Users the server does not report are either
    deactivated (default) or deleted (``--purge``).
    """

    help = "Remove local user models for users not find anymore in the remote LDAP authentication server."

    def add_arguments(self, parser):
        """
        Register the command line options: ``--purge``, the optional
        positional ``lookups``, ``--superuser`` and ``--staff``.
        """
        parser.add_argument(
            '-p',
            '--purge',
            action='store_true',
            help='Purge instead of deactive local user models'
        )
        parser.add_argument(
            'lookups',
            nargs='*',
            type=str,
            help='A list of lookup values, matching the fields specified in LDAP_AUTH_USER_LOOKUP_FIELDS. '
                 'If this is not provided then ALL users are concerned.'
        )
        parser.add_argument(
            '--superuser',
            action='store_true',
            help='Handle superuser (by default, superusers are excluded)'
        )
        parser.add_argument(
            '--staff',
            action='store_true',
            help='Handle staff user (by default,staff users are excluded)'
        )

    @staticmethod
    def _iter_local_users(User, lookups, superuser, staff):
        """
        Yield the local users that are candidates for cleaning.

        Without lookups, every user whose ``is_superuser`` equals ``superuser``
        and whose ``is_staff`` equals ``staff`` is yielded. With lookups, each
        group produced by ``group_lookup_args`` is resolved through
        ``User.objects.get`` under the same two constraints.

        Raises ``CommandError`` when a lookup matches no user, or matches
        more than one.
        """
        # NOTE(review): both flags are exact-match filters, so ``--superuser``
        # alone selects only superusers that are NOT staff. The help text reads
        # as if the flags widen the selection - confirm the intended semantics
        # before changing, since this command deactivates/deletes users.
        flags = {'is_superuser': superuser, 'is_staff': staff}

        if not lookups:
            yield from User.objects.filter(**flags)
            return

        for lookup in group_lookup_args(*lookups):
            # Keep the try body limited to the query itself so that only
            # lookup failures are translated into CommandError.
            try:
                user = User.objects.get(**lookup, **flags)
            except User.DoesNotExist as e:
                raise CommandError("Could not find user with lookup : {lookup}".format(
                    lookup=lookup,
                )) from e
            except User.MultipleObjectsReturned as e:
                # An ambiguous lookup is a usage error, not a crash: refuse to
                # guess which of the matching users should be cleaned.
                raise CommandError("Found several users with lookup : {lookup}".format(
                    lookup=lookup,
                )) from e
            yield user

    @staticmethod
    def _remove(user, purge):
        """
        Deactivate the given local user, or delete it when ``purge`` is true.

        Raises ``CommandError`` if deletion is refused with ``ProtectedError``.
        """
        if not purge:
            # Deactivate local user
            user.is_active = False
            user.save()
            return

        # Delete local user
        try:
            user.delete()
        except ProtectedError as e:
            raise CommandError("Could not purge user {user} : {e}".format(
                user=user,
                e=e
            )) from e

    @transaction.atomic()
    def handle(self, *args, **kwargs):
        """
        Run the clean-up inside a single ``transaction.atomic()`` block.

        Opens an LDAP connection with the configured connection credentials,
        then removes every selected local user for which
        ``connection.has_user`` is false. One line per removed user is written
        to stdout when verbosity is at least 1.

        Raises ``CommandError`` if the LDAP connection could not be obtained.
        """
        verbosity = int(kwargs.get("verbosity", 1))
        purge = kwargs.get('purge', False)
        lookups = kwargs.get('lookups', [])
        superuser = kwargs.get('superuser', False)
        staff = kwargs.get('staff', False)
        User = get_user_model()
        username_field = User.USERNAME_FIELD
        action = 'Purged' if purge else 'Deactivated'

        auth_kwargs = {
            username_field: settings.LDAP_AUTH_CONNECTION_USERNAME,
            'password': settings.LDAP_AUTH_CONNECTION_PASSWORD
        }
        with ldap.connection(**auth_kwargs) as connection:
            if connection is None:
                raise CommandError("Could not connect to LDAP server")
            for user in self._iter_local_users(User, lookups, superuser, staff):
                # Identify the local user on the LDAP side by its username field.
                user_kwargs = {username_field: getattr(user, username_field)}
                if connection.has_user(**user_kwargs):
                    # User still exists on LDAP side
                    continue
                # Clean user
                self._remove(user, purge)
                if verbosity >= 1:
                    self.stdout.write("{action} {user}".format(
                        action=action,
                        user=user,
                    ))