Skip to content

Commit

Permalink
feat(dev): IPipeXloader interface;
Browse files Browse the repository at this point in the history
- Made `IPipeXloader` interface to send xloadr status to other plugins.
  • Loading branch information
JVickery-TBS committed May 16, 2024
1 parent dd475e7 commit 005935a
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 0 deletions.
2 changes: 2 additions & 0 deletions ckanext/xloader/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ def xloader_hook(context, data_dict):
p.toolkit.get_action('xloader_submit')(
context, {'resource_id': res_id})

utils.send_xloader_status(task)


@side_effect_free
def xloader_status(context, data_dict):
Expand Down
17 changes: 17 additions & 0 deletions ckanext/xloader/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,20 @@ def after_upload(self, context, resource_dict, dataset_dict):
the resource that was uploaded
"""
pass


class IPipeXloader(Interface):
"""
Process data in a Data Pipeline.
Inherit this to subscribe to events in the Data Pipeline and be able to
broadcast the results for others to process next. In this way, a number of
IPipes can be linked up in sequence to build up a data processing pipeline.
When a resource is xloadered, it broadcasts its status in the DataStore,
perhaps triggering a process which notifies users of the status.
These processes can in turn put the resulting resource DS status into the pipeline
"""

def receive_xloader_status(self, xloader_status):
pass
17 changes: 17 additions & 0 deletions ckanext/xloader/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@

import ckan.plugins as p
from ckan.plugins.toolkit import config
from ckanext.xloader.interfaces import IPipeXloader

from logging import getLogger


log = getLogger(__name__)

# resource.formats accepted by ckanext-xloader. Must be lowercase here.
DEFAULT_FORMATS = [
Expand Down Expand Up @@ -257,3 +263,14 @@ def datastore_resource_exists(resource_id):
except p.toolkit.ObjectNotFound:
return False
return response or {'fields': []}


def send_xloader_status(xloader_status):
for observer in p.PluginImplementations(IPipeXloader):
try:
observer.receive_xloader_status(xloader_status)
except Exception as ex:
log.exception(ex)
# We reraise all exceptions so they are obvious there
# is something wrong
raise

0 comments on commit 005935a

Please sign in to comment.