Skip to content

Commit 8a447f8

Browse files
authored
Merge pull request #240 from jordiromera/ldap_clean
Ldap clean users
2 parents c4be9db + fd14bf8 commit 8a447f8

File tree

4 files changed

+306
-6
lines changed

4 files changed

+306
-6
lines changed

README.rst

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ Available settings
8686
8787
# The LDAP username and password of a user for querying the LDAP database for user
8888
# details. If None, then the authenticated user will be used for querying, and
89-
# the `ldap_sync_users` command will perform an anonymous query.
89+
# the `ldap_sync_users`, `ldap_clean_users` commands will perform an anonymous query.
9090
LDAP_AUTH_CONNECTION_USERNAME = None
9191
LDAP_AUTH_CONNECTION_PASSWORD = None
9292
@@ -158,6 +158,19 @@ The parameters are:-
158158
- ``dn`` - the DN (Distinguished Name) of the LDAP matched user (optional keyword only parameter)
159159

160160

161+
Clean User
162+
----------
163+
164+
When a LDAP user is removed from server it could be interresting to deactive or delete its local Django account
165+
to prevent unauthorized access.
166+
167+
To do so run:
168+
169+
``./manage.py ldap_clean_users`` (or ``./manage.py ldap_clean_users --purge``).
170+
171+
It will deactivate all local users non declared on LDAP server. If ``--purge`` is specified, all local users will be deleted.
172+
173+
161174
Can't get authentication to work?
162175
---------------------------------
163176

django_python3_ldap/ldap.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -116,17 +116,27 @@ def get_user(self, **kwargs):
116116
in settings.LDAP_AUTH_USER_LOOKUP_FIELDS.
117117
"""
118118
# Search the LDAP database.
119-
if self._connection.search(
119+
if self.has_user(**kwargs):
120+
return self._get_or_create_user(self._connection.response[0])
121+
logger.warning("LDAP user lookup failed")
122+
return None
123+
124+
def has_user(self, **kwargs):
125+
"""
126+
Returns True if the user with the given identifier exists.
127+
128+
The user identifier should be keyword arguments matching the fields
129+
in settings.LDAP_AUTH_USER_LOOKUP_FIELDS.
130+
"""
131+
# Search the LDAP database.
132+
return self._connection.search(
120133
search_base=settings.LDAP_AUTH_SEARCH_BASE,
121134
search_filter=format_search_filter(kwargs),
122135
search_scope=ldap3.SUBTREE,
123136
attributes=ldap3.ALL_ATTRIBUTES,
124137
get_operational_attributes=True,
125138
size_limit=1,
126-
):
127-
return self._get_or_create_user(self._connection.response[0])
128-
logger.warning("LDAP user lookup failed")
129-
return None
139+
)
130140

131141

132142
@contextmanager
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
from django.contrib.auth import get_user_model
2+
from django.core.management.base import BaseCommand, CommandError
3+
from django.db import transaction
4+
from django.db.models import ProtectedError
5+
6+
from django_python3_ldap import ldap
7+
from django_python3_ldap.conf import settings
8+
from django_python3_ldap.utils import group_lookup_args
9+
10+
11+
class Command(BaseCommand):
12+
13+
help = "Remove local user models for users not find anymore in the remote LDAP authentication server."
14+
15+
def add_arguments(self, parser):
16+
parser.add_argument(
17+
'-p',
18+
'--purge',
19+
action='store_true',
20+
help='Purge instead of deactive local user models'
21+
)
22+
parser.add_argument(
23+
'lookups',
24+
nargs='*',
25+
type=str,
26+
help='A list of lookup values, matching the fields specified in LDAP_AUTH_USER_LOOKUP_FIELDS. '
27+
'If this is not provided then ALL users are concerned.'
28+
)
29+
parser.add_argument(
30+
'--superuser',
31+
action='store_true',
32+
help='Handle superuser (by default, superusers are excluded)'
33+
)
34+
parser.add_argument(
35+
'--staff',
36+
action='store_true',
37+
help='Handle staff user (by default,staff users are excluded)'
38+
)
39+
40+
@staticmethod
41+
def _iter_local_users(User, lookups, superuser, staff):
42+
"""
43+
Iterates over local users. If the list of lookups is empty, then all users are returned.
44+
However, if lookups are provided, User.object.get is used to clean each user found using the lookups.
45+
Exclude or not superuser and or staff user.
46+
"""
47+
48+
if len(lookups) < 1:
49+
for user in User.objects.filter(is_superuser=superuser,
50+
is_staff=staff):
51+
yield user
52+
else:
53+
for lookup in group_lookup_args(*lookups):
54+
try:
55+
yield User.objects.get(**lookup,
56+
is_superuser=superuser,
57+
is_staff=staff)
58+
except User.DoesNotExist:
59+
raise CommandError("Could not find user with lookup : {lookup}".format(
60+
lookup=lookup,
61+
))
62+
63+
@staticmethod
64+
def _remove(user, purge):
65+
"""
66+
Deactivate or purge a given local user
67+
"""
68+
if purge:
69+
# Delete local user
70+
try:
71+
user.delete()
72+
except ProtectedError as e:
73+
raise CommandError("Could not purge user {user} : {e}".format(
74+
user=user,
75+
e=e
76+
))
77+
else:
78+
# Deactivate local user
79+
user.is_active = False
80+
user.save()
81+
82+
@transaction.atomic()
83+
def handle(self, *args, **kwargs):
84+
verbosity = int(kwargs.get("verbosity", 1))
85+
purge = kwargs.get('purge', False)
86+
lookups = kwargs.get('lookups', [])
87+
superuser = kwargs.get('superuser', False)
88+
staff = kwargs.get('staff', False)
89+
User = get_user_model()
90+
auth_kwargs = {
91+
User.USERNAME_FIELD: settings.LDAP_AUTH_CONNECTION_USERNAME,
92+
'password': settings.LDAP_AUTH_CONNECTION_PASSWORD
93+
}
94+
with ldap.connection(**auth_kwargs) as connection:
95+
if connection is None:
96+
raise CommandError("Could not connect to LDAP server")
97+
for user in self._iter_local_users(User, lookups, superuser, staff):
98+
# For each local users
99+
# Check if user still exists
100+
user_kwargs = {
101+
User.USERNAME_FIELD: getattr(user, User.USERNAME_FIELD)
102+
}
103+
if connection.has_user(**user_kwargs):
104+
# User still exists on LDAP side
105+
continue
106+
# Clean user
107+
self._remove(user, purge)
108+
if verbosity >= 1:
109+
self.stdout.write("{action} {user}".format(
110+
action=('Purged' if purge else 'Deactivated'),
111+
user=user,
112+
))

django_python3_ldap/tests.py

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,20 @@ def testGetUserKwargsIncorrectUsername(self):
5757
)
5858
self.assertEqual(user, None)
5959

60+
def testHasUserKwargsSuccess(self):
61+
with connection() as c:
62+
exist = c.has_user(
63+
username=settings.LDAP_AUTH_TEST_USER_USERNAME,
64+
)
65+
self.assertEqual(exist, True)
66+
67+
def testHasUserKwargsIncorrectUsername(self):
68+
with connection() as c:
69+
exist = c.has_user(
70+
username="bad" + settings.LDAP_AUTH_TEST_USER_USERNAME,
71+
)
72+
self.assertEqual(exist, False)
73+
6074
# Authentication tests.
6175

6276
def testAuthenticateUserSuccess(self):
@@ -247,3 +261,154 @@ def testImportFunc(self):
247261

248262
with self.settings(LDAP_AUTH_SYNC_USER_RELATIONS='django.contrib.auth.get_user_model'):
249263
self.assertTrue(callable(import_func(settings.LDAP_AUTH_SYNC_USER_RELATIONS)))
264+
265+
def testCleanUsersDeactivate(self):
266+
"""
267+
ldap_clean_users management command test
268+
"""
269+
from django.contrib.auth import get_user_model
270+
User = get_user_model()
271+
_username = "nonldap{user}".format(user=settings.LDAP_AUTH_TEST_USER_USERNAME)
272+
user = User.objects.create_user(
273+
_username,
274+
"nonldap{mail}".format(mail=settings.LDAP_AUTH_TEST_USER_EMAIL),
275+
settings.LDAP_AUTH_TEST_USER_PASSWORD)
276+
user.save()
277+
user_count_1 = User.objects.count()
278+
self.assertEqual(User.objects.get(username=_username).is_active, True)
279+
call_command("ldap_clean_users", verbosity=0)
280+
user_count_2 = User.objects.count()
281+
self.assertEqual(user_count_1, user_count_2)
282+
self.assertEqual(User.objects.get(username=_username).is_active, False)
283+
284+
"""
285+
Test with lookup
286+
"""
287+
# Reactivate user
288+
user = User.objects.get(username=_username)
289+
user.is_active = True
290+
user.save()
291+
# Create second user
292+
_usernameLookup = "nonldaplookup{user}".format(user=settings.LDAP_AUTH_TEST_USER_USERNAME)
293+
user = User.objects.create_user(
294+
_usernameLookup,
295+
"nonldaplookup{mail}".format(mail=settings.LDAP_AUTH_TEST_USER_EMAIL),
296+
settings.LDAP_AUTH_TEST_USER_PASSWORD)
297+
user.save()
298+
user_count_1 = User.objects.count()
299+
self.assertEqual(User.objects.get(username=_usernameLookup).is_active, True)
300+
# Clean second user
301+
call_command("ldap_clean_users", _usernameLookup, verbosity=0)
302+
user_count_2 = User.objects.count()
303+
self.assertEqual(user_count_1, user_count_2)
304+
self.assertEqual(User.objects.get(username=_usernameLookup).is_active, False)
305+
self.assertEqual(User.objects.get(username=_username).is_active, True)
306+
# Reactivate second user
307+
user = User.objects.get(username=_usernameLookup)
308+
user.is_active = True
309+
user.save()
310+
# Clean first user
311+
call_command("ldap_clean_users", _username, verbosity=0)
312+
self.assertEqual(User.objects.get(username=_username).is_active, False)
313+
self.assertEqual(User.objects.get(username=_usernameLookup).is_active, True)
314+
# Lookup a non existing user (raise a CommandError)
315+
with self.assertRaises(CommandError):
316+
call_command("ldap_clean_users", 'doesnonexist', verbosity=0)
317+
318+
"""
319+
Test with superuser
320+
"""
321+
# Reactivate first user and promote to superuser
322+
user = User.objects.get(username=_username)
323+
user.is_active = True
324+
user.is_superuser = True
325+
user.save()
326+
# Reactivate second user
327+
user = User.objects.get(username=_usernameLookup)
328+
user.is_active = True
329+
user.save()
330+
call_command("ldap_clean_users", superuser=False, verbosity=0)
331+
self.assertEqual(User.objects.get(username=_username).is_active, True)
332+
self.assertEqual(User.objects.get(username=_usernameLookup).is_active, False)
333+
call_command("ldap_clean_users", superuser=True, verbosity=0)
334+
self.assertEqual(User.objects.get(username=_username).is_active, False)
335+
336+
"""
337+
Test with staff user
338+
"""
339+
# Reactivate first user and promote to staff
340+
user = User.objects.get(username=_username)
341+
user.is_active = True
342+
user.is_superuser = False
343+
user.is_staff = True
344+
user.save()
345+
# Reactivate second user
346+
user = User.objects.get(username=_usernameLookup)
347+
user.is_active = True
348+
user.save()
349+
call_command("ldap_clean_users", staff=False, verbosity=0)
350+
self.assertEqual(User.objects.get(username=_username).is_active, True)
351+
self.assertEqual(User.objects.get(username=_usernameLookup).is_active, False)
352+
call_command("ldap_clean_users", staff=True, verbosity=0)
353+
self.assertEqual(User.objects.get(username=_username).is_active, False)
354+
355+
def testCleanUsersPurge(self):
356+
"""
357+
ldap_clean_users management command test with purge argument
358+
"""
359+
from django.contrib.auth import get_user_model
360+
User = get_user_model()
361+
user = User.objects.create_user(
362+
"nonldap{user}".format(user=settings.LDAP_AUTH_TEST_USER_USERNAME),
363+
"nonldap{mail}".format(mail=settings.LDAP_AUTH_TEST_USER_EMAIL),
364+
settings.LDAP_AUTH_TEST_USER_PASSWORD)
365+
user.save()
366+
user_count_1 = User.objects.count()
367+
call_command("ldap_clean_users", verbosity=0, purge=True)
368+
user_count_2 = User.objects.count()
369+
self.assertGreater(user_count_1, user_count_2)
370+
371+
def testCleanUsersCommandOutput(self):
372+
# Test without purge
373+
out = StringIO()
374+
from django.contrib.auth import get_user_model
375+
User = get_user_model()
376+
user = User.objects.create_user(
377+
"nonldap{user}".format(user=settings.LDAP_AUTH_TEST_USER_USERNAME),
378+
"nonldap{mail}".format(mail=settings.LDAP_AUTH_TEST_USER_EMAIL),
379+
settings.LDAP_AUTH_TEST_USER_PASSWORD)
380+
user.save()
381+
call_command("ldap_clean_users", stdout=out, verbosity=1)
382+
rows = out.getvalue().split("\n")[:-1]
383+
self.assertEqual(len(rows), 1)
384+
for row in rows:
385+
self.assertRegex(row, r'^Deactivated ')
386+
# Reset for next test
387+
user.delete()
388+
out.truncate(0)
389+
out.seek(0)
390+
# Test with purge
391+
user = User.objects.create_user(
392+
"nonldap{user}".format(user=settings.LDAP_AUTH_TEST_USER_USERNAME),
393+
"nonldap{mail}".format(mail=settings.LDAP_AUTH_TEST_USER_EMAIL),
394+
settings.LDAP_AUTH_TEST_USER_PASSWORD)
395+
user.save()
396+
call_command("ldap_clean_users", stdout=out, verbosity=1, purge=True)
397+
rows = out.getvalue().split("\n")[:-1]
398+
self.assertEqual(len(rows), 1)
399+
for row in rows:
400+
self.assertRegex(row, r'^Purged ')
401+
402+
def testReCleanUsersDoesntRecreateUsers(self):
403+
from django.contrib.auth import get_user_model
404+
User = get_user_model()
405+
user = User.objects.create_user(
406+
"nonldap{user}".format(user=settings.LDAP_AUTH_TEST_USER_USERNAME),
407+
"nonldap{mail}".format(mail=settings.LDAP_AUTH_TEST_USER_EMAIL),
408+
settings.LDAP_AUTH_TEST_USER_PASSWORD)
409+
user.save()
410+
call_command("ldap_clean_users", verbosity=0, purge=True)
411+
user_count_1 = User.objects.count()
412+
call_command("ldap_clean_users", verbosity=0, purge=True)
413+
user_count_2 = User.objects.count()
414+
self.assertEqual(user_count_1, user_count_2)

0 commit comments

Comments
 (0)