diff --git a/arches/app/etl_modules/base_import_module.py b/arches/app/etl_modules/base_import_module.py
index b4bfd961198..f25e20d5aa6 100644
--- a/arches/app/etl_modules/base_import_module.py
+++ b/arches/app/etl_modules/base_import_module.py
@@ -27,12 +27,18 @@ class BaseImportModule:
-    def __init__(self, loadid=None):
+    def __init__(self, request=None, loadid=None):
+        self.request = request
+        self.userid = None
         self.moduleid = None
         self.fileid = None
         self.loadid = loadid
         self.legacyid_lookup = {}
 
+        if self.request:
+            self.userid = request.user.id
+            self.moduleid = request.POST.get("module")
+
     def filesize_format(self, bytes):
         """Convert bytes to readable units"""
         bytes = int(bytes)
@@ -160,8 +166,7 @@ def get_node_lookup(self, nodes):
 
     def run_load_task(self, userid, files, summary, result, temp_dir, loadid):
         with connection.cursor() as cursor:
-            for file in files.keys():
-                self.stage_excel_file(file, summary, cursor)
+            self.stage_files(files, summary, cursor)
             cursor.execute("""CALL __arches_check_tile_cardinality_violation_for_load(%s)""", [loadid])
             cursor.execute(
                 """
@@ -205,6 +210,13 @@ def prepare_temp_dir(self, request):
     def validate_uploaded_file(self, file):
         pass
 
+    def stage_files(self, files, summary, cursor):
+        for file in files:
+            self.stage_excel_file(file, summary, cursor)
+
+    def stage_excel_file(self, file, summary, cursor):
+        pass
+
     ### Actions ###
 
     def validate(self, loadid):
@@ -221,7 +233,7 @@ def validate(self, loadid):
 
     def read(self, request):
         self.prepare_temp_dir(request)
-        self.cumulative_excel_files_size = 0
+        self.cumulative_files_size = 0
         content = request.FILES["file"]
         result = {"summary": {"name": content.name, "size": self.filesize_format(content.size), "files": {}}}
@@ -238,16 +250,16 @@ def read(self, request):
                 files = zip_ref.infolist()
                 for file in files:
                     if file.filename.split(".")[-1] == "xlsx":
-                        self.cumulative_excel_files_size += file.file_size
+                        self.cumulative_files_size += file.file_size
                     if not file.filename.startswith("__MACOSX"):
                         if not file.is_dir():
                             result["summary"]["files"][file.filename] = {"size": (self.filesize_format(file.file_size))}
-                            result["summary"]["cumulative_excel_files_size"] = self.cumulative_excel_files_size
+                            result["summary"]["cumulative_files_size"] = self.cumulative_files_size
                             default_storage.save(os.path.join(self.temp_dir, file.filename), File(zip_ref.open(file)))
         elif content.name.split(".")[-1] == "xlsx":
-            self.cumulative_excel_files_size += content.size
+            self.cumulative_files_size += content.size
             result["summary"]["files"][content.name] = {"size": (self.filesize_format(content.size))}
-            result["summary"]["cumulative_excel_files_size"] = self.cumulative_excel_files_size
+            result["summary"]["cumulative_files_size"] = self.cumulative_files_size
             default_storage.save(os.path.join(self.temp_dir, content.name), File(content))
 
         has_valid_excel_file = False
@@ -293,7 +305,7 @@ def write(self, request):
         files = details["result"]["summary"]["files"]
         summary = details["result"]["summary"]
         use_celery_file_size_threshold_in_MB = 0.1
-        if summary["cumulative_excel_files_size"] / 1000000 > use_celery_file_size_threshold_in_MB:
+        if summary["cumulative_files_size"] / 1000000 > use_celery_file_size_threshold_in_MB:
            response = self.run_load_task_async(request, self.loadid)
         else:
             response = self.run_load_task(self.userid, files, summary, result, self.temp_dir, self.loadid)
diff --git a/arches/app/etl_modules/jsonld_importer.py b/arches/app/etl_modules/jsonld_importer.py
index 8135db67e67..b200893fc3c 100644
--- a/arches/app/etl_modules/jsonld_importer.py
+++ b/arches/app/etl_modules/jsonld_importer.py
@@ -1,3 +1,4 @@
+import json
 import os
 import zipfile
 from functools import lru_cache
@@ -5,14 +6,23 @@
 from django.core.files import File
 from django.core.files.storage import default_storage
+from django.core.management import call_command
 from django.utils.translation import gettext as _
 
 from arches.app.etl_modules.base_import_module import BaseImportModule, FileValidationError
 from arches.app.etl_modules.decorators import load_data_async
-from arches.app.models.models import GraphModel
+from arches.app.models.models import GraphModel, LoadErrors, LoadStaging
+from arches.app.models.system_settings import settings
+from arches.app.utils.betterJSONSerializer import JSONSerializer
 from arches.app.utils.file_validator import FileValidator
 
 
+@lru_cache(maxsize=1)
+def get_graph_tree_from_slug(graph_id):
+    """Resources are ordered by graph, so use an aggressively low maxsize."""
+    return BaseImportModule().get_graph_tree(graph_id)
+
+
 @lru_cache(maxsize=1)
 def graph_id_from_slug(slug):
     return GraphModel.objects.get(slug=slug).pk
@@ -21,10 +31,16 @@ def graph_id_from_slug(slug):
 class JSONLDImporter(BaseImportModule):
     def read(self, request):
         self.prepare_temp_dir(request)
-        self.cumulative_json_files_size = 0
+        self.cumulative_files_size = 0
         content = request.FILES["file"]
 
-        result = {"summary": {"name": content.name, "size": self.filesize_format(content.size), "files": {}}}
+        result = {
+            "summary": {
+                "name": content.name,
+                "size": self.filesize_format(content.size),
+                "files": {},
+            }
+        }
         validator = FileValidator()
         if validator.validate_file_type(content):
             return {
@@ -43,13 +59,19 @@ def read(self, request):
                     continue
                 if file.is_dir():
                     continue
-                self.cumulative_json_files_size += file.file_size
+                self.cumulative_files_size += file.file_size
                 result["summary"]["files"][file.filename] = {"size": (self.filesize_format(file.file_size))}
-                result["summary"]["cumulative_json_files_size"] = self.cumulative_json_files_size
+                result["summary"]["cumulative_files_size"] = self.cumulative_files_size
                 with zip_ref.open(file) as opened_file:
                     self.validate_uploaded_file(opened_file)
                     f = File(opened_file)
-                    default_storage.save(os.path.join(self.temp_dir, file.filename), f)
+
+                    # Discard outermost part (e.g. "myzip/")
+                    destination = Path(self.temp_dir)
+                    for part in Path(file.filename).parts[1:]:
+                        destination = destination / part
+
+                    default_storage.save(destination, f)
 
         if not result["summary"]["files"]:
             title = _("Invalid Uploaded File")
@@ -68,9 +90,94 @@ def validate_uploaded_file(self, file):
                 message=_('The model "{0}" does not exist.').format(path.parts[1])
             )
 
-    def run_load_task(self, userid, files, summary, result, temp_dir, loadid):
-        ...
+    def stage_files(self, files, summary, cursor):
+        for file in files:
+            path = Path(file)
+            unused, graph_slug, block, resource_id_with_suffix = path.parts
+            resource_id = resource_id_with_suffix.split(".json")[0]
+
+            self.handle_block(graph_slug, block)
+
+            summary["files"][file].setdefault("resources", []).append(resource_id)
+            cursor.execute(
+                """UPDATE load_event SET load_details = %s WHERE loadid = %s""",
+                (json.dumps(summary), self.loadid),
+            )
+
+        # Clear cache in case the user edits the last graph and re-runs.
+        get_graph_tree_from_slug.cache_clear()
+        graph_id_from_slug.cache_clear()
+
+    def handle_block(self, graph_slug, block):
+        # todo(jtw): add try
+        resources = call_command(
+            "load_jsonld",
+            model=graph_slug,
+            block=block,
+            force="overwrite",
+            source=self.temp_dir,
+            quiet=True,
+            fast=True,
+            use_storage=True,
+            dry_run=True,  # don't save the resources
+        )
+        nodegroup_info, node_info = get_graph_tree_from_slug(graph_id_from_slug(graph_slug))
+        self.populate_staging_table(resources, nodegroup_info, node_info)
+
+    def populate_staging_table(self, resources, nodegroup_info, node_info):
+        load_staging_instances = []
+        for resource in resources:
+            for tile in resource.get_flattened_tiles():
+                tile_value = {}
+                for nodeid, source_value in tile.data.items():
+                    datatype = node_info[nodeid]["datatype"]
+                    config = node_info[nodeid]["config"]
+                    value, validation_errors = self.prepare_data_for_loading(
+                        datatype,
+                        source_value,
+                        config,
+                    )
+                    for error in validation_errors:
+                        LoadErrors(
+                            load_event_id=self.loadid,
+                            nodegroup_id=tile.nodegroup_id,
+                            node_id=nodeid,
+                            datatype=datatype.pk,
+                            type="node",
+                            value=source_value,
+                            source="",
+                            error=error["title"],
+                            message=error["message"],
+                        ).save()
+                    tile_value[nodeid] = {
+                        "value": value,
+                        "valid": len(validation_errors) == 0,
+                        "source": source_value,
+                        "notes": ",".join(error["message"] for error in validation_errors),
+                        "datatype": datatype,
+                    }
+
+                load_staging_instances.append(
+                    LoadStaging(
+                        nodegroup_id=tile.nodegroup_id,
+                        load_event_id=self.loadid,
+                        value=JSONSerializer().serialize(tile_value),
+                        legacyid=resource.legacyid,
+                        resourceid=resource.pk,
+                        tileid=tile.pk,
+                        parenttileid=tile.parenttile_id,
+                        passes_validation=True,
+                        nodegroup_depth=nodegroup_info[tile.nodegroup_id].depth,
+                        source_description=None,
+                        error_message=None,
+                        operation="insert",
+                    )
+                )
+
+        tile_batch_size = settings.BULK_IMPORT_BATCH_SIZE * 10  # assume 10 tiles/resource
+        LoadStaging.objects.bulk_create(load_staging_instances, batch_size=tile_batch_size)
+        # todo(jtw): edit log?
 
     @load_data_async
     def run_load_task_async(self, request):
-        ...
+        raise NotImplementedError
diff --git a/arches/management/commands/load_jsonld.py b/arches/management/commands/load_jsonld.py
index 700406e53e2..617724418f3 100644
--- a/arches/management/commands/load_jsonld.py
+++ b/arches/management/commands/load_jsonld.py
@@ -19,10 +19,13 @@
 import os
 import json
 import time
+from pathlib import Path
 
-from arches.app.models import models as archesmodels
 from django.core.management.base import BaseCommand
+from django.core.files.storage import default_storage
 from django.db import transaction
+
+from arches.app.models import models as archesmodels
 from arches.app.models.resource import Resource
 from arches.app.utils.data_management.resources.formats.rdffile import JsonLdReader
 from arches.app.models.models import TileModel
@@ -106,6 +109,22 @@ def add_arguments(self, parser):
             help="If a node is set to not be exposed to advanced search, then don't even index it",
         )
 
+        parser.add_argument(
+            "--dry-run",
+            default=False,
+            action="store_true",
+            dest="dry_run",
+            help="Neither save nor index resources, but run all validation with requested logging.",
+        )
+
+        parser.add_argument(
+            "--use-storage",
+            default=False,
+            action="store_true",
+            dest="use_storage",
+            help="Resolve filepaths against Django default_storage.",
+        )
+
     def handle(self, *args, **options):
 
         print("Starting JSON-LD load")
@@ -132,9 +151,14 @@ def handle(self, *args, **options):
             print("ERROR: stripping fields not exposed to advanced search only works in fast mode")
             return
 
+        if options["dry_run"]:
+            self.stdout.write("Running in --dry-run mode. Validating only (no saving, no indexing).")
+
         self.resources = []
         self.load_resources(options)
 
+        return self.resources
+
     def load_resources(self, options):
 
         self.reader = JsonLdReader(verbosity=options["verbosity"], ignore_errors=options["ignore_errors"])
@@ -143,7 +167,10 @@ def load_resources(self, options):
         if options["model"]:
             models = [options["model"]]
         else:
-            models = os.listdir(source)
+            if options["use_storage"]:
+                models = default_storage.listdir(source)[0]
+            else:
+                models = os.listdir(source)
         models.sort()
         models = [m for m in models if m[0] not in ["_", "."]]
         print(f"Found possible models: {models}")
@@ -168,6 +195,7 @@ def load_resources(self, options):
 
         for m in models:
             print(f"Loading {m}")
+            model_path = Path(source) / m
             graphid = graph_uuid_map.get(m, None)
             if not graphid:
                 # Check slug
@@ -182,7 +210,10 @@ def load_resources(self, options):
             if block and "," not in block:
                 blocks = [block]
             else:
-                blocks = os.listdir(f"{source}/{m}")
+                if options["use_storage"]:
+                    blocks = default_storage.listdir(model_path)[0]
+                else:
+                    blocks = os.listdir(model_path)
                 blocks.sort()
                 blocks = [b for b in blocks if b[0] not in ["_", "."]]
                 if "," in block:
@@ -196,7 +227,11 @@ def load_resources(self, options):
 
             try:
                 for b in blocks:
-                    files = os.listdir(f"{source}/{m}/{b}")
+                    block_path = model_path / b
+                    if options["use_storage"]:
+                        files = default_storage.listdir(block_path)[1]
+                    else:
+                        files = os.listdir(block_path)
                     files.sort()
                     for f in files:
                         if not f.endswith(options["suffix"]):
@@ -210,20 +245,23 @@ def load_resources(self, options):
                         if seen <= options["skip"]:
                             # Do it this way to keep the counts correct
                             continue
-                        fn = f"{source}/{m}/{b}/{f}"
+                        fn = block_path / f
                         # Check file size of record
                         if not options["quiet"]:
                             print(f"About to import {fn}")
                         if options["toobig"]:
                             sz = os.os.path.getsize(fn)
                             if sz > options["toobig"]:
-                                if not quiet:
+                                if not options["quiet"]:
                                     print(f" ... Skipping due to size: {sz} > {options['toobig']}")
                                     continue
                         uu = f.replace(f".{options['suffix']}", "")
-                        fh = open(fn)
-                        data = fh.read()
-                        fh.close()
+                        if options["use_storage"]:
+                            with default_storage.open(fn, mode="r") as fh:
+                                data = fh.read()
+                        else:
+                            with open(fn, mode="r") as fh:
+                                data = fh.read()
                         # FIXME Timezone / DateTime Workaround
                         # FIXME The following line should be removed when #5669 / #6346 are closed
                         data = data.replace("T00:00:00Z", "")
@@ -243,9 +281,17 @@ def load_resources(self, options):
                                 reload=options["force"],
                                 quiet=options["quiet"],
                                 strip_search=options["strip_search"],
+                                dry_run=options["dry_run"],
                             )
                         else:
-                            l = self.import_resource(uu, graphid, jsdata, reload=options["force"], quiet=options["quiet"])
+                            l = self.import_resource(
+                                uu,
+                                graphid,
+                                jsdata,
+                                reload=options["force"],
+                                quiet=options["quiet"],
+                                dry_run=options["dry_run"],
+                            )
                         loaded += l
                         loaded_model += l
             except Exception as e:
@@ -266,7 +312,7 @@ def load_resources(self, options):
                 self.resources = []
         print(f"Total Time: seen {seen} / loaded {loaded} in {time.time()-start} seconds")
 
-    def fast_import_resource(self, resourceid, graphid, data, n=1000, reload="ignore", quiet=True, strip_search=False):
+    def fast_import_resource(self, resourceid, graphid, data, n=1000, reload="ignore", quiet=True, strip_search=False, dry_run=False):
         try:
             resource_instance = Resource.objects.get(pk=resourceid)
             if reload == "ignore":
@@ -276,7 +322,7 @@ def fast_import_resource(self, resourceid, graphid, data, n=1000, reload="ignore
             elif reload == "error":
                 print(f"*** Record exists for {resourceid}, and -ow is error")
                 raise FileExistsError(resourceid)
-            else:
+            elif not dry_run:
                 resource_instance.delete()
         except archesmodels.ResourceInstance.DoesNotExist:
             # thrown when resource doesn't exist
@@ -287,13 +333,13 @@ def fast_import_resource(self, resourceid, graphid, data, n=1000, reload="ignore
         except:
             print(f"Exception raised while reading {resourceid}...")
             raise
-        if len(self.resources) >= n:
+        if not dry_run and len(self.resources) >= n:
             self.save_resources()
             self.index_resources(strip_search)
             self.resources = []
         return 1
 
-    def import_resource(self, resourceid, graphid, data, reload="ignore", quiet=False):
+    def import_resource(self, resourceid, graphid, data, reload="ignore", quiet=False, dry_run=False):
         with transaction.atomic():
             try:
                 resource_instance = Resource.objects.get(pk=resourceid)
@@ -312,6 +358,8 @@ def import_resource(self, resourceid, graphid, data, reload="ignore", quiet=Fals
 
             try:
                 self.reader.read_resource(data, resourceid=resourceid, graphid=graphid)
+                if dry_run:
+                    return 1
                 for resource in self.reader.resources:
                     resource.save(request=None)
             except archesmodels.ResourceInstance.DoesNotExist:
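
Note (not part of the changeset): a minimal sketch of how the new dry_run / use_storage options might be driven programmatically, mirroring the call_command(...) invocation that JSONLDImporter.handle_block makes above. The "heritage-resource" slug, "block1" directory, and "uploadedfiles/tmp" source path are placeholder values, not anything defined in this patch.

    from django.core.management import call_command

    # With dry_run=True the command validates the JSON-LD and, per the
    # change to handle(), returns self.resources instead of saving or
    # indexing anything; use_storage=True makes it list and open files
    # through Django's default_storage rather than the local filesystem.
    resources = call_command(
        "load_jsonld",
        model="heritage-resource",   # graph slug (placeholder)
        block="block1",              # subfolder of the model directory (placeholder)
        force="overwrite",
        source="uploadedfiles/tmp",  # placeholder path resolved against default_storage
        quiet=True,
        fast=True,
        use_storage=True,
        dry_run=True,                # validate only: nothing is saved or indexed
    )
    print(f"Validated {len(resources)} resources without saving")

The ETL module relies on this same contract: handle_block feeds the returned (unsaved) resources into populate_staging_table so the staging tables can be filled without touching resource_instances or the search index.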