From 005935aee514575f71cd0b2699a2e37709b79e14 Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Thu, 16 May 2024 20:34:05 +0000 Subject: [PATCH] feat(dev): `IPipeXloader` interface; - Made `IPipeXloader` interface to send xloadr status to other plugins. --- ckanext/xloader/action.py | 2 ++ ckanext/xloader/interfaces.py | 17 +++++++++++++++++ ckanext/xloader/utils.py | 17 +++++++++++++++++ 3 files changed, 36 insertions(+) diff --git a/ckanext/xloader/action.py b/ckanext/xloader/action.py index c0f3f84f..f88be591 100644 --- a/ckanext/xloader/action.py +++ b/ckanext/xloader/action.py @@ -311,6 +311,8 @@ def xloader_hook(context, data_dict): p.toolkit.get_action('xloader_submit')( context, {'resource_id': res_id}) + utils.send_xloader_status(task) + @side_effect_free def xloader_status(context, data_dict): diff --git a/ckanext/xloader/interfaces.py b/ckanext/xloader/interfaces.py index cdecf52a..c6e4c68a 100644 --- a/ckanext/xloader/interfaces.py +++ b/ckanext/xloader/interfaces.py @@ -47,3 +47,20 @@ def after_upload(self, context, resource_dict, dataset_dict): the resource that was uploaded """ pass + + +class IPipeXloader(Interface): + """ + Process data in a Data Pipeline. + + Inherit this to subscribe to events in the Data Pipeline and be able to + broadcast the results for others to process next. In this way, a number of + IPipes can be linked up in sequence to build up a data processing pipeline. + + When a resource is xloadered, it broadcasts its status in the DataStore, + perhaps triggering a process which notifies users of the status. + These processes can in turn put the resulting resource DS status into the pipeline + """ + + def receive_xloader_status(self, xloader_status): + pass diff --git a/ckanext/xloader/utils.py b/ckanext/xloader/utils.py index 073a8091..c2120bf3 100644 --- a/ckanext/xloader/utils.py +++ b/ckanext/xloader/utils.py @@ -10,6 +10,12 @@ import ckan.plugins as p from ckan.plugins.toolkit import config +from ckanext.xloader.interfaces import IPipeXloader + +from logging import getLogger + + +log = getLogger(__name__) # resource.formats accepted by ckanext-xloader. Must be lowercase here. DEFAULT_FORMATS = [ @@ -257,3 +263,14 @@ def datastore_resource_exists(resource_id): except p.toolkit.ObjectNotFound: return False return response or {'fields': []} + + +def send_xloader_status(xloader_status): + for observer in p.PluginImplementations(IPipeXloader): + try: + observer.receive_xloader_status(xloader_status) + except Exception as ex: + log.exception(ex) + # We reraise all exceptions so they are obvious there + # is something wrong + raise