# fcr_classes.py
# -*- coding: utf-8 -*-
# External import
import os
from dotenv import load_dotenv
import logging
from logging.handlers import RotatingFileHandler
import csv
import json
import xml.etree.ElementTree as ET
import pymarc
import re
from typing import Dict, List, Tuple, Optional, Union
from fuzzywuzzy import fuzz
# Internal imports
import fcr_func as fcf
from fcr_enum import *
# ----- Match Records imports -----
# Internal imports
import api.abes.Abes_id2ppn as id2ppn
import api.abes.Sudoc_SRU as ssru
import api.koha.Koha_SRU as Koha_SRU
# -------------------- Execution settings (ES) --------------------
class Execution_Settings(object):
def __init__(self, dir: str):
# Load analysis settings
with open(dir + "/json_configs/analysis.json", "r+", encoding="utf-8") as f:
self.analysis_json = json.load(f)
# Load marc fields
with open(dir + "/json_configs/marc_fields.json", "r+", encoding="utf-8") as f:
self.marc_fields_json = json.load(f)
# Set up subclasses
self.csv = self.CSV(self)
def load_env_values(self):
load_dotenv()
# General
self.lang = os.getenv("LANG") # assumption: a LANG entry in the .env holds the interface language ("eng" or "fre")
if self.lang not in ["eng", "fre"]:
self.lang = "eng"
self.service = os.getenv("SERVICE")
self.file_path = os.getenv("FILE_PATH")
self.output_path = os.getenv("OUTPUT_PATH")
self.csv_cols_config_path = os.getenv("CSV_OUTPUT_JSON_CONFIG_PATH")
self.logs_path = os.getenv("LOGS_PATH")
self.log_level = os.getenv("LOG_LEVEL")
# Processing & operations
self.UI_change_processing(os.getenv("PROCESSING_VAL"))
self.get_operation()
# Database specifics
self.origin_url = os.getenv("ORIGIN_URL")
self.origin_database_mapping = os.getenv("ORIGIN_DATABASE_MAPPING")
self.target_url = os.getenv("TARGET_URL")
self.target_database_mapping = os.getenv("TARGET_DATABASE_MAPPING")
self.iln = os.getenv("ILN") # Better Item
self.rcr = os.getenv("RCR") # Better Item
self.filter1 = os.getenv("FILTER1") # Other DB in local DB
self.filter2 = os.getenv("FILTER2") # Other DB in local DB
self.filter3 = os.getenv("FILTER3") # Other DB in local DB
# UI specifics
self.UI_curr_database_mapping = self.origin_database_mapping
self.UI_curr_data = "id"
self.UI_curr_data_label = self.get_data_label_by_id(self.UI_curr_database_mapping, self.UI_curr_data)
self.UI_update_curr_data(self.UI_curr_data_label)
self.UI_curr_field = self.get_data_field_tags()[0]
self.UI_new_field = None
self.UI_curr_single_line_coded_data = None
self.UI_curr_filtering_subfield = None
self.UI_curr_subfields = None
self.UI_curr_positions = None
self.chosen_analysis = 0
self.define_chosen_analysis(0)
# ----- Methods for main -----
def generate_files_path(self):
self.file_path_out_results = self.output_path + "/resultats.txt"
self.file_path_out_json = self.output_path + "/resultats.json"
self.file_path_out_csv = self.output_path + "/resultats.csv"
def get_operation(self):
self.operation = PROCESSING_OPERATION_MAPPING[self.processing]
def define_chosen_analysis(self, nb: int):
self.chosen_analysis = self.analysis_json[nb]
self.chosen_analysis_checks = {}
if self.chosen_analysis["TITLE_MIN_SCORE"] > 0:
self.chosen_analysis_checks[Analysis_Checks.TITLE] = None
if self.chosen_analysis["PUBLISHER_MIN_SCORE"] > 0:
self.chosen_analysis_checks[Analysis_Checks.PUBLISHER] = None
if self.chosen_analysis["USE_DATE"]:
self.chosen_analysis_checks[Analysis_Checks.DATE] = None
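# For reference, define_chosen_analysis expects each analysis.json entry to look roughly
# like the following (keys taken from the accesses above and from __analysis_check_title,
# values purely illustrative):
#   {"name": "Strict", "TITLE_MIN_SCORE": 80, "NB_TITLE_OK": 2,
#    "PUBLISHER_MIN_SCORE": 80, "USE_DATE": true}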
# ----- Methods for UI -----
def UI_get_log_levels(self) -> List[str]:
"""Returns log levels as a list of str"""
return [lvl.name for lvl in Log_Level]
def UI_switch_lang(self):
"""Switch the two languages"""
if self.lang == "eng":
self.lang = "fre"
else:
self.lang = "eng"
def UI_update_curr_data(self, label=None):
"""Update the current data
Takes as argument :
- label : the data id label in selected language"""
if not label:
label = self.UI_curr_data_label
db = self.UI_curr_database_mapping
self.UI_curr_data = self.get_data_id_from_label(db, label)
id = self.UI_curr_data
self.UI_update_curr_field(self.get_data_field_tags(db, id)[0])
self.UI_update_curr_field_subvalues(db, id, self.UI_curr_field)
def UI_update_curr_data_label(self, id=None):
"""Update the current data label
Takes as argument :
- id : the data id"""
if not id:
id = self.UI_curr_data
self.UI_curr_data_label = self.get_data_label_by_id(id=id)
def UI_update_curr_field(self, tag: str):
"""Update the current data label"""
self.UI_curr_field = tag
def UI_update_curr_field_subvalues(self, db=None, id=None, tag=None):
"""Update the current field
Takes as argument :
- db : the database name in marc_fields.json as a string
- id : the data id
- tag : the MARC tag as a string"""
if not db:
db = self.UI_curr_database_mapping
if not id:
id = self.UI_curr_data
if not tag:
tag = self.UI_curr_field
self.UI_curr_single_line_coded_data = self.get_data_field_single_line_coded_data(db, id, tag)
self.UI_curr_filtering_subfield = self.get_data_field_filtering_subfield(db, id, tag)
self.UI_curr_subfields = self.get_data_field_subfields(db, id, tag)
self.UI_curr_positions = self.get_data_field_positions(db, id, tag)
def UI_reset_curr_field_subvalues(self):
"""Defaults all values for the current field"""
self.UI_curr_single_line_coded_data = False
self.UI_curr_filtering_subfield = ""
self.UI_curr_subfields = ""
self.UI_curr_positions = ""
def UI_rename_curr_data(self, new_name: str):
"""Updates the current data label.
Takes as argument the new label"""
self.UI_curr_data_label = new_name
def UI_change_curr_database_mapping(self, new_mapping: str):
"""Updates the current dataabse mapping.
Takes as argument the new mappnig"""
self.UI_curr_database_mapping = new_mapping
def UI_change_processing(self, processing_val: str):
"""Updates the current dataabse mapping.
Takes as argument the new val AS A STRING not a FCR_Processings entry"""
self.processing_val = processing_val
self.processing = FCR_Processings[self.processing_val]
def UI_update_main_screen_values(self, val:dict):
"""Updates all data from the UI inside this instance"""
self.service = val["SERVICE"]
self.log_level = val["LOG_LEVEL"]
self.file_path = val["FILE_PATH"]
self.output_path = val["OUTPUT_PATH"]
self.csv_cols_config_path = val["CSV_OUTPUT_JSON_CONFIG_PATH"]
self.logs_path = val["LOGS_PATH"]
self.UI_change_processing(val["PROCESSING_VAL"])
self.get_operation()
def UI_update_processing_configuration_values(self, val:dict):
"""Updates all data from the UI inside this instance"""
self.origin_url = val["ORIGIN_URL"]
self.target_url = val["TARGET_URL"]
self.iln = val["ILN"]
self.rcr = val["RCR"]
self.filter1 = val["FILTER1"]
self.filter2 = val["FILTER2"]
self.filter3 = val["FILTER3"]
self.origin_database_mapping = val["ORIGIN_DATABASE_MAPPING"]
self.target_database_mapping = val["TARGET_DATABASE_MAPPING"]
# ----- Methods for retrieving data from mappings -----
def UI_get_mappings_names(self) -> list:
"""Returns all mappings names as a list"""
return list(self.marc_fields_json.keys())
def get_data_id_from_label(self, db=None, label=None) -> str:
"""Returns the data id from marc field based on it's label.
Takes as argument :
- db : the database name in marc_fields.json as a string
- label : the data id label in selected language
Returns None if no match was found"""
if not db:
db = self.UI_curr_database_mapping
if not label:
label = self.UI_curr_data_label
for data in self.marc_fields_json[db]:
if self.marc_fields_json[db][data]["label"][self.lang] == label:
return data
return None
def get_data_labels_as_list(self, db=None) -> List[str]:
"""Returns all data labels from marc field as a list.
Takes as argument :
- db : the database name in marc_fields.json as a string"""
if not db:
db = self.UI_curr_database_mapping
return [self.marc_fields_json[db][key]["label"][self.lang] for key in self.marc_fields_json[db]]
def get_data_label_by_id(self, db=None, id=None) -> str:
"""Returns the label of a data
Takes as argument :
- db : the database name in marc_fields.json as a string
- id : the data id"""
if not db:
db = self.UI_curr_database_mapping
if not id:
id = self.UI_curr_data
return self.marc_fields_json[db][id]["label"][self.lang]
def get_data_field_tags(self, db=None, id=None) -> List[str]:
"""Returns the tags of each field from a data
Takes as argument :
- db : the database name in marc_fields.json as a string
- id : the data id"""
if not db:
db = self.UI_curr_database_mapping
if not id:
id = self.UI_curr_data
return [field["tag"] for field in self.marc_fields_json[db][id]["fields"]]
def retrieve_data_from_data_field_subvalues(self, attr:str, db=None, id=None, tag=None):
"""Mother function of get_marc_data_field + attribute.
Takes as argument :
- db : the database name in marc_fields.json as a string
- id : the data id
- tag : the MARC tag as a string
- attr : the attribute name (positions, subfields, etc.)"""
if not db:
db = self.UI_curr_database_mapping
if not id:
id = self.UI_curr_data
if not tag:
tag = self.UI_curr_field
fields = self.marc_fields_json[db][id]["fields"]
for field in fields:
if field["tag"] == tag:
return field[attr]
def get_data_field_single_line_coded_data(self, db=None, id=None, tag=None) -> bool:
"""Returns if the field is a sinngle line coded data
Takes as argument :
- db : the database name in marc_fields.json as a string
- id : the data id
- tag : the MARC tag as a string"""
if not db:
db = self.UI_curr_database_mapping
if not id:
id = self.UI_curr_data
if not tag:
tag = self.UI_curr_field
return self.retrieve_data_from_data_field_subvalues("single_line_coded_data", db, id, tag)
def get_data_field_filtering_subfield(self, db=None, id=None, tag=None) -> str:
"""Returns the filtering subfield of this field
Takes as argument :
- db : the database name in marc_fields.json as a string
- id : the data id
- tag : the MARC tag as a string"""
if not db:
db = self.UI_curr_database_mapping
if not id:
id = self.UI_curr_data
if not tag:
tag = self.UI_curr_field
return self.retrieve_data_from_data_field_subvalues("filtering_subfield", db, id, tag)
def get_data_field_subfields(self, db=None, id=None, tag=None) -> str:
"""Returns the subfields to export for this field
Takes as argument :
- db : the database name in marc_fields.json as a string
- id : the data id
- tag : the MARC tag as a string"""
if not db:
db = self.UI_curr_database_mapping
if not id:
id = self.UI_curr_data
if not tag:
tag = self.UI_curr_field
return ", ".join(self.retrieve_data_from_data_field_subvalues("subfields", db, id, tag))
def get_data_field_positions(self, db=None, id=None, tag=None) -> str:
"""Returns the positions to export for this field
Takes as argument :
- db : the database name in marc_fields.json as a string
- id : the data id
- tag : the MARC tag as a string"""
if not db:
db = self.UI_curr_database_mapping
if not id:
id = self.UI_curr_data
if not tag:
tag = self.UI_curr_field
return ", ".join(self.retrieve_data_from_data_field_subvalues("positions", db, id, tag))
# ----- Methods for retrieving data from analysis -----
def get_analysis_names_as_list(self):
"""Returns all analysis names as a list"""
return [this["name"] for this in self.analysis_json]
def get_analysis_index_from_name(self, name: str) -> int:
"""Returns the index of an analysis.
Takes as argument the name of the analysis"""
for index, this in enumerate(self.analysis_json):
if this["name"] == name:
return index
# --- Logger methods for other classes / functions ---
def init_logger(self):
"""Init the logger"""
self.log = self.Logger(self)
class Logger(object):
def __init__(self, parent) -> None:
self.parent:Execution_Settings = parent
par = self.parent
self.__init_logs(par.logs_path, par.service, par.log_level)
self.logger = logging.getLogger(par.service)
def __init_logs(self, logsrep,programme,niveau):
# logs.py by @louxfaure, check file for more comments
# Based on http://sametmax.com/ecrire-des-logs-en-python/
logsfile = logsrep + "/" + programme + ".log"
logger = logging.getLogger(programme)
logger.setLevel(getattr(logging, niveau))
# Formatter
formatter = logging.Formatter(u'%(asctime)s :: %(levelname)s :: %(message)s')
file_handler = RotatingFileHandler(logsfile, 'a', 10000000, 1, encoding="utf-8")
file_handler.setLevel(getattr(logging, niveau))
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# For console
stream_handler = logging.StreamHandler()
stream_handler.setLevel(getattr(logging, niveau))
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)
logger.info('Logger initialised')
def critical(self, msg:str):
"""Basic log critical function"""
self.logger.critical(f"{msg}")
def debug(self, msg:str):
"""Log a debug statement logging first the service then the message"""
self.logger.debug(f"{self.parent.service} :: {msg}")
def info(self, msg:str):
"""Basic log info function"""
self.logger.info(f"{msg}")
def simple_info(self, msg:str, data):
"""Log an info statement separating msg and data by :"""
self.logger.info(f"{msg} : {data}")
def big_info(self, msg:str):
"""Logs a info statement encapsuled between ----"""
self.logger.info(f"--------------- {msg} ---------------")
def error(self, msg:str):
"""Log a error statement logging first the service then the message"""
self.logger.error(f"{self.parent.service} :: {msg}")
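# With the formatter set in __init_logs, a call such as log.debug("parsing record") would
# emit a line of roughly this shape (timestamp and service name illustrative):
#   2024-01-01 12:00:00,000 :: DEBUG :: my_service :: parsing record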
# --- CSV methods for other classes / functions ---
class CSV(object):
def __init__(self, parent) -> None:
self.parent:Execution_Settings = parent
def create_file(self, original_file_cols:List[str]):
"""Create the CSV file and the DictWriter.
Takes as argument the list of columns from the original file"""
self.file_path = self.parent.file_path_out_csv
self.file = open(self.file_path, "w", newline="", encoding='utf-8')
with open(self.parent.csv_cols_config_path, "r+", encoding="utf-8") as f:
self.csv_cols = json.load(f)
self.__define_headers(original_file_cols)
self.writer = csv.DictWriter(self.file, extrasaction="ignore", fieldnames=self.headers_ordered, delimiter=";")
self.writer.writerow(self.headers)
def __define_headers(self, original_file_cols:List[str]):
"""Defines the headers for the CSV file"""
self.headers = {}
par:Execution_Settings = self.parent
# Common columns
for col in [
CSV_Cols.ERROR,
CSV_Cols.ERROR_MSG,
CSV_Cols.INPUT_QUERY,
CSV_Cols.ORIGIN_DB_INPUT_ID,
CSV_Cols.MATCH_RECORDS_QUERY,
CSV_Cols.FCR_ACTION_USED,
CSV_Cols.MATCH_RECORDS_NB_RESULTS,
CSV_Cols.MATCH_RECORDS_RESULTS,
CSV_Cols.MATCHED_ID,
CSV_Cols.MATCHING_TITLE_RATIO,
CSV_Cols.MATCHING_TITLE_PARTIAL_RATIO,
CSV_Cols.MATCHING_TITLE_TOKEN_SORT_RATIO,
CSV_Cols.MATCHING_TITLE_TOKEN_SET_RATIO,
CSV_Cols.MATCHING_DATES_RESULT,
CSV_Cols.MATCHING_PUBLISHER_RESULT,
CSV_Cols.ORIGIN_DB_CHOSEN_PUBLISHER,
CSV_Cols.TARGET_DB_CHOSEN_PUBLISHER,
CSV_Cols.TARGET_DB_NB_OTHER_ID,
CSV_Cols.IS_ORIGIN_ID_IN_TARGET_OTHER_DB_IDS,
CSV_Cols.GLOBAL_VALIDATION_RESULT,
CSV_Cols.GLOBAL_VALIDATION_NB_SUCCESSFUL_CHECKS,
CSV_Cols.GLOBAL_VALIDATION_TITLE_CHECK,
CSV_Cols.GLOBAL_VALIDATION_PUBLISHER_CHECK,
CSV_Cols.GLOBAL_VALIDATION_DATE_CHECK,
# special data
CSV_Cols.ORIGIN_DB_DATE_1,
CSV_Cols.TARGET_DB_DATE_1,
CSV_Cols.ORIGIN_DB_DATE_2,
CSV_Cols.TARGET_DB_DATE_2,
CSV_Cols.ORIGIN_DB_TITLE_KEY,
CSV_Cols.TARGET_DB_TITLE_KEY
]:
self.headers[col.name] = self.csv_cols[col.name][par.lang]
# Columns from records
processing_fields:Dict[FCR_Mapped_Fields, FCR_Processing_Data_Target] = par.processing.value
for data in processing_fields:
if processing_fields[data] in [FCR_Processing_Data_Target.BOTH, FCR_Processing_Data_Target.ORIGIN]:
self.headers[f"ORIGIN_DB_{data.name}"] = self.csv_cols[f"ORIGIN_DB_{data.name}"][par.lang]
# NOT AN ELIF
if processing_fields[data] in [FCR_Processing_Data_Target.BOTH, FCR_Processing_Data_Target.TARGET]:
self.headers[f"TARGET_DB_{data.name}"] = self.csv_cols[f"TARGET_DB_{data.name}"][par.lang]
# Special processing cols
if par.processing_val == FCR_Processings.BETTER_ITEM.name:
self.headers[CSV_Cols.TARGET_DB_HAS_ITEMS.name] = self.csv_cols[CSV_Cols.TARGET_DB_HAS_ITEMS.name][par.lang]
del self.headers[CSV_Cols.ORIGIN_DB_GENERAL_PROCESSING_DATA_DATES.name]
del self.headers[CSV_Cols.TARGET_DB_GENERAL_PROCESSING_DATA_DATES.name]
elif par.processing_val == FCR_Processings.BETTER_ITEM_DVD.name:
self.headers[CSV_Cols.TARGET_DB_HAS_ITEMS.name] = self.csv_cols[CSV_Cols.TARGET_DB_HAS_ITEMS.name][par.lang]
del self.headers[CSV_Cols.ORIGIN_DB_GENERAL_PROCESSING_DATA_DATES.name]
del self.headers[CSV_Cols.TARGET_DB_GENERAL_PROCESSING_DATA_DATES.name]
# Order columns by their index
self.headers_ordered = sorted(self.headers.keys(), key=lambda x: CSV_Cols[x].value)
# Columns from the original file
for col in original_file_cols:
self.headers[col] = col
self.headers_ordered.append(col)
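# For reference, the CSV_OUTPUT_JSON_CONFIG file read in create_file is expected to map
# each CSV_Cols member name to its localised header, e.g. (illustrative values):
#   {"ERROR": {"eng": "Error", "fre": "Erreur"}, "ERROR_MSG": {"eng": "...", "fre": "..."}}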
def write_line(self, rec, success):
"""Write this record line to the CSV file"""
self.writer.writerow(rec.output.to_csv())
if success:
msg = "SUCCESSFULLY processed"
else:
msg = "FAILED to process"
self.parent.log.info(f"{msg} line input query = \"{rec.input_query}\", origin database ID = \"{rec.original_uid}\"")
def close_file(self):
"""Closes the CSV file"""
self.file.close()
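# A minimal usage sketch of Execution_Settings (assuming a json_configs folder and a .env
# file next to the script; only methods defined above are used):
#   es = Execution_Settings(os.path.dirname(os.path.abspath(__file__)))
#   es.load_env_values()
#   es.generate_files_path()
#   es.init_logger()
#   es.csv.create_file(original_file_cols=["input_col_1", "input_col_2"])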
# -------------------- DATABASE RECORD (DBR) --------------------
class Database_Record(object):
"""Contains extracted data from the record.
The data property contains every mapped data for the chosen processing"""
def __init__(self, processing: FCR_Processings, record: ET.ElementTree | dict | pymarc.record.Record, database: Databases, is_target_db: bool, es: Execution_Settings):
self.processing = processing
self.record = record
self.database = database
self.is_target_db = is_target_db
self.ude = Universal_Data_Extractor(self.record, self.database, self.is_target_db, es)
self.data = {}
for data in processing.value:
if (
(processing.value[data] == FCR_Processing_Data_Target.BOTH)
or (self.is_target_db and processing.value[data] == FCR_Processing_Data_Target.TARGET)
or (not self.is_target_db and processing.value[data] == FCR_Processing_Data_Target.ORIGIN)
):
if data in self.database.value:
#temp
filter_value = ""
if self.database.value[data] == "RCR":
filter_value = es.rcr
if self.database.value[data] == "ILN":
filter_value = es.iln
#temp
self.data[data] = self.ude.get_by_mapped_field_name(data, filter_value)
else:
self.data[data] = self.ude.get_by_mapped_field_name(data)
self.chosen_analysis = es.chosen_analysis
self.chosen_analysis_checks = es.chosen_analysis_checks
self.utils = self.Utils(self)
def __compare_titles(self, compared_to):
"""Compares the titles and sets their keys"""
self.compared_title_key = compared_to.utils.get_first_title_as_string()
self.title_key = self.utils.get_first_title_as_string()
self.title_ratio = fuzz.ratio(self.title_key, self.compared_title_key)
self.title_partial_ratio = fuzz.partial_ratio(self.title_key, self.compared_title_key)
self.title_token_sort_ratio = fuzz.token_sort_ratio(self.title_key, self.compared_title_key)
self.title_token_set_ratio = fuzz.token_set_ratio(self.title_key, self.compared_title_key)
def __compare_dates(self, compared_to) -> None:
"""Compares if one of the record dates matches on one of the compared record."""
self.dates_matched = False
# Merge dates lists
this_dates = []
for dates in self.data[FCR_Mapped_Fields.GENERAL_PROCESSING_DATA_DATES]:
this_dates.extend(dates)
compared_dates = []
for dates in compared_to.data[FCR_Mapped_Fields.GENERAL_PROCESSING_DATA_DATES]:
compared_dates.extend(dates)
for date in this_dates:
if date in compared_dates and date != " ": # excludes empty dates
self.dates_matched = True
return
def __compare_publishers(self, compared_to) -> None:
"""Compares every publishers in this record with every publishers in comapred record"""
self.publishers_score = -1
self.chosen_publisher = ""
self.chosen_compared_publisher = ""
# If either record has no publisher, the comparison can't be done
if len(self.data[FCR_Mapped_Fields.PUBLISHERS_NAME]) == 0 or len(compared_to.data[FCR_Mapped_Fields.PUBLISHERS_NAME]) == 0:
return
for publisher in self.data[FCR_Mapped_Fields.PUBLISHERS_NAME]:
publisher = fcf.clean_publisher(publisher)
for compared_publisher in compared_to.data[FCR_Mapped_Fields.PUBLISHERS_NAME]:
compared_publisher = fcf.clean_publisher(compared_publisher)
ratio = fuzz.ratio(publisher, compared_publisher)
if ratio > self.publishers_score:
self.publishers_score = ratio
self.chosen_publisher = publisher
self.chosen_compared_publisher = compared_publisher
def __compare_other_db_id(self, compared_to):
"""Checks if this record id is in the comapred other database IDs"""
self.local_id_in_compared_record = Other_Database_Id_In_Target.UNKNOWN
self.list_of_other_db_id = self.data[FCR_Mapped_Fields.OTHER_DB_ID]
self.nb_other_db_id = len(self.list_of_other_db_id)
id = fcf.list_as_string(compared_to.data[FCR_Mapped_Fields.ID])
if self.nb_other_db_id == 0:
self.local_id_in_compared_record = Other_Database_Id_In_Target.NO_OTHER_DB_ID
elif self.nb_other_db_id == 1 and id in self.list_of_other_db_id:
self.local_id_in_compared_record = Other_Database_Id_In_Target.ONLY_THIS_OTHER_DB_ID
elif self.nb_other_db_id > 1 and id in self.list_of_other_db_id:
self.local_id_in_compared_record = Other_Database_Id_In_Target.THIS_ID_INCLUDED
elif id not in self.list_of_other_db_id:
self.local_id_in_compared_record = Other_Database_Id_In_Target.THIS_ID_NOT_INCLUDED
def __analysis_check_title(self):
self.check_title_nb_valids = 0
# for each matching score, checks if it's high enough
title_score_list = [
self.title_ratio,
self.title_partial_ratio,
self.title_token_sort_ratio,
self.title_token_set_ratio
]
for title_score in title_score_list:
if title_score >= self.chosen_analysis["TITLE_MIN_SCORE"]:
self.check_title_nb_valids += 1
self.checks[Analysis_Checks.TITLE] = (self.check_title_nb_valids >= self.chosen_analysis["NB_TITLE_OK"])
def __analysis_checks(self, check):
"""Launches the check for the provided analysis"""
# Titles
if check == Analysis_Checks.TITLE:
self.__analysis_check_title()
# Publishers
elif check == Analysis_Checks.PUBLISHER:
self.checks[Analysis_Checks.PUBLISHER] = (self.publishers_score >= self.chosen_analysis["PUBLISHER_MIN_SCORE"])
# Dates
elif check == Analysis_Checks.DATE:
self.checks[Analysis_Checks.DATE] = self.dates_matched
def __finalize_analysis(self):
"""Summarizes all checks"""
self.total_checks = Analysis_Final_Results.UNKNOWN
self.passed_check_nb = 0
self.checks = {}
for check in Analysis_Checks:
self.checks[check] = None
if len(self.chosen_analysis_checks) == 0:
self.total_checks = Analysis_Final_Results.NO_CHECK
else:
for check in self.chosen_analysis_checks:
self.__analysis_checks(check)
if self.checks[check] == True:
self.passed_check_nb += 1
if self.passed_check_nb == len(self.chosen_analysis_checks):
self.total_checks = Analysis_Final_Results.TOTAL_MATCH
elif self.passed_check_nb > 0:
self.total_checks = Analysis_Final_Results.PARTIAL_MATCH
else:
self.total_checks = Analysis_Final_Results.NO_MATCH
def compare_to(self, compared_to):
"""Execute the analysis processs
Takes as argument:
- compared_to {Database_Record instance} : the record from origin database"""
self.__compare_titles(compared_to)
self.__compare_dates(compared_to)
self.__compare_publishers(compared_to)
self.__compare_other_db_id(compared_to)
self.__finalize_analysis()
# --- Utils methods for other classes / functions ---
class Utils:
def __init__(self, parent) -> None:
self.parent = parent
self.data: dict = self.parent.data
def get_id(self) -> str:
"""Returns the record ID as a string"""
return fcf.list_as_string(self.data[FCR_Mapped_Fields.ID])
def get_first_title_as_string(self) -> str:
"""Returns the first title cleaned up as a strin"""
return fcf.nettoie_titre(fcf.list_as_string(self.data[FCR_Mapped_Fields.TITLE][0]))
def get_titles_as_string(self) -> str:
"""Returns all titles cleaned up as a str"""
return fcf.nettoie_titre(fcf.list_as_string(self.data[FCR_Mapped_Fields.TITLE]))
def get_authors_as_string(self) -> str:
"""Returns all authors cleaned up as a str"""
return fcf.nettoie_titre(fcf.list_as_string(self.data[FCR_Mapped_Fields.AUTHORS]))
def get_all_publishers_as_string(self) -> str:
"""Returns all authors cleaned up as a str"""
return fcf.clean_publisher(fcf.list_as_string(self.data[FCR_Mapped_Fields.PUBLISHERS_NAME]))
def get_all_publication_dates(self) -> Tuple[List[int], int, int]:
"""Returns a tuple :
- all publication dates as a list of int
- the oldest date as a int (None if no date)
- the newest date as a int (None if no date)"""
dates = []
for date_str in self.data[FCR_Mapped_Fields.PUBLICATION_DATES]:
dates += fcf.get_year(date_str)
# Convert every date to an int
dates = [int(date) for date in dates]
if dates == []:
return dates, None, None
return dates, min(dates), max(dates)
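# Illustrative comparison flow (a sketch; the record objects, Databases member and
# Execution_Settings instance come from the caller, not from this file):
#   origin = Database_Record(es.processing, origin_record, origin_db, False, es)
#   target = Database_Record(es.processing, target_record, target_db, True, es)
#   target.compare_to(origin)
#   target.total_checks   # Analysis_Final_Results member summarising the checks
#   target.title_ratio    # fuzzywuzzy ratio between the two title keys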
# -------------------- MATCH RECORDS (MR) --------------------
class Request_Try(object):
""""""
def __init__(self, try_nb: int, action: Actions):
self.try_nb = try_nb
self.action = action
self.status = Try_Status.UNKNWON
self.error_type = None
self.msg = None
self.query = None
self.returned_ids = []
self.returned_records = []
self.includes_records = False
def error_occured(self, msg: Match_Records_Errors | str):
if type(msg) == Match_Records_Errors:
self.error_type = msg
self.msg = Match_Records_Error_Messages[self.error_type.name]
else:
self.error_type = Match_Records_Errors.GENERIC_ERROR
self.msg = msg
self.status = Try_Status.ERROR
def define_special_status(self, status: Try_Status, msg: str):
self.msg = msg
if type(status) != Try_Status:
return
self.status = status
def define_used_query(self, query: str):
self.query = query
def add_returned_ids(self, ids: list):
self.returned_ids = ids
self.status = Try_Status.SUCCESS
def add_returned_records(self, records: list):
self.returned_records = records
self.includes_records = True
class Matched_Records(object):
"""
Takes as argument :
- operation {Operations Instance} (defaults to SEARCH_IN_SUDOC_BY_ISBN)"""
def __init__(self, operation: Operations, query: str, local_record:Database_Record, es: Execution_Settings):
self.error = None
self.error_msg = None
self.tries = []
self.returned_ids = []
self.returned_records = []
self.includes_record = False
# Get the operation, defaulting to SEARCH_IN_SUDOC_BY_ISBN
self.operation = Operations.SEARCH_IN_SUDOC_BY_ISBN
if type(operation) == Operations:
self.operation = operation
self.query = query
self.local_record = local_record
self.es = es
# Calls the operation
self.execute_operation()
last_try:Request_Try = self.tries[-1]
self.query = last_try.query
self.action = last_try.action
def execute_operation(self):
"""Searches in the Sudoc with 3 possibles tries :
- isbn2ppn
- if failed, isbn2ppn after ISBN conversion
- if failed again, Sudoc SRU on ISB index
Requires match_records query to be an ISBN"""
for index, action in enumerate(Try_Operations[self.operation.name].value):
thisTry = Request_Try(index, action)
self.request_action(action, thisTry)
self.tries.append(thisTry)
# If matched ids were returned, break the loop as we have our results
if thisTry.returned_ids != []:
self.returned_ids = thisTry.returned_ids
if thisTry.includes_records:
self.returned_records = thisTry.returned_records
self.includes_record = True
break
# Checks if results were found
if self.returned_ids == []:
self.error = Match_Records_Errors.NOTHING_WAS_FOUND
self.error_msg = Match_Records_Error_Messages[self.error.name]
def request_action(self, action: Actions, thisTry: Request_Try):
"""Makes the request for this specific action and returns a list of IDs as a result"""
# Actions based on the same connector are similar, do not forget to update all of them
# Action SRU Sudoc ISBN
if action == Actions.SRU_SUDOC_ISBN:
sru = ssru.Sudoc_SRU()
sru_request = ssru.Part_Of_Query(
ssru.SRU_Indexes.ISB,
ssru.SRU_Relations.EQUALS,
self.query
)
thisTry.define_used_query(sru.generate_query([sru_request]))
res = sru.search(
thisTry.query,
record_schema=ssru.SRU_Record_Schemas.UNIMARC,
record_packing=ssru.SRU_Record_Packings.XML,
maximum_records=100,
start_record=1
)
if (res.status == "Error"):
thisTry.error_occured(res.get_error_msg())
else:
thisTry.add_returned_ids(res.get_records_id())
thisTry.add_returned_records(res.get_records())
# Action isbn2ppn
elif action == Actions.ISBN2PPN:
i2p = id2ppn.Abes_id2ppn(webservice=id2ppn.Webservice.ISBN, useJson=True)
res = i2p.get_matching_ppn(self.query)
thisTry.define_used_query(res.get_id_used())
if res.status != id2ppn.Id2ppn_Status.SUCCESS:
thisTry.error_occured(res.get_error_msg())
else:
thisTry.add_returned_ids(res.get_results(merge=True))
# Action isbn2ppn with changed ISBN
elif action == Actions.ISBN2PPN_MODIFIED_ISBN:
#AR226
# Converting the ISBN to 10<->13
if len(self.query) == 13:
new_query = self.query[3:-1]
new_query += id2ppn.compute_isbn_10_check_digit(list(str(new_query)))
elif len(self.query) == 10:
# Doesn't consider 979[...] as the original issue should only concern old ISBN
new_query = "978" + self.query[:-1]
new_query += id2ppn.compute_isbn_13_check_digit(list(str(new_query)))
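# Worked illustration (hypothetical input): a 13-digit query "9782070368228" keeps
# "207036822" and gets the recomputed ISBN-10 check digit "X" -> "207036822X";
# a 10-digit "207036822X" becomes "978207036822" plus the ISBN-13 check digit "8".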
# Same thing as Action ISBN2PPN, but with the converted ISBN
i2p = id2ppn.Abes_id2ppn(useJson=True)
res = i2p.get_matching_ppn(new_query)
thisTry.define_used_query(res.get_id_used())
if res.status != id2ppn.Id2ppn_Status.SUCCESS:
thisTry.error_occured(res.get_error_msg())
else:
thisTry.add_returned_ids(res.get_results(merge=True))
# Action ean2ppn
elif action == Actions.EAN2PPN:
# Gets the first EAN
ean = ""
for val in self.local_record.data[FCR_Mapped_Fields.EAN]:
if type(val) == str and val != "":
ean = val
break
# No EAN was found, throw an error
if ean == "":
thisTry.error_occured(Match_Records_Errors.NO_EAN_WAS_FOUND)
return
i2p = id2ppn.Abes_id2ppn(webservice=id2ppn.Webservice.EAN, useJson=True)
res = i2p.get_matching_ppn(ean)
thisTry.define_used_query(ean)
if res.status != id2ppn.Id2ppn_Status.SUCCESS:
thisTry.error_occured(res.get_error_msg())
else:
thisTry.add_returned_ids(res.get_results(merge=True))
# Action SRU Sudoc MTI title AUT author EDI publisher APu date TDO v
elif action == Actions.SRU_SUDOC_MTI_AUT_EDI_APU_TDO_V:
sru = ssru.Sudoc_SRU()
# Extract data
title = fcf.delete_for_sudoc(self.local_record.utils.get_titles_as_string())
author = fcf.delete_for_sudoc(self.local_record.utils.get_authors_as_string())
publisher = fcf.delete_for_sudoc(self.local_record.utils.get_all_publishers_as_string())
dates, oldest_date, newest_date = self.local_record.utils.get_all_publication_dates()
# Ensure no data is Empty
if title.strip() == "" or author.strip() == "" or publisher.strip() == "" or len(dates) < 1:
thisTry.error_occured(Match_Records_Errors.REQUIRED_DATA_MISSING)
return
# Generate query
sru_request = [
ssru.Part_Of_Query(
ssru.SRU_Indexes.MTI,
ssru.SRU_Relations.EQUALS,
title,
ssru.SRU_Boolean_Operators.AND
),
ssru.Part_Of_Query(
ssru.SRU_Indexes.AUT,
ssru.SRU_Relations.EQUALS,
author,
ssru.SRU_Boolean_Operators.AND
),
ssru.Part_Of_Query(
ssru.SRU_Indexes.EDI,
ssru.SRU_Relations.EQUALS,
publisher,
ssru.SRU_Boolean_Operators.AND
),
ssru.Part_Of_Query(
ssru.SRU_Filters.APU,
ssru.SRU_Relations.SUPERIOR_OR_EQUAL,
oldest_date,
ssru.SRU_Boolean_Operators.AND
),
ssru.Part_Of_Query(
ssru.SRU_Filters.APU,
ssru.SRU_Relations.INFERIOR_OR_EQUAL,
newest_date,
ssru.SRU_Boolean_Operators.AND
),
ssru.Part_Of_Query(
ssru.SRU_Filters.TDO,
ssru.SRU_Relations.EQUALS,
ssru.SRU_Filter_TDO.V,
ssru.SRU_Boolean_Operators.AND
)
]
thisTry.define_used_query(sru.generate_query(sru_request))
res = sru.search(
thisTry.query,
record_schema=ssru.SRU_Record_Schemas.UNIMARC,
record_packing=ssru.SRU_Record_Packings.XML,
maximum_records=100,
start_record=1
)
if (res.status == "Error"):
thisTry.error_occured(res.get_error_msg())
else:
thisTry.add_returned_ids(res.get_records_id())
thisTry.add_returned_records(res.get_records())
# Action SRU Sudoc MTI title AUT author APu date TDO v
elif action == Actions.SRU_SUDOC_MTI_AUT_APU_TDO_V:
sru = ssru.Sudoc_SRU()
# Extract data
title = fcf.delete_for_sudoc(self.local_record.utils.get_titles_as_string())
author = fcf.delete_for_sudoc(self.local_record.utils.get_authors_as_string())
dates, oldest_date, newest_date = self.local_record.utils.get_all_publication_dates()
# Ensure no data is Empty
if title.strip() == "" or author.strip() == "" or len(dates) < 1:
thisTry.error_occured(Match_Records_Errors.REQUIRED_DATA_MISSING)
return
# Generate query
sru_request = [
ssru.Part_Of_Query(
ssru.SRU_Indexes.MTI,
ssru.SRU_Relations.EQUALS,
title,
ssru.SRU_Boolean_Operators.AND
),
ssru.Part_Of_Query(
ssru.SRU_Indexes.AUT,
ssru.SRU_Relations.EQUALS,
author,
ssru.SRU_Boolean_Operators.AND
),
ssru.Part_Of_Query(
ssru.SRU_Filters.APU,
ssru.SRU_Relations.SUPERIOR_OR_EQUAL,
oldest_date,
ssru.SRU_Boolean_Operators.AND
),
ssru.Part_Of_Query(
ssru.SRU_Filters.APU,
ssru.SRU_Relations.INFERIOR_OR_EQUAL,
newest_date,
ssru.SRU_Boolean_Operators.AND
),
ssru.Part_Of_Query(
ssru.SRU_Filters.TDO,
ssru.SRU_Relations.EQUALS,
ssru.SRU_Filter_TDO.V,
ssru.SRU_Boolean_Operators.AND
)
]
thisTry.define_used_query(sru.generate_query(sru_request))
res = sru.search(
thisTry.query,
record_schema=ssru.SRU_Record_Schemas.UNIMARC,
record_packing=ssru.SRU_Record_Packings.XML,
maximum_records=100,
start_record=1
)
if (res.status == "Error"):
thisTry.error_occured(res.get_error_msg())
else:
thisTry.add_returned_ids(res.get_records_id())
thisTry.add_returned_records(res.get_records())
# Action SRU Sudoc TOU title + author + publisher + date TDO v
elif action == Actions.SRU_SUDOC_TOU_TITLE_AUTHOR_PUBLISHER_DATE_TDO_V:
sru = ssru.Sudoc_SRU()
# Extract data
title = fcf.delete_for_sudoc(self.local_record.utils.get_titles_as_string())
author = fcf.delete_for_sudoc(self.local_record.utils.get_authors_as_string())
publisher = fcf.delete_for_sudoc(self.local_record.utils.get_all_publishers_as_string())
dates, oldest_date, newest_date = self.local_record.utils.get_all_publication_dates()
# Ensure no data is Empty
if title.strip() == "" or author.strip() == "" or publisher.strip() == "" or len(dates) < 1:
thisTry.error_occured(Match_Records_Errors.REQUIRED_DATA_MISSING)
return
# Generate query
sru_request = [
ssru.Part_Of_Query(
ssru.SRU_Indexes.TOU,
ssru.SRU_Relations.EQUALS,
fcf.delete_duplicate_words(" ".join([title, author, publisher])),
ssru.SRU_Boolean_Operators.AND
),
f" AND (tou={' or tou='.join([str(num) for num in dates])})",
ssru.Part_Of_Query(
ssru.SRU_Filters.TDO,
ssru.SRU_Relations.EQUALS,
ssru.SRU_Filter_TDO.V,
ssru.SRU_Boolean_Operators.AND
)
]
thisTry.define_used_query(sru.generate_query(sru_request))
res = sru.search(
thisTry.query,
record_schema=ssru.SRU_Record_Schemas.UNIMARC,