-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathcore.py
285 lines (249 loc) · 9.95 KB
/
core.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
import csv
import sys
import logging
from .grafana import *
from .ldap import *
from ldap3.core.exceptions import LDAPSocketOpenError
from requests.exceptions import ConnectionError
from .config import *
# Root logger; every function in this module reports through it.
logger = logging.getLogger()

# Translates the permission names used in the binding csv into the numeric
# levels that update_folders() puts into the folder-permission payload.
PERMISSION_MAP = {
    "View": 1,
    "Edit": 2,
    "Admin": 4,
}

# Placeholder; startUserSync() rebinds this to the loaded configuration object.
configuration = ""
def read_csv(file):
    """
    Reads the given csv-file and returns it as a 2-dimensional array.
    :param file: Path of the csv-file to read.
    :return: The given csv file parsed into a 2-dimensional array (one list of column values per row).
    :raises FileNotFoundError: If the file does not exist. The error is logged before it is re-raised.
    """
    try:
        # newline='' is the mode the csv module asks for, so line breaks inside quoted fields survive.
        with open(file, newline='') as f:
            return list(csv.reader(f))
    except FileNotFoundError:
        # Report the path that was actually requested and let the error propagate. Falling through
        # to a return here would reference a value that was never assigned.
        logger.error("Binding-file %s does not exist!", file)
        raise
def read_mapping_from_csv(bind):
    """
    Calls read_csv() and parses the loaded array into a dictionary. The first row is treated as the header and is
    skipped; blank rows are skipped as well. The dictionary is defined as follows:
    {
        "teams": {
            *team-name*: {
                "ldap": [*ldap-group*, ....]
            },
            ....
        },
        "folders": {
            *folder-id*: {
                "name": *folder-name*,
                "permissions": [
                    {
                        "teamId": *team-name*,
                        "permission": *permission*
                    },
                    {
                        "role": "Viewer",
                        "permission": *permission-for-viewer*
                    },
                    ....
                ]
            },
            ...
        }
    }
    The "role" entry is only added for rows whose viewer-permission column is present and not empty.
    :param bind: Path of the csv-file containing the bindings.
    :return: The csv's contents parsed into a dictionary as described above.
    """
    result = {"teams": {}, "folders": {}}
    # Slicing off the first row drops the header.
    for line in read_csv(bind)[1:]:
        if not line:
            # csv.reader yields an empty list for a blank line; there is nothing to map.
            continue
        # Column 2 is not used by the mapping.
        ldap = line[0]
        team = line[1]
        folder_name = line[3]
        folder_uuid = line[4]
        permission = line[5]
        # The viewer column is optional: a row that ends after the permission column means "no viewer access".
        permission_for_viewer = line[6] if len(line) > 6 else ""

        ldap_groups = result["teams"].setdefault(team, {"ldap": []})["ldap"]
        if ldap not in ldap_groups:
            ldap_groups.append(ldap)

        # The folder name of the first row mentioning a folder-id wins; later rows only add permissions.
        folder = result["folders"].setdefault(folder_uuid, {"name": folder_name, "permissions": []})
        permissions = folder["permissions"]
        access = {"teamId": team, "permission": permission}
        if access not in permissions:
            permissions.append(access)
        if permission_for_viewer != "":
            viewer_access = {"role": "Viewer", "permission": permission_for_viewer}
            if viewer_access not in permissions:
                permissions.append(viewer_access)
    return result
def add_users(grafana_team, ldap_groups):
    """
    Adds every user of the given ldap groups to the given grafana-team, unless the user is already listed among
    the team's members. For users whose login is not taken yet, a grafana account is created first.
    :param grafana_team: The name of the grafana-team the users should be added to.
    :param ldap_groups: The names of the ldap-groups the users should be added from.
    """
    current_members = get_members_of_team(grafana_team)
    for group_name in ldap_groups:
        for candidate in get_users_of_group(group_name):
            if candidate in current_members:
                continue
            login = candidate["login"]
            # The account has to exist before it can be put into the team.
            if not login_taken(login):
                create_user_with_random_pw(candidate)
            add_user_to_team(login, grafana_team)
def remove_users(grafana_team, ldap_groups):
    """
    Removes every member of the given grafana-team that is not part of any of the given ldap-groups.
    If grafana rejects a removal, the failure is logged and the remaining removals for this team are skipped.
    :param grafana_team: The name of the grafana-team the users should be removed from.
    :param ldap_groups: The names of the ldap groups containing the users that should be present in the grafana-group.
    """
    grafana_users = get_members_of_team(grafana_team)
    # Union of the members of all mapped ldap groups, without duplicates.
    ldap_users = []
    for ldap_group in ldap_groups:
        for user in get_users_of_group(ldap_group):
            if user not in ldap_users:
                ldap_users.append(user)
    for user in grafana_users:
        if user in ldap_users:
            continue
        try:
            remove_member_from_team(grafana_team, user["login"])
        except GrafanaClientError as error:
            # Stopping at the first rejected request is the existing behaviour; logging it keeps the
            # abort from going unnoticed.
            # NOTE(review): confirm whether the remaining members should still be processed instead.
            logger.warning("Could not remove user %s from team %s, skipping remaining removals: %s",
                           user["login"], grafana_team, error)
            break
def update_teams(team_mapping):
    """
    Brings the grafana teams in line with the given team mapping: a team that does not exist yet is created,
    then its members are synchronized with the mapped LDAP-groups.
    A team mapping is a dictionary defined as follows:
    *team-name*: {
        "ldap": [
            *mapped-ldap-group1*,
            *mapped-ldap-group2*,
        ]
    }
    :param team_mapping: A dictionary of teams as described above.
    """
    for team_name, team_entry in team_mapping.items():
        # A falsy id means grafana does not know the team yet.
        if not get_id_of_team(team_name):
            create_team(team_name, "")
        # First pull in the missing members (creating accounts where needed) ...
        add_users(team_name, team_entry["ldap"])
        # ... then drop the members no mapped LDAP-group accounts for.
        remove_users(team_name, team_entry["ldap"])
def update_folders(folders):
    """
    Takes a dictionary resembling multiple folders, creates each folder that does not exist yet with the provided
    name and uid and updates the permissions of every folder.
    The dictionary should be defined as follows:
    {
        *folder-id1*: {
            "name": *folder-name*,
            "permissions": [
                {
                    "teamId": *team-name*,
                    "permission": *permission*
                },
                ....
            ]
        },
        *folder-id2*:....
    }
    Entries carrying a "role" key are role permissions; they are sent with "teamId" set to 0.
    The given dictionary is not modified.
    :param folders: A dictionary of folders as described above.
    """
    for folder_id, folder in folders.items():
        if not exists_folder(folder_id):
            create_folder(folder["name"], folder_id)
        # Resolve names on copies: writing ids and levels back into the caller's entries would corrupt
        # the mapping for any later use (a second pass would look up a number in PERMISSION_MAP).
        payload = []
        for entry in folder["permissions"]:
            resolved = dict(entry)
            resolved["teamId"] = 0 if "role" in entry else get_id_of_team(entry["teamId"])
            resolved["permission"] = PERMISSION_MAP[entry["permission"]]
            payload.append(resolved)
        update_folder_permissions(folder_id, payload)
def delete_unmapped_teams(team_mappings):
    """
    Deletes all teams which are not required anymore by the current mapping.
    :param team_mappings: The dictionary found under the name "teams" in the mapping-dictionary (keyed by team name).
    """
    for team in get_all_teams():
        # A team survives only if its name is a key of the mapping.
        if team["name"] not in team_mappings:
            delete_team_by_name(team["name"])
def get_users_of_used_ldap_groups(team_mappings):
    """
    Collects the users of every ldap group referenced by the team mappings. Each user appears once, in the order
    in which it was first encountered.
    :param team_mappings: The dictionary found under the name "teams" in the mapping-dictionary.
    :return: A list of all users that are part of at least one mapped ldap group.
    """
    collected = []
    for team_entry in team_mappings.values():
        for group_name in team_entry["ldap"]:
            for ldap_user in get_users_of_group(group_name):
                if ldap_user in collected:
                    continue
                collected.append(ldap_user)
    return collected
def delete_unmapped_users(team_mappings):
    """
    Deletes all grafana users whose login does not belong to any user of the ldap groups used by the current
    mapping.
    :param team_mappings: The dictionary found under the name "teams" in the mapping-dictionary.
    """
    grafana_users = get_all_users()
    ldap_users = get_users_of_used_ldap_groups(team_mappings)
    # Collect the wanted logins once so every grafana user is checked with a single lookup.
    needed_logins = {ldap_user["login"] for ldap_user in ldap_users}
    for user in grafana_users:
        if user["login"] not in needed_logins:
            delete_user_by_login(user["login"])
def remove_unused_items(team_mappings):
    """
    Cleans up grafana after a sync: first the teams, then the user accounts that the mapping no longer needs.
    :param team_mappings: The dictionary found under the name "teams" in the mapping-dictionary.
    """
    # Order matters only in that teams are handled before users, exactly as listed.
    for cleanup in (delete_unmapped_teams, delete_unmapped_users):
        cleanup(team_mappings)
def startUserSync(config_path, bind, dry_run):
    """
    Runs one synchronization of the grafana teams, folders and users, guarded by lock()/unlock().
    If lock() reports that the lock could not be taken (a .lock file is present), no action is performed.
    Failures during the run are logged instead of being raised; the lock is released in every case.
    :param config_path: Path handed to config() to load the configuration.
    :param bind: Path of the csv-file containing the bindings.
    :param dry_run: Stored as DRY_RUN on the loaded configuration; when truthy a notice is logged that changes
    will not be applied.
    """
    global configuration
    if not lock():
        logger.error("Task aborted, process is already active!")
        return
    try:
        logger.info("=================================================")
        logger.info("Starting user synchronization...")
        configuration = config(config_path)
        configuration.DRY_RUN = dry_run
        if configuration.DRY_RUN:
            logger.info("!! DryRun enabled: Changes will not be applied !!")
        logger.info("=================================================")

        logger.info("Setting up the connection to the Grafana server..")
        setup_grafana(configuration)
        logger.info("Setting up the connection to the LDAP server..")
        setup_ldap(configuration)

        logger.info("Reading the user and team mappings..")
        mapping = read_mapping_from_csv(bind)

        logger.info("Updating the Grafana teams..")
        update_teams(mapping["teams"])
        logger.info("Updating the Grafana folders..")
        update_folders(mapping["folders"])
        logger.info("Removing unused teams and users..")
        remove_unused_items(mapping["teams"])

        logger.info("Task finished successfully!")
    except LDAPSocketOpenError:
        logger.error("Task aborted, unable to reach LDAP-Server.")
    except ConnectionError:
        logger.error("Task aborted, unable to reach Grafana-Server.")
    except Exception:
        # Exception rather than a bare except, so Ctrl-C and interpreter exit are not swallowed.
        # logger.exception additionally records the traceback.
        logger.exception("An unexpected error occured: %s", str(sys.exc_info()))
    finally:
        # Always release the lock, even if a handler above fails or an interrupt passes through.
        unlock()