-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmain.py
More file actions
246 lines (229 loc) · 11.1 KB
/
main.py
File metadata and controls
246 lines (229 loc) · 11.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
import datetime, argparse
import dateutil.parser
from pathlib import Path
import tqdm
import logging
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.ERROR)
logging.addLevelName(logging.WARNING, "\033[33m%s\033[0m" % logging.getLevelName(logging.WARNING))
logging.addLevelName(logging.ERROR, "\033[31m%s\033[0m" % logging.getLevelName(logging.ERROR))
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
class DuplicateFilter(logging.Filter):
msgs = set()
def filter(self, record):
retVal = record.msg not in self.msgs
self.msgs.add(record.msg)
return retVal
logger.addFilter(DuplicateFilter())
from lib.spreadsheetReader import getRows
from lib.printing import printReport, makeCsvSummary
from lib.constants import INFO_KEY, GRADES_KEY, ASSIGNMENTS_KEY, ALL_DEFAULT_FILTERS, GRADE_NOT_PRESENT_ANNOTS
from lib.mung import IncorrectFormatException, checkAndClean
from lib.config import loadConfig
def sourceToGrades(sourceConfigObj, studentAttrDict):
'''returns [(Student, [Grade])]'''
sourcePath = Path(sourceConfigObj['file'])
rows = getRows(sourcePath, isRoster=sourceConfigObj.get("isRoster", False), sheetName=sourceConfigObj["sheetName"])
identDict = sourceConfigObj["attributes"]
sourceConfigReader = sourceConfigObj[ASSIGNMENTS_KEY]
outputList = []
for record in rows:
studentInfo = {}
for (identCol, internalName) in identDict.items():
identVal = record[identCol]
try:
studentInfo[internalName] = checkAndClean(identVal, studentAttrDict[internalName]['filters'])
except IncorrectFormatException:
logger.info(f"in file {sourcePath}, invalid value for {internalName}: '{identVal}'")
logger.info(f"skipping this field; may result in an UnidentifiableStudentException later")
grades = {}
for assignment in sourceConfigReader:
scoreCol = assignment.get('scoreCol', None)
if scoreCol == None:
# Full credit for completion (i.e. being in the spreadsheet at all)
score = assignment['max_points']
else:
try:
score = record[scoreCol]
except:
logger.error(f"In file '{str(sourcePath)}', expected score column '{scoreCol}' for assignment '{assignment['name']}' not in record '{record}'")
try:
score = checkAndClean(score, assignment.get('filters', ALL_DEFAULT_FILTERS))
except IncorrectFormatException:
logger.error(f"in file {sourcePath}, unreadable score for score column {scoreCol}: '{score}'")
annotations = {}
if "due_date" in assignment:
dueDatetime = dateutil.parser.parse(assignment['due_date'])
turnedInStr = record[assignment['timestampCol']]
try:
turninDatetime = dateutil.parser.parse(turnedInStr)
except ValueError:
# Try reading as an xlsx timestamp instead
# https://gist.github.com/erikvullings/825283249a5b4617d0f36bcba4fa8be8
utcTime = (float(turnedInStr) - 25569) * 86400
turninDatetime = datetime.datetime.utcfromtimestamp(utcTime)
if turninDatetime > dueDatetime:
score = 0
annotations['shortAnnot'] = f'late - received {turninDatetime.strftime("%b %d, %T")}'
annotations['longAnnot'] = f'due {dueDatetime.strftime("%b %d, %T")}; received {turninDatetime.strftime("%b %d, %T")}'
grades[assignment['name']] = (score, annotations)
outputList.append((studentInfo, grades))
return outputList
def findPrimaryAttr(attrDict):
for (attr, flags) in attrDict.items():
if flags["identifiesStudent"] and flags["onePerStudent"]:
return attr
logger.error("no primary student identifier")
class UnidentifiableStudentException(Exception):
pass
def getStudentID(studentAttrDict, primaryAttr, roster, studentInfo):
if primaryAttr in studentInfo:
return studentInfo[primaryAttr]
for (key, val) in studentInfo.items():
identifiesStudent = studentAttrDict[key]['identifiesStudent']
if not identifiesStudent:
continue
if val in roster[key]:
return roster[key][val]
raise UnidentifiableStudentException()
def mergeIntoRoster(studentAttrDict, primaryAttr, roster, studentInfo, studentID):
try:
checkMerge(studentAttrDict, primaryAttr, roster, studentInfo, studentID)
except Exception as e:
logger.warning(e)
return
if studentID not in roster[primaryAttr]:
roster[primaryAttr][studentID] = {INFO_KEY: {}, GRADES_KEY: {}}
oldInfo = roster[primaryAttr][studentID][INFO_KEY]
for (key, val) in studentInfo.items():
if key == primaryAttr:
continue
onePerStudent = studentAttrDict[key]["onePerStudent"]
if onePerStudent:
if key not in oldInfo:
oldInfo[key] = val
else:
if oldInfo[key] != val:
raise Exception("This state should be unreachable (see checkMerge)")
else:
if key not in oldInfo:
oldInfo[key] = set([val])
else:
oldInfo[key].add(val)
identifiesStudent = studentAttrDict[key]['identifiesStudent']
if identifiesStudent:
if key not in roster:
roster[key] = {}
if val not in roster[key]:
roster[key][val] = studentID
elif roster[key][val] != studentID:
raise Exception("This state should be unreachable (see checkMerge)")
def checkMerge(studentAttrDict, primaryAttr, roster, studentInfo, studentID):
if studentID not in roster[primaryAttr]:
oldInfo = {}
else:
oldInfo = roster[primaryAttr][studentID][INFO_KEY]
for (key, val) in studentInfo.items():
if key == primaryAttr:
continue
onePerStudent = studentAttrDict[key]["onePerStudent"]
if onePerStudent:
if key not in oldInfo:
pass
else:
if oldInfo[key] != val:
raise Exception(f"refusing to overwrite singleton value for {key} from {oldInfo[key]} to {val}")
identifiesStudent = studentAttrDict[key]['identifiesStudent']
if identifiesStudent:
if key not in roster:
pass
elif val not in roster[key]:
pass
elif roster[key][val] != studentID:
raise Exception(f"refusing to reassign ({key}: {val}) from {roster[key][val]} to {studentID}")
def gatherData(globalConfigObj):
studentAttrDict = globalConfigObj["studentAttributes"]
primaryAttr = findPrimaryAttr(studentAttrDict)
roster = {k:{} for k in studentAttrDict}
sourceConfigList = globalConfigObj["sources"]
allAssignments = {}
for obj in sourceConfigList:
for assignmentData in obj[ASSIGNMENTS_KEY]:
allAssignments[assignmentData["name"]] = assignmentData
data = []
for obj in sourceConfigList:
data += sourceToGrades(obj, studentAttrDict)
while True:
newDataMerged = False
failedToMerge = []
for (studentInfo, grades) in data:
try:
studentID = getStudentID(studentAttrDict, primaryAttr, roster, studentInfo)
except UnidentifiableStudentException:
failedToMerge.append((studentInfo, grades))
continue
newDataMerged = True
mergeIntoRoster(studentAttrDict, primaryAttr, roster, studentInfo, studentID)
for (k,v) in grades.items():
oldGrade = roster[primaryAttr][studentID][GRADES_KEY].get(k,(-float('inf'), None))
if oldGrade != (-float('inf'), None):
logger.warning(f'Duplicate grade: {(oldGrade, studentInfo, grades)}')
# Always keep the highest grade for each assignment (TODO: replace with more flexible policy?)
if type(v[0]) != str:
roster[primaryAttr][studentID][GRADES_KEY][k] = max(oldGrade, v)
else:
roster[primaryAttr][studentID][GRADES_KEY][k] = v
if newDataMerged == False or len(failedToMerge) == 0:
break
data = failedToMerge[:]
for (studentInfo, _) in failedToMerge:
logger.warning(f"could not identify student ({studentInfo})")
return (roster[primaryAttr], allAssignments)
def shouldPrint(printFilters, studentInfo):
for attr in printFilters:
if attr not in studentInfo:
return False
return True
#NOTE: we assume all assignments in a category are weighted equally by percentage,
#i.e. getting a 10/20 and a 1/2 contribute the same in all aggregations #TODO: offer alternatives?
def postprocess(actions, students, allAssignments):
for action in actions:
if action['action'] != "dropLowest":
raise Exception("Unknown action in 'processing' field (only dropLowest is supported)")
category = action['type']
dropCount = action.get('dropCount', 1)
for student in students:
studentData = student[1]
grades = []
for (assignmentName, assignmentData) in allAssignments.items():
if assignmentData['type'] == category:
(score, annot) = studentData[GRADES_KEY].get(assignmentName, (0, GRADE_NOT_PRESENT_ANNOTS))
grades.append((score/assignmentData['max_points'], annot))
grades.sort()
for grade in grades[:dropCount]:
annot = grade[1]
annot['dropped'] = True
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('filename', metavar='CONFIG_FILE', type=str,
help='The .json file describing your class.')
parser.add_argument('-p', '--pdf', action='store_true', help='Generate pdf reports')
parser.add_argument('-w', '--wkhtmltopdf-path', help='Path to wkhtmltopdf executable')
args = parser.parse_args()
globalConfigObj = loadConfig(args.filename)
(gradebook, allAssignments) = gatherData(globalConfigObj)
students = gradebook.items()
if args.pdf:
# Attach progress bar only if generating pdfs (which is slow). Non-pdf
# version is fast enough that progress bar is just unnecessary clutter
students = tqdm.tqdm(students)
printFilters = []
for (k,v) in globalConfigObj["studentAttributes"].items():
if v.get("onlyPrintIfPresent", False):
printFilters.append(k)
postprocess(globalConfigObj['processing'], students, allAssignments)
makeCsvSummary(list(globalConfigObj["studentAttributes"].keys()), students, allAssignments, globalConfigObj["outputs"]) #TODO drop lowest?
for (studentIdentifier, studentData) in students:
if shouldPrint(printFilters, studentData[INFO_KEY]):
printReport(studentIdentifier, studentData, allAssignments, globalConfigObj["outputs"], args.pdf, args.wkhtmltopdf_path)
# logger.info("reports generated in folder 'reports/'")