-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcreate_ds.py
361 lines (303 loc) · 13.8 KB
/
create_ds.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
import fitz
import os
import re
from collections import defaultdict
from nltk import word_tokenize
import csv
import string
def read_pdf_into_chapters(path=None):
    """Parse the pdf version of 'The Glannon Guide To Civil Procedure' and
    separate the raw text into chapters.

    Args:
        path (string, optional): Path to the book pdf. Defaults to
            <cwd>/data/raw/book-legal.pdf, resolved at call time. (The original
            default was evaluated once at import time, freezing the cwd.)

    Returns:
        dict: chapter name -> raw chapter text.
    """
    if path is None:
        # Resolve the default lazily so a later os.chdir() is respected.
        path = os.path.join(os.getcwd(), "data", "raw", "book-legal.pdf")

    def clean_text(text):
        """Normalize whitespace: tabs and doubled spaces become single spaces."""
        # BUG FIX: the second replace was a no-op (" " -> " "); it was meant
        # to collapse double spaces.
        return text.replace("\t", " ").replace("  ", " ")

    chapter_dict = {}
    with fitz.open(path) as f:
        chapter_names = [
            c for c in f.get_toc() if c[0] == 1
        ]  # Keep only level-1 TOC entries; first chapter subchapters are unnecessary and other subchapters are not detected
        for i in range(
            len(chapter_names) - 1
        ):  # Last chapter is the index chapter and therefore not mandatory to collect
            chapter_content = ""
            start_page = chapter_names[i][2] - 1  # 0-based page of chapter start
            end_page = chapter_names[i + 1][2]  # Page of chapter end
            for page in f.pages(start_page, end_page, 1):
                # BUG FIX: get_text() replaces the deprecated camelCase
                # getText() alias, which was removed in newer PyMuPDF releases.
                chapter_content += (
                    clean_text(page.get_text()) + "\n"
                )  # Add page content to chapter content for later processing
            chapter_dict[
                chapter_names[i][1]
            ] = chapter_content  # Add chapter content to dictionary
    return chapter_dict
def write_into_csv(
    file_path,
    subchapter_list,
    title_list=("question", "answer", "solution", "analysis", "explanation"),
):
    """Write the parsed content in a dataset format into a csv file.

    Args:
        file_path (string): path of the output file
        subchapter_list (list): dataset entries, one row each
        title_list (sequence, optional): header row. Defaults to
            ("question", "answer", "solution", "analysis", "explanation").
            An immutable tuple default avoids the mutable-default-argument
            pitfall; any sequence of column names is accepted.
    """
    # BUG FIX: newline="" is required by the csv module to prevent spurious
    # blank lines on Windows; an explicit encoding makes the output portable.
    with open(file_path, "w", newline="", encoding="utf-8") as csv_file:
        writer = csv.writer(csv_file, delimiter="\t", quotechar="|")
        writer.writerow(title_list)
        for entry in subchapter_list:
            writer.writerow(entry)
def get_title_name(raw_title_and_explanation, chars_per_line_length=61):
    """Split a subchapter into its title and its explanation text.

    The book is justified ("Blocksatz"), stretching body lines to the full
    column width, so comparing line lengths distinguishes the larger-typeset
    subchapter header (short lines) from the smaller-typeset explanation
    (full-width lines). The title is therefore the run of short lines at the
    top of the subchapter.

    Args:
        raw_title_and_explanation (string): raw text of the subchapter
        chars_per_line_length (int, optional): threshold separating title
            lines from explanation lines. Defaults to 61.

    Returns:
        title (string): title of the subchapter (with a trailing space, as the
            downstream code expects)
        explanation (string): explanation of the subchapter; empty when no
            line reaches the threshold
    """
    lines = raw_title_and_explanation.split("\n")
    title_parts = []
    first_explanation_line = None
    for i, line in enumerate(lines):
        if len(line) < chars_per_line_length:
            title_parts.append(line)  # First paragraph is the subchapter title
        else:
            first_explanation_line = i
            break
    title = "".join(part + " " for part in title_parts)
    if first_explanation_line is None:
        # BUG FIX: previously first_explanation_line stayed -1 here, so the
        # explanation wrongly became the LAST line (already part of the
        # title). With every line short there is no explanation at all.
        return title, ""
    explanation = " ".join(lines[first_explanation_line:])
    return title, explanation
def split_char_dot_pattern(raw):
    """Split *raw* at sub-heading markers of the form "\\n<letter>. ".

    Only the letters A through H are treated as headings: letters after H do
    not occur in the text as headliners, only as ordinary capital letters, so
    the character class is deliberately narrow.
    """
    heading_marker = re.compile(r"\n[A-H]\. ")
    return heading_marker.split(raw)
def process_chapter(raw_chapter, chapter_index):
    """Divide a chapter into its subchapters, annotating each subchapter with
    the four parts: explanation, question, answers, analysis (plus solution).

    Args:
        raw_chapter (string): raw chapter text
        chapter_index (int): index of the chapter, used to look up special
            split rules for chapters whose analysis marker deviates

    Returns:
        dict: dict of dicts. Each subchapter name is a key mapping to the
            separated parts: explanation, question, answers, analysis,
            solution.
    """

    def parse_chapter_solutions(raw_solutions, number_of_questions):
        """Parse the solutions subchapter into a list of answer indices.

        Each solution character (A, B, ...) is transformed into its 0-based
        alphabet position so it can later be compared against an answer index.
        """
        solutions = []
        # NOTE(review): [1:number_of_questions] yields number_of_questions - 1
        # items; verify against the book that the last solution is not dropped.
        for solution in re.split(
            r"\n[0-9]+\. ",
            raw_solutions,
        )[1:number_of_questions]:
            solution_char = [
                sol
                for sol in word_tokenize(solution)
                if len(sol) == 1 and sol in string.ascii_uppercase
            ][-1]  # Last single-uppercase token is the solution character; hacky, but the book is not consistent with annotations
            solution_number = string.ascii_uppercase.index(
                solution_char
            )  # Transform the character into a number to make parsing easier
            solutions.append(solution_number)
        return solutions

    def add_to_dict(question, answer, analysis):
        """Append a parsed (question, answers, analysis) triple to the entry
        of the subchapter currently being processed.

        Args:
            question (string): parsed question
            answer (list): parsed answer options
            analysis (string): parsed analysis
        """
        entry = chapter_dict[last_chapter_name]
        if "question" in entry:  # Multiple questions in the same chapter: append
            entry["question"].append(question)
            entry["answers"].append(answer)
            entry["analysis"].append(analysis)
        else:
            entry["question"] = [question]
            entry["answers"] = [answer]
            entry["analysis"] = [analysis]

    def special_rules():
        """Look up the manually determined keyword pattern for chapters whose
        analysis section is not introduced by the usual "ANALYSIS. " marker.
        The patterns were added by looking them up in the book.

        Returns:
            string: keyword pattern to split the section at
        """
        if chapter_index == 5:
            keyword_pattern = "There is a significant difference here that would likely lead to a different\n\noutcome."
        elif chapter_index == 9:
            keyword_pattern = "Let’s start in the middle and nibble up and down. "
        else:
            print("Index not in list with special rules. Exit")
            exit()  # NOTE(review): aborts the whole run; consider raising instead
        return keyword_pattern

    last_chapter_name = ""
    chapter_dict = defaultdict(dict)  # subchapter name -> parts
    solutions = []  # store the solutions of the questions
    # Store chapter names in order so the solutions (collected at the end of
    # the chapter) can be assigned back to the correct questions.
    chapter_order = []
    # BUG FIX applied throughout this loop: re.MULTILINE was being passed as
    # the positional maxsplit argument of re.split (its value, 8, silently
    # capped the number of splits), not as flags. MULTILINE is irrelevant for
    # these patterns (no ^/$ anchors), so it is simply dropped.
    for section in re.split(
        r"QUESTION\d*.", raw_chapter
    ):  # Divide chapter into sections at the stable keyword pattern "QUESTION <number>."
        if last_chapter_name == "":  # First chapter border case
            title_split = split_char_dot_pattern(section)[
                -1
            ]  # Last section is the title and explanation; the first sections are the chapter overview
            new_chapter_name, explanation = get_title_name(title_split)
            chapter_dict[new_chapter_name]["explanation"] = explanation
        elif "Glannon’s Picks" in section:  # Last chapter border case
            analysis_split = re.split(r"\n*ANALYSIS\. ", section)
            if len(analysis_split) == 1:
                # Analysis marker missing: fall back to the chapter-specific pattern.
                analysis_split = re.split("\n*%s" % (special_rules()), section)
            subchapter_question_and_answer_split = split_char_dot_pattern(
                analysis_split[0]
            )  # split question and question answers from next chapter title and explanation
            analysis_solution_split = re.split(
                r"Glannon’s Picks", " ".join(analysis_split[1:])
            )  # Split at keyword "Glannon's Picks"
            add_to_dict(
                subchapter_question_and_answer_split[0],
                subchapter_question_and_answer_split[1:],
                analysis_solution_split[0],
            )
            solutions = parse_chapter_solutions(
                analysis_solution_split[1], len(chapter_order) + 1
            )  # Part after subchapter title; +1 accounts for this last question
        else:  # Normal/middle chapter case
            analysis_split = re.split(r"\n*ANALYSIS\. ", section)
            if len(analysis_split) == 1:
                # Analysis marker missing: fall back to the chapter-specific pattern.
                analysis_split = re.split("\n*%s" % (special_rules()), section)
            subchapter_question_and_answer_split = split_char_dot_pattern(
                analysis_split[0]
            )  # split question and question answers from next chapter title and explanation
            subchapter_title_and_explanation_split = split_char_dot_pattern(
                analysis_split[1]
            )  # split analysis from next chapter title and explanation
            if (
                len(subchapter_title_and_explanation_split) == 1
            ):  # No next chapter title and explanation
                add_to_dict(
                    subchapter_question_and_answer_split[0],
                    subchapter_question_and_answer_split[1:],
                    analysis_split[1],
                )
                chapter_order.append(new_chapter_name)
                continue  # No new chapter, therefore no new name assignments
            else:
                new_chapter_name, explanation = get_title_name(
                    subchapter_title_and_explanation_split[-1]
                )
                add_to_dict(
                    subchapter_question_and_answer_split[0],
                    subchapter_question_and_answer_split[1:],
                    " ".join(subchapter_title_and_explanation_split[0:-1]),
                )
                chapter_dict[new_chapter_name]["explanation"] = explanation
        last_chapter_name = new_chapter_name
        chapter_order.append(new_chapter_name)
    for chapter_name, solution in zip(
        chapter_order, solutions
    ):  # Attach the solutions to the chapter_dict
        if "solution" not in chapter_dict[chapter_name]:
            chapter_dict[chapter_name]["solution"] = [solution]
        else:
            chapter_dict[chapter_name]["solution"].append(solution)
    return chapter_dict
def parse_chapter_26(raw_chapter):
    """Divide the special chapter 26 into its subchapters, annotating each
    subchapter with the four parts: explanation, question, answers, analysis.

    Unlike the other chapters, chapter 26 keys its entries by question index
    rather than by subchapter title, and its solutions are not parsed
    (placeholder -1).

    Args:
        raw_chapter (string): raw chapter text

    Returns:
        dict: dict of dicts keyed by question index, each containing the
            separated parts: explanation, question, answers, analysis, solution.
    """
    chapter_dict = {}
    # BUG FIX: re.MULTILINE was previously passed as re.split's positional
    # maxsplit argument (capping the split at 8 pieces), not as flags;
    # MULTILINE is irrelevant here (no ^/$ anchors) and is dropped.
    questions, solutions = re.split(r"Glannon’s Picks", raw_chapter)
    for index, section in enumerate(
        re.split(r"QUESTION\d*.", questions)[1:]
    ):  # Ignore chapter introduction
        question_and_answers_split = split_char_dot_pattern(section)
        chapter_dict[index] = {
            "question": [question_and_answers_split[0]],
            "answers": [question_and_answers_split[1:]],
            "solution": [-1],  # placeholder: solutions are not parsed for this chapter
            "explanation": "",
        }
    for index, section in enumerate(re.split(r"\n\d{1,2}\. ", solutions)[1:17]):
        # NOTE(review): the slice [1:17] yields at most 16 items (index 0-15),
        # so the index == 16 branch below is unreachable; also section[:-3]
        # drops three CHARACTERS, not three lines as the original comment
        # claimed. Verify the intended question count against the book.
        if index == 16:
            chapter_dict[index]["analysis"] = [section[:-3]]  # Drop trailing characters
        else:
            chapter_dict[index]["analysis"] = [section]
    return chapter_dict
def generate_ds_pairs(chapter_dict):
    """Reformat the parsed content from multiple-choice question form into
    binary question form: one entry per (question, answer option) pair.

    Args:
        chapter_dict (dict): chapter content divided into its parts

    Returns:
        list: dataset entries [question, answer, 0/1 label, analysis,
            explanation]
    """

    def clean_text(text):
        # BUG FIX: the second replace was a no-op (" " -> " "); it was meant
        # to collapse double spaces left over after newline removal.
        return text.replace("\n", " ").replace("  ", " ")

    ds_entries = []
    for subchapter in chapter_dict.keys():
        for question, answers, analysis, solution in zip(
            chapter_dict[subchapter]["question"],
            chapter_dict[subchapter]["answers"],
            chapter_dict[subchapter]["analysis"],
            chapter_dict[subchapter]["solution"],
        ):
            for index, answer in enumerate(answers):
                # Binary label: 1 for the correct option, 0 otherwise.
                answer_solution = 1 if index == solution else 0
                # (The original additionally pre-cleaned `answer` here with
                # the same replacements as clean_text — redundant, removed.)
                ds_entries.append(
                    [
                        clean_text(question),
                        clean_text(answer),
                        answer_solution,
                        clean_text(analysis),
                        clean_text(chapter_dict[subchapter]["explanation"]),
                    ]
                )
    return ds_entries
if __name__ == "__main__":
    # Parse the book, process each usable chapter and dump one csv per chapter.
    chapter_dict = read_pdf_into_chapters()
    for index, key in enumerate(chapter_dict):
        if index in (0, 27):
            # Chapters 0 and 27 contribute no entries to the final dataset.
            continue
        if index == 26:
            # Chapter 26 does not follow the structure of the other chapters.
            chapter = parse_chapter_26(chapter_dict[key])
        else:
            chapter = process_chapter(chapter_dict[key], index)
        ds_pairs = generate_ds_pairs(chapter)
        out_path = os.path.join(os.getcwd(), "data", "processed", key + "_ds_pairs.csv")
        write_into_csv(out_path, ds_pairs)