Commit

Populate staging table re archesproject#10798

jacobtylerwalls committed May 1, 2024
1 parent f10f02d commit 2d3bfd9
Showing 3 changed files with 199 additions and 32 deletions.
30 changes: 21 additions & 9 deletions arches/app/etl_modules/base_import_module.py
@@ -27,12 +27,18 @@


class BaseImportModule:
def __init__(self, loadid=None):
def __init__(self, request=None, loadid=None):
self.request = request
self.userid = None
self.moduleid = None
self.fileid = None
self.loadid = loadid
self.legacyid_lookup = {}

if self.request:
self.userid = request.user.id
self.moduleid = request.POST.get("module")

def filesize_format(self, bytes):
"""Convert bytes to readable units"""
bytes = int(bytes)
@@ -160,8 +166,7 @@ def get_node_lookup(self, nodes):

def run_load_task(self, userid, files, summary, result, temp_dir, loadid):
with connection.cursor() as cursor:
for file in files.keys():
self.stage_excel_file(file, summary, cursor)
self.stage_files(files, summary, cursor)
cursor.execute("""CALL __arches_check_tile_cardinality_violation_for_load(%s)""", [loadid])
cursor.execute(
"""
@@ -205,6 +210,13 @@ def prepare_temp_dir(self, request):
def validate_uploaded_file(self, file):
pass

def stage_files(self, files, summary, cursor):
for file in files:
self.stage_excel_file(file, summary, cursor)

def stage_excel_file(self, file, summary, cursor):
pass

### Actions ###

def validate(self, loadid):
@@ -221,7 +233,7 @@ def validate(self, loadid):

def read(self, request):
self.prepare_temp_dir(request)
self.cumulative_excel_files_size = 0
self.cumulative_files_size = 0
content = request.FILES["file"]

result = {"summary": {"name": content.name, "size": self.filesize_format(content.size), "files": {}}}
@@ -238,16 +250,16 @@ def read(self, request):
files = zip_ref.infolist()
for file in files:
if file.filename.split(".")[-1] == "xlsx":
self.cumulative_excel_files_size += file.file_size
self.cumulative_files_size += file.file_size
if not file.filename.startswith("__MACOSX"):
if not file.is_dir():
result["summary"]["files"][file.filename] = {"size": (self.filesize_format(file.file_size))}
result["summary"]["cumulative_excel_files_size"] = self.cumulative_excel_files_size
result["summary"]["cumulative_files_size"] = self.cumulative_files_size
default_storage.save(os.path.join(self.temp_dir, file.filename), File(zip_ref.open(file)))
elif content.name.split(".")[-1] == "xlsx":
self.cumulative_excel_files_size += content.size
self.cumulative_files_size += content.size
result["summary"]["files"][content.name] = {"size": (self.filesize_format(content.size))}
result["summary"]["cumulative_excel_files_size"] = self.cumulative_excel_files_size
result["summary"]["cumulative_files_size"] = self.cumulative_files_size
default_storage.save(os.path.join(self.temp_dir, content.name), File(content))

has_valid_excel_file = False
@@ -293,7 +305,7 @@ def write(self, request):
files = details["result"]["summary"]["files"]
summary = details["result"]["summary"]
use_celery_file_size_threshold_in_MB = 0.1
if summary["cumulative_excel_files_size"] / 1000000 > use_celery_file_size_threshold_in_MB:
if summary["cumulative_files_size"] / 1000000 > use_celery_file_size_threshold_in_MB:
response = self.run_load_task_async(request, self.loadid)
else:
response = self.run_load_task(self.userid, files, summary, result, self.temp_dir, self.loadid)
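The base-module changes above make staging pluggable: the constructor now accepts the request (deriving userid and moduleid from it), and run_load_task delegates to a new stage_files hook, which by default loops over the files and calls stage_excel_file. A minimal sketch of how a format-specific importer might use the hook; the subclass name and its two helpers are hypothetical and not part of this commit:

from arches.app.etl_modules.base_import_module import BaseImportModule


class HypotheticalCSVImporter(BaseImportModule):
    """Illustrative only: overrides the stage_files hook added in this commit."""

    def stage_files(self, files, summary, cursor):
        # Replace the default Excel staging with format-specific staging.
        for file in files:
            rows = self.parse_csv_file(file)                   # hypothetical helper
            self.write_rows_to_staging(rows, summary, cursor)  # hypothetical helper


# importer = HypotheticalCSVImporter(request=request)  # userid/moduleid now come from the request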
125 changes: 116 additions & 9 deletions arches/app/etl_modules/jsonld_importer.py
@@ -1,18 +1,28 @@
import json
import os
import zipfile
from functools import lru_cache
from pathlib import Path

from django.core.files import File
from django.core.files.storage import default_storage
from django.core.management import call_command
from django.utils.translation import gettext as _

from arches.app.etl_modules.base_import_module import BaseImportModule, FileValidationError
from arches.app.etl_modules.decorators import load_data_async
from arches.app.models.models import GraphModel
from arches.app.models.models import GraphModel, LoadErrors, LoadStaging
from arches.app.models.system_settings import settings
from arches.app.utils.betterJSONSerializer import JSONSerializer
from arches.app.utils.file_validator import FileValidator


@lru_cache(maxsize=1)
def get_graph_tree_from_slug(graph_id):
"""Resources are ordered by graph, so use an aggressively low maxsize."""
return BaseImportModule().get_graph_tree(graph_id)


@lru_cache(maxsize=1)
def graph_id_from_slug(slug):
return GraphModel.objects.get(slug=slug).pk
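The lru_cache(maxsize=1) on the two helpers above is enough because, per the docstring, resources are processed ordered by graph, so consecutive lookups repeat the same slug. A minimal sketch of that cache behavior, using hypothetical slugs:

get_graph_tree_from_slug("collection")  # miss: graph tree is built once
get_graph_tree_from_slug("collection")  # hit: same slug as the previous call
get_graph_tree_from_slug("person")      # miss: evicts the "collection" entry
get_graph_tree_from_slug.cache_clear()  # stage_files clears both caches after the load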
@@ -21,10 +31,16 @@ def graph_id_from_slug(slug):
class JSONLDImporter(BaseImportModule):
def read(self, request):
self.prepare_temp_dir(request)
self.cumulative_json_files_size = 0
self.cumulative_files_size = 0
content = request.FILES["file"]

result = {"summary": {"name": content.name, "size": self.filesize_format(content.size), "files": {}}}
result = {
"summary": {
"name": content.name,
"size": self.filesize_format(content.size),
"files": {},
}
}
validator = FileValidator()
if validator.validate_file_type(content):
return {
@@ -43,13 +59,19 @@ def read(self, request):
continue
if file.is_dir():
continue
self.cumulative_json_files_size += file.file_size
self.cumulative_files_size += file.file_size
result["summary"]["files"][file.filename] = {"size": (self.filesize_format(file.file_size))}
result["summary"]["cumulative_json_files_size"] = self.cumulative_json_files_size
result["summary"]["cumulative_files_size"] = self.cumulative_files_size
with zip_ref.open(file) as opened_file:
self.validate_uploaded_file(opened_file)
f = File(opened_file)
default_storage.save(os.path.join(self.temp_dir, file.filename), f)

# Discard outermost part (e.g. "myzip/")
destination = Path(self.temp_dir)
for part in Path(file.filename).parts[1:]:
destination = destination / part

default_storage.save(destination, f)

if not result["summary"]["files"]:
title = _("Invalid Uploaded File")
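As a side note on the destination loop above: dropping the first element of Path(file.filename).parts strips the outermost zip directory, so extracted files land directly under the ETL temp directory. A small, self-contained illustration with hypothetical paths:

from pathlib import Path

temp_dir = Path("uploadedfiles/tmp/1234")          # hypothetical temp dir
member = "myzip/collection/block_0/resource.json"  # hypothetical zip entry

destination = temp_dir
for part in Path(member).parts[1:]:
    destination = destination / part

assert destination.as_posix() == "uploadedfiles/tmp/1234/collection/block_0/resource.json"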
@@ -68,9 +90,94 @@ def validate_uploaded_file(self, file):
message=_('The model "{0}" does not exist.').format(path.parts[1])
)

def run_load_task(self, userid, files, summary, result, temp_dir, loadid):
...
def stage_files(self, files, summary, cursor):
for file in files:
path = Path(file)
unused, graph_slug, block, resource_id_with_suffix = path.parts
resource_id = resource_id_with_suffix.split(".json")[0]

self.handle_block(graph_slug, block)

summary["files"][file]["resources"].append(resource_id)
cursor.execute(
"""UPDATE load_event SET load_details = %s WHERE loadid = %s""",
(json.dumps(summary), self.loadid),
)

# Clear cache in case the user edits the last graph and re-runs.
get_graph_tree_from_slug.cache_clear()
graph_id_from_slug.cache_clear()

def handle_block(self, graph_slug, block):
# todo(jtw): add try
resources = call_command(
"load_jsonld",
model=graph_slug,
block=block,
force="overwrite",
source=self.temp_dir,
quiet=True,
fast=True,
use_storage=True,
dry_run=True, # don't save the resources
)
nodegroup_info, node_info = get_graph_tree_from_slug(graph_slug)
self.populate_staging_table(resources, nodegroup_info, node_info)

def populate_staging_table(self, resources, nodegroup_info, node_info):
load_staging_instances = []
for resource in resources:
for tile in resource.get_flattened_tiles():
tile_value = {}
for nodeid, source_value in tile.data.items():
datatype = node_info[nodeid]["datatype"]
config = node_info[nodeid]["config"]
value, validation_errors = self.prepare_data_for_loading(
datatype,
source_value,
config,
)
for error in validation_errors:
LoadErrors(
load_event=self.loadid,
nodegroup_id=tile.nodegroup_id,
node_id=nodeid,
datatype=datatype.pk,
type="node",
value=source_value,
source="",
error=error["title"],
message=error["message"],
).save()
tile_value[nodeid] = {
"value": value,
"valid": len(validation_errors) == 0,
"source": source_value,
"notes": ",".join(validation_errors),
"datatype": datatype,
}

load_staging_instances.append(
LoadStaging(
nodegroup_id=tile.nodegroup_id,
load_event_id=self.loadid,
value=JSONSerializer().serialize(tile_value),
legacyid=resource.legacyid,
resourceid=resource.pk,
tileid=tile.pk,
parenttileid=tile.parenttile_id,
passes_validation=True,
nodegroup_depth=nodegroup_info[tile.nodegroup_id].depth,
source_description=None,
error_message=None,
operation="insert",
)
)

tile_batch_size = settings.BULK_IMPORT_BATCH_SIZE * 10 # assume 10 tiles/resource
LoadStaging.objects.bulk_create(load_staging_instances, batch_size=tile_batch_size)
# todo(jtw): edit log?

@load_data_async
def run_load_task_async(self, request):
...
raise NotImplementedError
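For reference, a hypothetical example (not taken from the commit) of the per-tile JSON that populate_staging_table serializes into LoadStaging.value: one entry per node in the flattened tile, carrying the transformed value alongside its validation state. The rows are handed to bulk_create in batches of BULK_IMPORT_BATCH_SIZE * 10, on the stated assumption of roughly ten tiles per resource.

tile_value = {
    "a1b2c3d4-node-id": {           # hypothetical node id
        "value": "Example name",    # value returned by prepare_data_for_loading
        "valid": True,              # no validation errors for this node
        "source": "Example name",   # raw value taken from the JSON-LD tile
        "notes": "",                # joined validation error messages
        "datatype": "string",       # datatype of this node (hypothetical)
    }
}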