
Add JSON-LD bulk import module #10798 (PR #10885)

Merged Jun 13, 2024 (47 commits)

Commits
63ae837 Add migration for JSON-LD import module #10798 (jacobtylerwalls, Apr 30, 2024)
b8ecdea Initial commit of JSON-LD import components (jacobtylerwalls, Apr 30, 2024)
7f0edaf Initial commit of JSON-LD import backend (etl module) #10798 (jacobtylerwalls, Apr 30, 2024)
f99e6ed Populate staging table re #10798 (jacobtylerwalls, May 1, 2024)
9e1d8d0 Decompose populate_staging_table() re #10798 (jacobtylerwalls, May 2, 2024)
7aa8d2d Wrap delete_from_default_storage() in finally re #10798 (jacobtylerwalls, May 2, 2024)
98c4dd0 Remove templates re #10798 (jacobtylerwalls, May 2, 2024)
e33a342 Handle errors from load_jsonld command re #10798 (jacobtylerwalls, May 2, 2024)
35b4f1b Allow JSON uploads re #10798 (jacobtylerwalls, May 2, 2024)
1e17e31 Surface more node information in errors re #10798 (jacobtylerwalls, May 3, 2024)
f95c51d Capture early failures re #10798 (jacobtylerwalls, May 3, 2024)
f83e77a Improve error handling (jacobtylerwalls, May 6, 2024)
f2fbc58 Adjust DS_Store exception logic (jacobtylerwalls, May 6, 2024)
dff6ff0 Various fixes (jacobtylerwalls, May 6, 2024)
7438dfe Fix excel uploads with file type checking enabled (jacobtylerwalls, May 7, 2024)
0bf698b Fix LoadStaging tile value (jacobtylerwalls, May 7, 2024)
9f7fe1b Work around missing nodes in __get_nodegroup_tree (jacobtylerwalls, May 7, 2024)
1d7a968 Allow overwriting resources re #10798 (jacobtylerwalls, May 7, 2024)
792d7f7 Implement run_load_task_async re #10798 (jacobtylerwalls, May 7, 2024)
582678d Decompose save_to_tiles() to allow wrapping inside a transaction. (jacobtylerwalls, May 7, 2024)
ab98af3 Add unit test re #10798 (jacobtylerwalls, May 7, 2024)
e8ba32c Add minor incompatibility notice re #10798 (jacobtylerwalls, May 7, 2024)
09330d3 Workaround test isolation issue re: runtime trigger disables (jacobtylerwalls, May 8, 2024)
29dab69 nit: typo (jacobtylerwalls, May 8, 2024)
bba8893 Remove cheesy test image (for now...) (jacobtylerwalls, May 8, 2024)
95b0bc5 nits re #10798 (jacobtylerwalls, May 8, 2024)
ed4dbd8 Preserve backward compat in BaseImportModule.__init__() (jacobtylerwalls, May 8, 2024)
a30d8ca Merge branch 'dev/7.6.x' into jtw/json-ld-zip-import (apeters, May 9, 2024)
bff642a need to use kwargs to pass request, re #10798 (apeters, May 10, 2024)
01251eb Merge branch 'dev/7.6.x' into jtw/json-ld-zip-import (jacobtylerwalls, May 13, 2024)
5c96ae6 Surface FileValidationError to user (jacobtylerwalls, May 14, 2024)
eada4a7 Fix fallback node (jacobtylerwalls, May 14, 2024)
d5a3aac Surface load errors to UI (jacobtylerwalls, May 14, 2024)
2cf5529 Use file name as source in load errors (jacobtylerwalls, May 14, 2024)
0d1da51 update error messages with better node info, re #10798 (apeters, May 16, 2024)
ffb1fa3 Merge branch 'dev/7.6.x' into jtw/json-ld-zip-import (apeters, May 24, 2024)
f20eb5b Merge branch 'dev/7.6.x' into jtw/json-ld-zip-import (apeters, May 29, 2024)
993b22f Fix migration conflict re #10798 (jacobtylerwalls, May 29, 2024)
1cffb67 Make stage_files() abstract (jacobtylerwalls, Jun 5, 2024)
0ccfeae Avoid directly print()'ing in load_jsonld command (jacobtylerwalls, Jun 5, 2024)
b726477 Override etl_error_report block (jacobtylerwalls, Jun 5, 2024)
d562da3 Fix error message query for early json-ld failures (jacobtylerwalls, Jun 5, 2024)
46ae5a4 Merge branch 'dev/7.6.x' into jtw/json-ld-zip-import (jacobtylerwalls, Jun 5, 2024)
d7c1f60 Avoid manually managing truncation in tests (jacobtylerwalls, Jun 7, 2024)
e6eae30 Merge branch 'dev/7.6.x' into jtw/json-ld-zip-import (jacobtylerwalls, Jun 13, 2024)
9740c84 Blacken prior work (jacobtylerwalls, Jun 13, 2024)
ae4a8de Update git-blame-ignore-revs (jacobtylerwalls, Jun 13, 2024)
1 change: 1 addition & 0 deletions .git-blame-ignore-revs

@@ -2,6 +2,7 @@
 6d519d00e752448362b880b73c3641a562ee9ee3
 eea27d316c5ff975556e15ccd4c11d7c393dd95c
 8de09526329685ee4d80c816c3475a66b19d46e9
+9740c846eda0cc067f49d7abffdcb1a93fe3e0a9
 # Relevant subset of git log --grep black
 9b7e30d4499cb02dec6d17c7c99f9a7087fcdd47
 b959f1139a1d789e6c116d43d8be7daa0baa6075
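The newly listed hash is 9740c84 ("Blacken prior work"), so the Black reformatting commit will not pollute git blame output. For the file to take effect locally, git has to be pointed at it; this is standard git behavior, not specific to this repository:

    git config blame.ignoreRevsFile .git-blame-ignore-revs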
151 changes: 94 additions & 57 deletions arches/app/etl_modules/base_import_module.py

@@ -13,6 +13,8 @@
 from django.utils.decorators import method_decorator
 from django.db import connection

+from arches.app.datatypes.datatypes import DataTypeFactory
+from arches.app.etl_modules.decorators import load_data_async
 from arches.app.etl_modules.save import save_to_tiles
 from arches.app.models.models import Node
 from arches.app.models.system_settings import settings
@@ -26,11 +28,20 @@


 class BaseImportModule:
-    def __init__(self, loadid=None):
-        self.moduleid = None
-        self.fileid = None
+    def __init__(
+        self, loadid=None, request=None, userid=None, moduleid=None, fileid=None
+    ):
+        self.request = request
+        self.userid = userid
+        self.moduleid = moduleid
+        self.fileid = fileid
         self.loadid = loadid
         self.legacyid_lookup = {}
+        self.datatype_factory = DataTypeFactory()
+
+        if self.request:
+            self.userid = request.user.id
+            self.moduleid = request.POST.get("module")
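The widened signature keeps pre-existing call sites working while letting new ones hand everything over via the request. A minimal sketch of the two styles (hypothetical call sites, not code from this PR):

    # Legacy style: only a loadid, as before this PR.
    module = BaseImportModule(loadid="some-load-uuid")

    # New style: a Django request supplies userid and moduleid
    # automatically (request.user.id and request.POST["module"]).
    module = BaseImportModule(loadid="some-load-uuid", request=request)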

     def filesize_format(self, bytes):
         """Convert bytes to readable units"""
@@ -122,13 +133,21 @@ def get_graph_tree(self, graphid):
             str(row[1]): {"depth": int(row[5]), "cardinality": row[7]}
             for row in rows
         }
-        nodes = Node.objects.filter(graph_id=graphid)
+        nodes = Node.objects.filter(graph_id=graphid).select_related("nodegroup")
         for node in nodes:
             nodeid = str(node.nodeid)
             if nodeid in node_lookup:
                 node_lookup[nodeid]["alias"] = node.alias
                 node_lookup[nodeid]["datatype"] = node.datatype
                 node_lookup[nodeid]["config"] = node.config
+            elif not node.istopnode:
+                node_lookup[nodeid] = {
+                    "depth": 0,  # ???
+                    "cardinality": node.nodegroup.cardinality,
+                    "alias": node.alias,
+                    "datatype": node.datatype,
+                    "config": node.config,
+                }
         return node_lookup, nodes

[Review comment by jacobtylerwalls, on the new elif branch]: This is here to work around #10889
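The switch to select_related("nodegroup") pairs with the fallback branch above: node.nodegroup.cardinality would otherwise trigger one extra query per node missing from the lookup. A sketch of the difference (standard Django ORM behavior, illustrative only):

    # Without select_related, each node.nodegroup access lazily fetches
    # the related NodeGroup row: up to N extra queries for N nodes.
    nodes = Node.objects.filter(graph_id=graphid)

    # With select_related, one JOINed query fetches the nodegroups too,
    # so node.nodegroup.cardinality is served from the cached instance.
    nodes = Node.objects.filter(graph_id=graphid).select_related("nodegroup")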

     def get_parent_tileid(
@@ -179,45 +198,67 @@ def get_node_lookup(self, nodes):
[Review comment by jacobtylerwalls, on run_load_task()]: I factored out stage_files() and check_tile_cardinality(), and then wrapped what remained in a try/finally so that temp files were always cleaned up.

         return lookup

     def run_load_task(self, userid, files, summary, result, temp_dir, loadid):
-        with connection.cursor() as cursor:
-            for file in files.keys():
-                self.stage_excel_file(file, summary, cursor)
-            cursor.execute(
-                """CALL __arches_check_tile_cardinality_violation_for_load(%s)""",
-                [loadid],
-            )
-            cursor.execute(
-                """
-                INSERT INTO load_errors (type, source, error, loadid, nodegroupid)
-                SELECT 'tile', source_description, error_message, loadid, nodegroupid
-                FROM load_staging
-                WHERE loadid = %s AND passes_validation = false AND error_message IS NOT null
-                """,
-                [loadid],
-            )
-            result["validation"] = self.validate(loadid)
-            if len(result["validation"]["data"]) == 0:
-                self.loadid = loadid  # currently redundant, but be certain
-                save_to_tiles(userid, loadid)
-                cursor.execute(
-                    """CALL __arches_update_resource_x_resource_with_graphids();"""
-                )
-                cursor.execute("""SELECT __arches_refresh_spatial_views();""")
-                refresh_successful = cursor.fetchone()[0]
-                if not refresh_successful:
-                    raise Exception("Unable to refresh spatial views")
-            else:
-                cursor.execute(
-                    """UPDATE load_event SET status = %s, load_end_time = %s WHERE loadid = %s""",
-                    ("failed", datetime.now(), loadid),
-                )
-        self.delete_from_default_storage(temp_dir)
+        self.loadid = loadid  # currently redundant, but be certain
+        try:
+            with connection.cursor() as cursor:
+                self.stage_files(files, summary, cursor)
+                self.check_tile_cardinality(cursor)
+                result["validation"] = self.validate(loadid)
+                if len(result["validation"]["data"]) == 0:
+                    self.save_to_tiles(cursor, userid, loadid)
+                    cursor.execute(
+                        """CALL __arches_update_resource_x_resource_with_graphids();"""
+                    )
+                    cursor.execute("""SELECT __arches_refresh_spatial_views();""")
+                    refresh_successful = cursor.fetchone()[0]
+                    if not refresh_successful:
+                        raise Exception("Unable to refresh spatial views")
+                else:
+                    cursor.execute(
+                        """UPDATE load_event SET status = %s, load_end_time = %s WHERE loadid = %s""",
+                        ("failed", datetime.now(), loadid),
+                    )
+        finally:
+            self.delete_from_default_storage(temp_dir)
         result["summary"] = summary
         return {"success": result["validation"]["success"], "data": result}

-    def validate_uploaded_file(self, file, kwarg):
+    @load_data_async
+    def run_load_task_async(self, request):
+        raise NotImplementedError
+
+    def prepare_temp_dir(self, request):
+        self.loadid = request.POST.get("load_id")
+        self.temp_dir = os.path.join(settings.UPLOADED_FILES_DIR, "tmp", self.loadid)
+        try:
+            self.delete_from_default_storage(self.temp_dir)
+        except FileNotFoundError:
+            pass
+
+    def validate_uploaded_file(self, file):
         pass

+    def stage_files(self, files, summary, cursor):
+        raise NotImplementedError
+
+    def check_tile_cardinality(self, cursor):
+        cursor.execute(
+            """CALL __arches_check_tile_cardinality_violation_for_load(%s)""",
+            [self.loadid],
+        )
+        cursor.execute(
+            """
+            INSERT INTO load_errors (type, source, error, loadid, nodegroupid)
+            SELECT 'tile', source_description, error_message, loadid, nodegroupid
+            FROM load_staging
+            WHERE loadid = %s AND passes_validation = false AND error_message IS NOT null
+            """,
+            [self.loadid],
+        )
+
+    def save_to_tiles(self, cursor, userid, loadid):
+        return save_to_tiles(userid, loadid)
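With stage_files() left abstract and the cardinality check and tile save factored into overridable methods, run_load_task() now acts as a template method: a subclass only has to supply staging. A minimal sketch of a conforming subclass (names hypothetical; the JSON-LD module added by this PR does considerably more):

    class MyImporter(BaseImportModule):
        def stage_files(self, files, summary, cursor):
            # Write staging rows for each uploaded file; the base class
            # then validates, checks cardinality, and saves to tiles.
            for file in files:
                self.stage_one_file(file, summary, cursor)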

     ### Actions ###

     def validate(self, loadid):
@@ -233,14 +274,10 @@ def validate(self, loadid):
return {"success": success, "data": row}

def read(self, request):
self.loadid = request.POST.get("load_id")
self.cumulative_excel_files_size = 0
self.prepare_temp_dir(request)
self.cumulative_files_size = 0
content = request.FILES["file"]
self.temp_dir = os.path.join(settings.UPLOADED_FILES_DIR, "tmp", self.loadid)
try:
self.delete_from_default_storage(self.temp_dir)
except FileNotFoundError:
pass

result = {
"summary": {
"name": content.name,
@@ -249,39 +286,39 @@
             }
         }
         validator = FileValidator()
-        if len(validator.validate_file_type(content)) > 0:
+        extension = content.name.split(".")[-1] or None
+        if len(validator.validate_file_type(content, extension=extension)) > 0:

[Review comment by jacobtylerwalls, on lines +289 to +290]: Needed to get the extension before the first call to validate_file_type for the body of that method to work correctly. You may want to test with FILE_TYPE_CHECKING = True.

             return {
                 "status": 400,
                 "success": False,
                 "title": _("Invalid excel file/zip specified"),
                 "message": _("Upload a valid excel file"),
                 "data": FileValidationError(
                     message=_("Upload a valid excel file"),
                     code=400,
                 ),
             }
         if content.name.split(".")[-1].lower() == "zip":
             with zipfile.ZipFile(content, "r") as zip_ref:
                 files = zip_ref.infolist()
                 for file in files:
                     if file.filename.split(".")[-1] == "xlsx":
-                        self.cumulative_excel_files_size += file.file_size
+                        self.cumulative_files_size += file.file_size
                     if not file.filename.startswith("__MACOSX"):
                         if not file.is_dir():
                             result["summary"]["files"][file.filename] = {
                                 "size": (self.filesize_format(file.file_size))
                             }
                             result["summary"][
-                                "cumulative_excel_files_size"
-                            ] = self.cumulative_excel_files_size
+                                "cumulative_files_size"
+                            ] = self.cumulative_files_size
                             default_storage.save(
                                 os.path.join(self.temp_dir, file.filename),
                                 File(zip_ref.open(file)),
                             )
         elif content.name.split(".")[-1] == "xlsx":
-            self.cumulative_excel_files_size += content.size
+            self.cumulative_files_size += content.size
             result["summary"]["files"][content.name] = {
                 "size": (self.filesize_format(content.size))
             }
-            result["summary"][
-                "cumulative_excel_files_size"
-            ] = self.cumulative_excel_files_size
+            result["summary"]["cumulative_files_size"] = self.cumulative_files_size
             default_storage.save(
                 os.path.join(self.temp_dir, content.name), File(content)
             )
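The author's suggestion to test with FILE_TYPE_CHECKING = True could be exercised along these lines (a sketch only: the FileValidator import path, the boolean form of the setting, and the expected error shape are assumptions, not verified against this branch):

    from django.core.files.uploadedfile import SimpleUploadedFile
    from django.test import SimpleTestCase, override_settings

    from arches.app.utils.file_validator import FileValidator  # assumed path

    class FileTypeCheckingTests(SimpleTestCase):
        @override_settings(FILE_TYPE_CHECKING=True)
        def test_extension_is_derived_before_validation(self):
            upload = SimpleUploadedFile("import.xlsx", b"not really an xlsx")
            # Mirror read(): derive the extension up front so that
            # validate_file_type() applies the checks for that type.
            extension = upload.name.split(".")[-1] or None
            errors = FileValidator().validate_file_type(upload, extension=extension)
            # With checking enabled, the bogus payload should be flagged.
            self.assertTrue(len(errors) > 0)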
@@ -340,7 +377,7 @@ def write(self, request):
         summary = details["result"]["summary"]
         use_celery_file_size_threshold_in_MB = 0.1
         if (
-            summary["cumulative_excel_files_size"] / 1000000
+            summary["cumulative_files_size"] / 1000000
             > use_celery_file_size_threshold_in_MB
         ):
             response = self.run_load_task_async(request, self.loadid)
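The routing arithmetic is unchanged by the rename: the cumulative size in bytes is divided by 1,000,000, so any upload larger than 0.1 MB (100,000 bytes) is dispatched to the asynchronous celery path.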
4 changes: 4 additions & 0 deletions arches/app/etl_modules/branch_excel_importer.py

@@ -227,6 +227,10 @@ def get_graphid(self, workbook):
         graphid = workbook.get_sheet_by_name("metadata")["B1"].value
         return graphid

+    def stage_files(self, files, summary, cursor):
+        for file in files:
+            self.stage_excel_file(file, summary, cursor)
+
     def stage_excel_file(self, file, summary, cursor):
         if file.endswith("xlsx"):
             summary["files"][file]["worksheets"] = []
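BranchExcelImporter satisfies the new abstract hook by delegating to its existing per-file staging method. The JSON-LD importer added by this PR implements the same hook for zipped JSON-LD files; a heavily simplified sketch of that shape (class and helper names hypothetical, not the PR's actual code):

    class JSONLDImporter(BaseImportModule):
        def stage_files(self, files, summary, cursor):
            for file in files:
                if file.endswith((".json", ".jsonld")):
                    # Parse the JSON-LD document and write rows into
                    # load_staging for validation and the tile save.
                    self.stage_jsonld_file(file, summary, cursor)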