Skip to content

Commit

Permalink
Merge pull request #10574 from archesproject/10573_add_generic_etl_ce…
Browse files Browse the repository at this point in the history
…lery_task

Add a generic etl celery task for custom module, #10573
  • Loading branch information
apeters authored Feb 23, 2024
2 parents 10fdc68 + de27796 commit 776c7f0
Showing 1 changed file with 48 additions and 0 deletions.
48 changes: 48 additions & 0 deletions arches/app/tasks.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import importlib
import os
import logging
import shutil
Expand Down Expand Up @@ -351,6 +352,53 @@ def bulk_data_deletion(userid, load_id, graph_id, nodegroup_id, resourceids):
user = User.objects.get(id=userid)
notify_completion(msg, user)


@shared_task
def run_task(module_name=None, class_name=None, method_to_run=None, **kwargs):
"""
this allows the user to run any method as a celery task
module_name, class_name, and method_to_run are required
pass any additional arguments to the method via the kwargs parameter
"""

theClass = getattr(importlib.import_module(module_name), class_name)
theMethod = getattr(theClass(), method_to_run)
theMethod(**kwargs)


@shared_task
def run_etl_task(**kwargs):
"""
this allows the user to run the custom etl module
import_module, import_class, loadid, userid are the required string parameter
importer_name can be added (not required) for messaging purpose
"""

logger = logging.getLogger(__name__)

import_module = kwargs.pop("import_module")
import_class = kwargs.pop("import_class")
importer_name = kwargs.pop("importer_name", import_class)
loadid = kwargs.get("loadid")
userid = kwargs.get("userid")

try:
run_task(module_name=import_module, class_name=import_class, method_to_run="run_load_task", **kwargs)

load_event = models.LoadEvent.objects.get(loadid=loadid)
status = _("Completed") if load_event.status == "indexed" else _("Failed")
except Exception as e:
logger.error(e)
load_event = models.LoadEvent.objects.get(loadid=loadid)
load_event.status = "failed"
load_event.save()
status = _("Failed")
finally:
msg = _("{}: {}").format(importer_name, status)
user = User.objects.get(id=userid)
notify_completion(msg, user)


@shared_task
def reverse_etl_load(loadid):
from arches.app.etl_modules import base_import_module
Expand Down

0 comments on commit 776c7f0

Please sign in to comment.