-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathauth.py
97 lines (77 loc) · 3.7 KB
/
auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
from django.core.exceptions import SuspiciousOperation, PermissionDenied
from mozilla_django_oidc import auth
from django.contrib import messages
import logging
import secrets
from django.conf import settings
from .models import sync_mailing_list_membership
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class NotInKafkaUsers(PermissionDenied):
    """Permission error for a user who is not in the Kafka users auth group.

    Being a ``PermissionDenied`` subclass, it is caught by any handler for
    ``PermissionDenied`` while still letting callers single out this case.
    """
class HopskotchOIDCAuthenticationBackend(auth.OIDCAuthenticationBackend):
    """Subclass Mozilla's OIDC Auth backend for custom hopskotch behavior.

    Users are looked up by the OIDC ``sub`` claim, must carry an
    ``is_member_of`` claim containing ``settings.KAFKA_USER_AUTH_GROUP``,
    and are flagged as staff when they belong to ``STAFF_GROUP``.
    """

    # Members of this group get ``is_staff=True`` on create and on update.
    STAFF_GROUP = '/SCiMMA Developers'

    def __init__(self):
        """Initialise the base backend and cache the required auth group."""
        super().__init__()
        self.kafka_user_auth_group = settings.KAFKA_USER_AUTH_GROUP

    def filter_users_by_claims(self, claims):
        """Return a queryset of users whose username equals the ``sub`` claim.

        An empty queryset is returned when the claims carry no usable
        ``sub`` value.
        """
        username = self.get_username(claims)
        if not username:
            return self.UserModel.objects.none()
        return self.UserModel.objects.filter(username=username)

    def get_username(self, claims):
        """Return the ``sub`` claim, or ``None`` when it is absent."""
        return claims.get("sub")

    def get_email(self, claims):
        """Return a single email address taken from the claims.

        The ``email`` claim wins when the key is present; otherwise
        ``email_list`` is used. A list value contributes its first element.

        Returns:
            The address as a string, or ``""`` when no address is available
            (missing keys, a null value, or an empty list).
        """
        if "email" in claims:
            email = claims.get("email")
        else:
            email = claims.get("email_list", "")
        if isinstance(email, list):
            # Guard the empty list: indexing it would raise IndexError.
            email = email[0] if email else ""
        # Never hand None to the user model's email field.
        return email or ""

    def verify_claims(self, claims):
        """Check that the claims entitle the user to log in.

        Returns:
            ``True`` when the claims include the required group membership
            and an ``email`` key or a non-empty ``email_list``.

        Raises:
            PermissionDenied: the ``is_member_of`` claim or any email claim
                is missing. The message carries a random error ID that is
                also written to the log.
            NotInKafkaUsers: the user is not a member of
                ``self.kafka_user_auth_group``.
        """
        logger.info(f"all claims: {claims}")
        if "is_member_of" not in claims:
            log_event_id = secrets.token_hex(8)
            msg = f"Your account is missing LDAP claims. Are you sure you used the account you use for SCIMMA? Error ID: {log_event_id}"
            logger.error(f"account is missing LDAP claims, error_id={log_event_id}, claims={claims}")
            raise PermissionDenied(msg)

        group = self.kafka_user_auth_group
        if not is_member_of(claims, group):
            display_name = claims.get('name', 'Unknown')
            person_id = claims.get('sub', 'Unknown')
            email = claims.get('email', 'Unknown')
            msg = f"User vo_display_name={display_name}, vo_person_id={person_id}, email={email} is not in {group}, but requested access"
            logger.error(msg)
            raise NotInKafkaUsers(msg)

        # Truthiness instead of len(): a null ``email_list`` must not raise.
        if "email" in claims or claims.get("email_list"):
            return True

        log_event_id = secrets.token_hex(8)
        msg = f"Your account is missing an email claim. Error ID: {log_event_id}"
        logger.error(f"account is missing LDAP email claims, error_id={log_event_id}, claims={claims}")
        raise PermissionDenied(msg)

    def create_user(self, claims):
        """Create and return a user populated from the claims."""
        return self.UserModel.objects.create(
            username=self.get_username(claims),
            email=self.get_email(claims),
            is_staff=is_member_of(claims, self.STAFF_GROUP),
            first_name=claims.get('given_name', ''),
            last_name=claims.get('family_name', ''),
        )

    def update_user(self, user, claims):
        """Refresh ``user`` from the claims, save it, and return it.

        Also re-syncs the user's membership of
        ``settings.OPENMMA_MAILINGLIST``.
        """
        user.first_name = claims.get('given_name', '')
        user.last_name = claims.get('family_name', '')
        user.email = self.get_email(claims)
        user.is_staff = is_member_of(claims, self.STAFF_GROUP)
        user.save()
        # Putting this here is a bit of a hack, and slows down the login process, but
        # deals with the case of a user's mailing list membership being altered externally.
        # Doing this once on login is a trade-off, as it will miss external changes while
        # the user is logged in, but avoids making repeated external requests
        sync_mailing_list_membership(user, settings.OPENMMA_MAILINGLIST)
        return user
def is_member_of(claims, group):
    """Return whether ``group`` is listed in the ``is_member_of`` claim.

    Args:
        claims: Mapping of OIDC claims.
        group: Exact group name to look for.

    Returns:
        ``True`` only when ``group`` equals one of the listed memberships;
        ``False`` when the claim is missing, null, or empty.
    """
    memberships = claims.get('is_member_of') or ()
    if isinstance(memberships, str):
        # A bare string would turn ``in`` into a substring test, letting a
        # longer group name satisfy a check for a shorter one.
        memberships = (memberships,)
    return group in memberships