-
Notifications
You must be signed in to change notification settings - Fork 145
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add JSON-LD bulk import module #10798 #10885
Changes from 38 commits
63ae837
b8ecdea
7f0edaf
f99e6ed
9e1d8d0
7aa8d2d
98c4dd0
e33a342
35b4f1b
1e17e31
f95c51d
f83e77a
f2fbc58
dff6ff0
7438dfe
0bf698b
9f7fe1b
1d7a968
792d7f7
582678d
ab98af3
e8ba32c
09330d3
29dab69
bba8893
95b0bc5
ed4dbd8
a30d8ca
bff642a
01251eb
5c96ae6
eada4a7
d5a3aac
2cf5529
0d1da51
ffb1fa3
f20eb5b
993b22f
1cffb67
0ccfeae
b726477
d562da3
46ae5a4
d7c1f60
e6eae30
9740c84
ae4a8de
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,8 @@ | |
from django.utils.decorators import method_decorator | ||
from django.db import connection | ||
|
||
from arches.app.datatypes.datatypes import DataTypeFactory | ||
from arches.app.etl_modules.decorators import load_data_async | ||
from arches.app.etl_modules.save import save_to_tiles | ||
from arches.app.models.models import Node | ||
from arches.app.models.system_settings import settings | ||
|
@@ -26,11 +28,18 @@ | |
|
||
|
||
class BaseImportModule: | ||
def __init__(self, loadid=None): | ||
self.moduleid = None | ||
self.fileid = None | ||
def __init__(self, loadid=None, request=None, userid=None, moduleid=None, fileid=None): | ||
self.request = request | ||
self.userid = userid | ||
self.moduleid = moduleid | ||
self.fileid = fileid | ||
self.loadid = loadid | ||
self.legacyid_lookup = {} | ||
self.datatype_factory = DataTypeFactory() | ||
|
||
if self.request: | ||
self.userid = request.user.id | ||
self.moduleid = request.POST.get("module") | ||
|
||
def filesize_format(self, bytes): | ||
"""Convert bytes to readable units""" | ||
|
@@ -107,13 +116,21 @@ def get_graph_tree(self, graphid): | |
cursor.execute("""SELECT * FROM __get_nodegroup_tree_by_graph(%s)""", (graphid,)) | ||
rows = cursor.fetchall() | ||
node_lookup = {str(row[1]): {"depth": int(row[5]), "cardinality": row[7]} for row in rows} | ||
nodes = Node.objects.filter(graph_id=graphid) | ||
nodes = Node.objects.filter(graph_id=graphid).select_related("nodegroup") | ||
for node in nodes: | ||
nodeid = str(node.nodeid) | ||
if nodeid in node_lookup: | ||
node_lookup[nodeid]["alias"] = node.alias | ||
node_lookup[nodeid]["datatype"] = node.datatype | ||
node_lookup[nodeid]["config"] = node.config | ||
elif not node.istopnode: | ||
node_lookup[nodeid] = { | ||
"depth": 0, # ??? | ||
"cardinality": node.nodegroup.cardinality, | ||
"alias": node.alias, | ||
"datatype": node.datatype, | ||
"config": node.config, | ||
} | ||
return node_lookup, nodes | ||
|
||
def get_parent_tileid(self, depth, tileid, previous_tile, nodegroup, nodegroup_tile_lookup): | ||
|
@@ -158,40 +175,66 @@ def get_node_lookup(self, nodes): | |
return lookup | ||
|
||
def run_load_task(self, userid, files, summary, result, temp_dir, loadid): | ||
with connection.cursor() as cursor: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I factored out stage_files() and check_tile_cardinality(), and then wrapped what remained in a try/finally so that temp files were always cleaned up. |
||
for file in files.keys(): | ||
self.stage_excel_file(file, summary, cursor) | ||
cursor.execute("""CALL __arches_check_tile_cardinality_violation_for_load(%s)""", [loadid]) | ||
cursor.execute( | ||
""" | ||
INSERT INTO load_errors (type, source, error, loadid, nodegroupid) | ||
SELECT 'tile', source_description, error_message, loadid, nodegroupid | ||
FROM load_staging | ||
WHERE loadid = %s AND passes_validation = false AND error_message IS NOT null | ||
""", | ||
[loadid], | ||
) | ||
result["validation"] = self.validate(loadid) | ||
if len(result["validation"]["data"]) == 0: | ||
self.loadid = loadid # currently redundant, but be certain | ||
save_to_tiles(userid, loadid) | ||
cursor.execute("""CALL __arches_update_resource_x_resource_with_graphids();""") | ||
cursor.execute("""SELECT __arches_refresh_spatial_views();""") | ||
refresh_successful = cursor.fetchone()[0] | ||
if not refresh_successful: | ||
raise Exception('Unable to refresh spatial views') | ||
else: | ||
cursor.execute( | ||
"""UPDATE load_event SET status = %s, load_end_time = %s WHERE loadid = %s""", | ||
("failed", datetime.now(), loadid), | ||
) | ||
self.delete_from_default_storage(temp_dir) | ||
self.loadid = loadid # currently redundant, but be certain | ||
try: | ||
with connection.cursor() as cursor: | ||
self.stage_files(files, summary, cursor) | ||
self.check_tile_cardinality(cursor) | ||
result["validation"] = self.validate(loadid) | ||
if len(result["validation"]["data"]) == 0: | ||
self.save_to_tiles(cursor, userid, loadid) | ||
cursor.execute("""CALL __arches_update_resource_x_resource_with_graphids();""") | ||
cursor.execute("""SELECT __arches_refresh_spatial_views();""") | ||
refresh_successful = cursor.fetchone()[0] | ||
if not refresh_successful: | ||
raise Exception('Unable to refresh spatial views') | ||
else: | ||
cursor.execute( | ||
"""UPDATE load_event SET status = %s, load_end_time = %s WHERE loadid = %s""", | ||
("failed", datetime.now(), loadid), | ||
) | ||
finally: | ||
self.delete_from_default_storage(temp_dir) | ||
result["summary"] = summary | ||
return {"success": result["validation"]["success"], "data": result} | ||
|
||
def validate_uploaded_file(self, file, kwarg): | ||
@load_data_async | ||
def run_load_task_async(self, request): | ||
raise NotImplementedError | ||
|
||
def prepare_temp_dir(self, request): | ||
self.loadid = request.POST.get("load_id") | ||
self.temp_dir = os.path.join(settings.UPLOADED_FILES_DIR, "tmp", self.loadid) | ||
try: | ||
self.delete_from_default_storage(self.temp_dir) | ||
except FileNotFoundError: | ||
pass | ||
|
||
def validate_uploaded_file(self, file): | ||
pass | ||
|
||
def stage_files(self, files, summary, cursor): | ||
for file in files: | ||
self.stage_excel_file(file, summary, cursor) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In future work, we could move this to the excel importers and let this base method just be abstract. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems like such a small (unless I'm missing something) but nice refactor that I think we should just do this refactor now. |
||
|
||
def stage_excel_file(self, file, summary, cursor): | ||
pass | ||
|
||
def check_tile_cardinality(self, cursor): | ||
cursor.execute("""CALL __arches_check_tile_cardinality_violation_for_load(%s)""", [self.loadid]) | ||
cursor.execute( | ||
""" | ||
INSERT INTO load_errors (type, source, error, loadid, nodegroupid) | ||
SELECT 'tile', source_description, error_message, loadid, nodegroupid | ||
FROM load_staging | ||
WHERE loadid = %s AND passes_validation = false AND error_message IS NOT null | ||
""", | ||
[self.loadid], | ||
) | ||
|
||
def save_to_tiles(self, cursor, userid, loadid): | ||
return save_to_tiles(userid, loadid) | ||
|
||
### Actions ### | ||
|
||
def validate(self, loadid): | ||
|
@@ -207,38 +250,36 @@ def validate(self, loadid): | |
return {"success": success, "data": row} | ||
|
||
def read(self, request): | ||
self.loadid = request.POST.get("load_id") | ||
self.cumulative_excel_files_size = 0 | ||
self.prepare_temp_dir(request) | ||
self.cumulative_files_size = 0 | ||
content = request.FILES["file"] | ||
self.temp_dir = os.path.join(settings.UPLOADED_FILES_DIR, "tmp", self.loadid) | ||
try: | ||
self.delete_from_default_storage(self.temp_dir) | ||
except (FileNotFoundError): | ||
pass | ||
|
||
result = {"summary": {"name": content.name, "size": self.filesize_format(content.size), "files": {}}} | ||
validator = FileValidator() | ||
if len(validator.validate_file_type(content)) > 0: | ||
extension = content.name.split(".")[-1] or None | ||
if len(validator.validate_file_type(content, extension=extension)) > 0: | ||
Comment on lines +289 to +290
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Needed to get the extension before the first call to validate_file_type for the body of that method to work correctly. You may want to test with FILE_TYPE_CHECKING = True. |
||
return { | ||
"status": 400, | ||
"success": False, | ||
"title": _("Invalid excel file/zip specified"), | ||
"message": _("Upload a valid excel file"), | ||
"data": FileValidationError( | ||
message=_("Upload a valid excel file"), | ||
code=400, | ||
) | ||
} | ||
if content.name.split(".")[-1].lower() == "zip": | ||
with zipfile.ZipFile(content, "r") as zip_ref: | ||
files = zip_ref.infolist() | ||
for file in files: | ||
if file.filename.split(".")[-1] == "xlsx": | ||
self.cumulative_excel_files_size += file.file_size | ||
self.cumulative_files_size += file.file_size | ||
if not file.filename.startswith("__MACOSX"): | ||
if not file.is_dir(): | ||
result["summary"]["files"][file.filename] = {"size": (self.filesize_format(file.file_size))} | ||
result["summary"]["cumulative_excel_files_size"] = self.cumulative_excel_files_size | ||
result["summary"]["cumulative_files_size"] = self.cumulative_files_size | ||
default_storage.save(os.path.join(self.temp_dir, file.filename), File(zip_ref.open(file))) | ||
elif content.name.split(".")[-1] == "xlsx": | ||
self.cumulative_excel_files_size += content.size | ||
self.cumulative_files_size += content.size | ||
result["summary"]["files"][content.name] = {"size": (self.filesize_format(content.size))} | ||
result["summary"]["cumulative_excel_files_size"] = self.cumulative_excel_files_size | ||
result["summary"]["cumulative_files_size"] = self.cumulative_files_size | ||
default_storage.save(os.path.join(self.temp_dir, content.name), File(content)) | ||
|
||
has_valid_excel_file = False | ||
|
@@ -284,7 +325,7 @@ def write(self, request): | |
files = details["result"]["summary"]["files"] | ||
summary = details["result"]["summary"] | ||
use_celery_file_size_threshold_in_MB = 0.1 | ||
if summary["cumulative_excel_files_size"] / 1000000 > use_celery_file_size_threshold_in_MB: | ||
if summary["cumulative_files_size"] / 1000000 > use_celery_file_size_threshold_in_MB: | ||
response = self.run_load_task_async(request, self.loadid) | ||
else: | ||
response = self.run_load_task(self.userid, files, summary, result, self.temp_dir, self.loadid) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
This is here to work around #10889