Skip to content

Commit

Permalink
Capture early failures re #10798
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobtylerwalls committed May 6, 2024
1 parent 1e17e31 commit f95c51d
Showing 1 changed file with 68 additions and 32 deletions.
100 changes: 68 additions & 32 deletions arches/app/etl_modules/jsonld_importer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
import zipfile
from datetime import datetime
from functools import lru_cache
from functools import cache, lru_cache
from pathlib import Path

from django.core.files import File
Expand All @@ -12,15 +12,16 @@
from arches.app.utils.data_management.resources.formats.rdffile import ValueErrorWithNodeInfo
from arches.app.etl_modules.base_import_module import BaseImportModule, FileValidationError
from arches.app.etl_modules.decorators import load_data_async
from arches.app.models.models import GraphModel, LoadErrors, LoadEvent, LoadStaging
from arches.app.models.models import GraphModel, LoadErrors, LoadEvent, LoadStaging, NodeGroup
from arches.app.models.system_settings import settings
from arches.app.utils.betterJSONSerializer import JSONSerializer
from arches.app.utils.file_validator import FileValidator


@lru_cache(maxsize=1)
def get_graph_tree_from_slug(graph_id):
def get_graph_tree_from_slug(slug):
"""Resources are ordered by graph, so use an aggressively low maxsize."""
graph_id = graph_id_from_slug(slug)
return BaseImportModule().get_graph_tree(graph_id)


Expand All @@ -29,6 +30,14 @@ def graph_id_from_slug(slug):
return GraphModel.objects.get(slug=slug).pk


@cache
def fallback_nodegroup_id():
"""Consider removing this if we make LoadStaging.nodegroup nullable."""
return NodeGroup.objects.filter(
graph_id=settings.SYSTEM_SETTINGS_RESOURCE_MODEL_ID
).first().pk


class JSONLDImporter(BaseImportModule):
def read(self, request):
self.prepare_temp_dir(request)
Expand Down Expand Up @@ -126,29 +135,7 @@ def handle_block(self, graph_slug, block):
dry_run=True, # don't save the resources
)
except Exception as e:
has_info = isinstance(e, ValueErrorWithNodeInfo)
LoadErrors(
load_event_id=self.loadid,
type="graph",
source="/".join((graph_slug, block)),
error=_("Load JSON-LD command error"),
message=e.args[0],
value=str(e.value) if has_info else None,
datatype=e.datatype if has_info else None,
node_id=e.node_id if has_info else None,
nodegroup_id=e.nodegroup_id if has_info else None,
).save()

# Prevent IntegrityError: https://code.djangoproject.com/ticket/35425
LoadEvent.objects.filter(loadid=self.loadid).update(
user_id=self.userid,
successful=False,
status="failed",
load_description="/".join((graph_slug, block)),
etl_module_id=self.moduleid,
error_message=_("Load JSON-LD command error"),
load_end_time=datetime.now(),
)
self.handle_early_failure(e, graph_slug=graph_slug, block=block)
return

nodegroup_info, node_info = get_graph_tree_from_slug(graph_slug)
Expand All @@ -166,8 +153,56 @@ def populate_staging_table(self, resources, nodegroup_info, node_info):
LoadStaging.objects.bulk_create(load_staging_instances, batch_size=tile_batch_size)
# todo(jtw): edit log?

def handle_early_failure(self, exception, graph_slug, block):
"""The CLI might have failed well before creating resources and tiles,
but still with some useful information we can surface."""

has_info = isinstance(exception, ValueErrorWithNodeInfo)
early_failure = _("Load JSON-LD command failed before fully parsing a block.")
exception_message = exception.args[0]

LoadErrors(
load_event_id=self.loadid,
type="graph",
source="/".join((graph_slug, block)),
error=early_failure,
message=exception_message,
value=str(exception.value) if has_info else None,
datatype=exception.datatype if has_info else None,
node_id=exception.node_id if has_info else None,
nodegroup_id=exception.nodegroup_id if has_info else None,
).save()

# Avoid save() to prevent IntegrityError: https://code.djangoproject.com/ticket/35425
LoadEvent.objects.filter(loadid=self.loadid).update(
user_id=self.userid,
successful=False,
status="failed",
load_description="/".join((graph_slug, block)),
etl_module_id=self.moduleid,
error_message=early_failure,
load_end_time=datetime.now(),
)

dummy_tile_info = {
"value": {},
"valid": False,
"source": early_failure,
"notes": exception_message,
"datatype": "",
}

LoadStaging(
load_event_id=self.loadid,
nodegroup_id=exception.nodegroup_id if has_info else fallback_nodegroup_id(),
value=JSONSerializer().serialize(dummy_tile_info),
passes_validation=False,
source_description=early_failure,
error_message=exception_message,
operation="insert",
).save()

def load_staging_instance_from_tile(self, tile, resource, nodegroup_info, node_info):
tile_value = {}
for nodeid, source_value in tile.data.entries():
datatype = node_info[nodeid]["datatype"]
config = node_info[nodeid]["config"]
Expand All @@ -176,25 +211,26 @@ def load_staging_instance_from_tile(self, tile, resource, nodegroup_info, node_i
source_value,
config,
)
passes_validation = len(validation_errors) == 0
self.save_validation_errors(validation_errors, tile, source_value, datatype, nodeid)

tile_value[nodeid] = {
tile_info = {
"value": value,
"valid": len(validation_errors) == 0,
"valid": passes_validation,
"source": source_value,
"notes": ",".join(validation_errors),
"notes": "|".join(validation_errors),
"datatype": datatype,
}

return LoadStaging(
nodegroup_id=tile.nodegroup_id,
load_event_id=self.loadid,
value=JSONSerializer().serialize(tile_value),
value=JSONSerializer().serialize(tile_info),
legacyid=resource.legacyid,
resourceid=resource.pk,
tileid=tile.pk,
parenttileid=tile.parenttile_id,
passes_validation=True,
passes_validation=passes_validation,
nodegroup_depth=nodegroup_info[tile.nodegroup_id].depth,
source_description=None,
error_message=None,
Expand Down

0 comments on commit f95c51d

Please sign in to comment.