Merge pull request #10885 from archesproject/jtw/json-ld-zip-import
Add JSON-LD bulk import module #10798
apeters authored Jun 13, 2024
2 parents e21b95a + ae4a8de commit 1fb1607
Showing 26 changed files with 1,503 additions and 268 deletions.
1 change: 1 addition & 0 deletions .git-blame-ignore-revs
@@ -2,6 +2,7 @@
6d519d00e752448362b880b73c3641a562ee9ee3
eea27d316c5ff975556e15ccd4c11d7c393dd95c
8de09526329685ee4d80c816c3475a66b19d46e9
9740c846eda0cc067f49d7abffdcb1a93fe3e0a9
# Relevant subset of git log --grep black
9b7e30d4499cb02dec6d17c7c99f9a7087fcdd47
b959f1139a1d789e6c116d43d8be7daa0baa6075
151 changes: 94 additions & 57 deletions arches/app/etl_modules/base_import_module.py
@@ -13,6 +13,8 @@
from django.utils.decorators import method_decorator
from django.db import connection

from arches.app.datatypes.datatypes import DataTypeFactory
from arches.app.etl_modules.decorators import load_data_async
from arches.app.etl_modules.save import save_to_tiles
from arches.app.models.models import Node
from arches.app.models.system_settings import settings
@@ -26,11 +28,20 @@


class BaseImportModule:
def __init__(self, loadid=None):
self.moduleid = None
self.fileid = None
def __init__(
self, loadid=None, request=None, userid=None, moduleid=None, fileid=None
):
self.request = request
self.userid = userid
self.moduleid = moduleid
self.fileid = fileid
self.loadid = loadid
self.legacyid_lookup = {}
self.datatype_factory = DataTypeFactory()

if self.request:
self.userid = request.user.id
self.moduleid = request.POST.get("module")

def filesize_format(self, bytes):
"""Convert bytes to readable units"""
@@ -122,13 +133,21 @@ def get_graph_tree(self, graphid):
str(row[1]): {"depth": int(row[5]), "cardinality": row[7]}
for row in rows
}
nodes = Node.objects.filter(graph_id=graphid)
nodes = Node.objects.filter(graph_id=graphid).select_related("nodegroup")
for node in nodes:
nodeid = str(node.nodeid)
if nodeid in node_lookup:
node_lookup[nodeid]["alias"] = node.alias
node_lookup[nodeid]["datatype"] = node.datatype
node_lookup[nodeid]["config"] = node.config
elif not node.istopnode:
node_lookup[nodeid] = {
"depth": 0, # ???
"cardinality": node.nodegroup.cardinality,
"alias": node.alias,
"datatype": node.datatype,
"config": node.config,
}
return node_lookup, nodes

def get_parent_tileid(
@@ -179,45 +198,67 @@ def get_node_lookup(self, nodes):
return lookup

def run_load_task(self, userid, files, summary, result, temp_dir, loadid):
with connection.cursor() as cursor:
for file in files.keys():
self.stage_excel_file(file, summary, cursor)
cursor.execute(
"""CALL __arches_check_tile_cardinality_violation_for_load(%s)""",
[loadid],
)
cursor.execute(
"""
INSERT INTO load_errors (type, source, error, loadid, nodegroupid)
SELECT 'tile', source_description, error_message, loadid, nodegroupid
FROM load_staging
WHERE loadid = %s AND passes_validation = false AND error_message IS NOT null
""",
[loadid],
)
result["validation"] = self.validate(loadid)
if len(result["validation"]["data"]) == 0:
self.loadid = loadid # currently redundant, but be certain
save_to_tiles(userid, loadid)
cursor.execute(
"""CALL __arches_update_resource_x_resource_with_graphids();"""
)
cursor.execute("""SELECT __arches_refresh_spatial_views();""")
refresh_successful = cursor.fetchone()[0]
if not refresh_successful:
raise Exception("Unable to refresh spatial views")
else:
cursor.execute(
"""UPDATE load_event SET status = %s, load_end_time = %s WHERE loadid = %s""",
("failed", datetime.now(), loadid),
)
self.delete_from_default_storage(temp_dir)
self.loadid = loadid # currently redundant, but be certain
try:
with connection.cursor() as cursor:
self.stage_files(files, summary, cursor)
self.check_tile_cardinality(cursor)
result["validation"] = self.validate(loadid)
if len(result["validation"]["data"]) == 0:
self.save_to_tiles(cursor, userid, loadid)
cursor.execute(
"""CALL __arches_update_resource_x_resource_with_graphids();"""
)
cursor.execute("""SELECT __arches_refresh_spatial_views();""")
refresh_successful = cursor.fetchone()[0]
if not refresh_successful:
raise Exception("Unable to refresh spatial views")
else:
cursor.execute(
"""UPDATE load_event SET status = %s, load_end_time = %s WHERE loadid = %s""",
("failed", datetime.now(), loadid),
)
finally:
self.delete_from_default_storage(temp_dir)
result["summary"] = summary
return {"success": result["validation"]["success"], "data": result}

def validate_uploaded_file(self, file, kwarg):
@load_data_async
def run_load_task_async(self, request):
raise NotImplementedError

def prepare_temp_dir(self, request):
self.loadid = request.POST.get("load_id")
self.temp_dir = os.path.join(settings.UPLOADED_FILES_DIR, "tmp", self.loadid)
try:
self.delete_from_default_storage(self.temp_dir)
except FileNotFoundError:
pass

def validate_uploaded_file(self, file):
pass

def stage_files(self, files, summary, cursor):
raise NotImplementedError

def check_tile_cardinality(self, cursor):
cursor.execute(
"""CALL __arches_check_tile_cardinality_violation_for_load(%s)""",
[self.loadid],
)
cursor.execute(
"""
INSERT INTO load_errors (type, source, error, loadid, nodegroupid)
SELECT 'tile', source_description, error_message, loadid, nodegroupid
FROM load_staging
WHERE loadid = %s AND passes_validation = false AND error_message IS NOT null
""",
[self.loadid],
)

def save_to_tiles(self, cursor, userid, loadid):
return save_to_tiles(userid, loadid)

### Actions ###

def validate(self, loadid):
@@ -233,14 +274,10 @@ def validate(self, loadid):
return {"success": success, "data": row}

def read(self, request):
self.loadid = request.POST.get("load_id")
self.cumulative_excel_files_size = 0
self.prepare_temp_dir(request)
self.cumulative_files_size = 0
content = request.FILES["file"]
self.temp_dir = os.path.join(settings.UPLOADED_FILES_DIR, "tmp", self.loadid)
try:
self.delete_from_default_storage(self.temp_dir)
except FileNotFoundError:
pass

result = {
"summary": {
"name": content.name,
@@ -249,39 +286,39 @@ def read(self, request):
}
}
validator = FileValidator()
if len(validator.validate_file_type(content)) > 0:
extension = content.name.split(".")[-1] or None
if len(validator.validate_file_type(content, extension=extension)) > 0:
return {
"status": 400,
"success": False,
"title": _("Invalid excel file/zip specified"),
"message": _("Upload a valid excel file"),
"data": FileValidationError(
message=_("Upload a valid excel file"),
code=400,
),
}
if content.name.split(".")[-1].lower() == "zip":
with zipfile.ZipFile(content, "r") as zip_ref:
files = zip_ref.infolist()
for file in files:
if file.filename.split(".")[-1] == "xlsx":
self.cumulative_excel_files_size += file.file_size
self.cumulative_files_size += file.file_size
if not file.filename.startswith("__MACOSX"):
if not file.is_dir():
result["summary"]["files"][file.filename] = {
"size": (self.filesize_format(file.file_size))
}
result["summary"][
"cumulative_excel_files_size"
] = self.cumulative_excel_files_size
"cumulative_files_size"
] = self.cumulative_files_size
default_storage.save(
os.path.join(self.temp_dir, file.filename),
File(zip_ref.open(file)),
)
elif content.name.split(".")[-1] == "xlsx":
self.cumulative_excel_files_size += content.size
self.cumulative_files_size += content.size
result["summary"]["files"][content.name] = {
"size": (self.filesize_format(content.size))
}
result["summary"][
"cumulative_excel_files_size"
] = self.cumulative_excel_files_size
result["summary"]["cumulative_files_size"] = self.cumulative_files_size
default_storage.save(
os.path.join(self.temp_dir, content.name), File(content)
)
@@ -340,7 +377,7 @@ def write(self, request):
summary = details["result"]["summary"]
use_celery_file_size_threshold_in_MB = 0.1
if (
summary["cumulative_excel_files_size"] / 1000000
summary["cumulative_files_size"] / 1000000
> use_celery_file_size_threshold_in_MB
):
response = self.run_load_task_async(request, self.loadid)
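Taken together, the base_import_module.py changes turn run_load_task into a template method: the base class keeps cardinality checking (check_tile_cardinality), validation, the tile save (save_to_tiles), and a try/finally that always cleans up the temp directory, while format-specific staging moves behind the new stage_files and run_load_task_async hooks. A minimal sketch of a concrete importer built on those hooks follows; the class name and the stage_one_file helper are hypothetical, and this is not the JSON-LD module added elsewhere in this commit.

# Hypothetical sketch of a subclass plugging into the new hooks;
# not the JSON-LD importer shipped in this commit.
from arches.app.etl_modules.base_import_module import BaseImportModule


class ExampleZipImporter(BaseImportModule):
    def stage_files(self, files, summary, cursor):
        # run_load_task calls this first; each file should end up as rows in
        # load_staging so check_tile_cardinality() and validate() can inspect it.
        for file in files:
            self.stage_one_file(file, summary, cursor)  # hypothetical helper

    def stage_one_file(self, file, summary, cursor):
        # Format-specific parsing and inserts into load_staging go here.
        raise NotImplementedError

Everything after staging stays in the base class: the cardinality procedure, the load_errors insert, save_to_tiles when validation passes, and the finally-block call to delete_from_default_storage, so a failure during staging no longer leaves the temp directory behind.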
4 changes: 4 additions & 0 deletions arches/app/etl_modules/branch_excel_importer.py
@@ -227,6 +227,10 @@ def get_graphid(self, workbook):
graphid = workbook.get_sheet_by_name("metadata")["B1"].value
return graphid

def stage_files(self, files, summary, cursor):
for file in files:
self.stage_excel_file(file, summary, cursor)

def stage_excel_file(self, file, summary, cursor):
if file.endswith("xlsx"):
summary["files"][file]["worksheets"] = []
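For the Excel importer the new hook is just a thin adapter: stage_files loops over the uploaded files and hands each one to the pre-existing stage_excel_file. With that in place, the call order driven by the refactored base class looks roughly like this (an illustrative trace, not verbatim code from either module):

# BranchExcelImporter.run_load_task(userid, files, summary, result, temp_dir, loadid)
#   -> stage_files(files, summary, cursor)            # override added above
#        -> stage_excel_file(file, summary, cursor)   # pre-existing worksheet staging
#   -> check_tile_cardinality(cursor)
#   -> validate(loadid)
#   -> save_to_tiles(cursor, userid, loadid)          # only when validation reports no errors
#   -> delete_from_default_storage(temp_dir)          # always, via the finally block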
(The remaining 23 changed files in this commit are not shown here.)
