Skip to content

Commit

Permalink
feat(dev): started doing sync mode;
Browse files Browse the repository at this point in the history
- Started doing sync mode for xloader right after validation.
  • Loading branch information
JVickery-TBS committed Feb 6, 2024
1 parent 1761ed5 commit e182eb7
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 1 deletion.
7 changes: 7 additions & 0 deletions ckanext/xloader/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ def xloader_submit(context, data_dict):
:param ignore_hash: If set to True, the xloader will reload the file
even if it haven't changed. (optional, default: False)
:type ignore_hash: bool
:param sync_mode: If set to True, the xloader callback will be executed right
away, instead of a job being enqueued. It will also delete any existing jobs
for the given resource. (optional, default: False)
:type sync_mode: bool
Returns ``True`` if the job has been submitted and ``False`` if the job
has not been submitted, i.e. when ckanext-xloader is not configured.
Expand All @@ -53,6 +57,9 @@ def xloader_submit(context, data_dict):

p.toolkit.check_access('xloader_submit', context, data_dict)

sync_mode = data_dict.pop('sync_mode', False)
#TODO: implement the sync_mode logic

res_id = data_dict['resource_id']
try:
resource_dict = p.toolkit.get_action('resource_show')(context, {
Expand Down
8 changes: 8 additions & 0 deletions ckanext/xloader/config_declaration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,14 @@ groups:
See https://github.com/frictionlessdata/ckanext-validation?tab=readme-ov-file#data-schema
for more details.
- key: ckanext.xloader.validation.chain_xloader
default: True
example: False
description: |
Resources that pass Validation will immediately get XLoadered instead of having
a job enqueued for it.
If this option is set to `False`, jobs will be enqueued like normal.
- key: ckanext.xloader.clean_datastore_tables
default: False
example: True
Expand Down
13 changes: 12 additions & 1 deletion ckanext/xloader/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ def notify(self, entity, operation):
log.debug("Skipping xloading resource %s because the "
"resource did not pass validation yet.", entity.id)
return
elif utils.do_chain_after_validation(resource_dict.get('id')):
# At this point, the Resource has passed validation requirements,
# and chainging is turned on. We will execute XLoader right away,
# instead of enqueueing a job.
self._submit_to_xloader(resource_dict, sync_mode=True)
return
elif not getattr(entity, 'url_changed', False):
# do not submit to xloader if the url has not changed.
return
Expand All @@ -118,6 +124,10 @@ def after_resource_create(self, context, resource_dict):
"resource did not pass validation yet.", resource_dict.get('id'))
return

if utils.do_chain_after_validation(resource_dict.get('id')):
self._submit_to_xloader(resource_dict, sync_mode=True)
return

self._submit_to_xloader(resource_dict)

def before_resource_show(self, resource_dict):
Expand Down Expand Up @@ -160,7 +170,7 @@ def before_show(self, resource_dict):
def after_update(self, context, resource_dict):
self.after_resource_update(context, resource_dict)

def _submit_to_xloader(self, resource_dict):
def _submit_to_xloader(self, resource_dict, sync_mode=False):
context = {"ignore_auth": True, "defer_commit": True}
if not XLoaderFormats.is_it_an_xloader_format(resource_dict["format"]):
log.debug(
Expand All @@ -187,6 +197,7 @@ def _submit_to_xloader(self, resource_dict):
{
"resource_id": resource_dict["id"],
"ignore_hash": self.ignore_hash,
"sync_mode": sync_mode,
},
)
except toolkit.ValidationError as e:
Expand Down
1 change: 1 addition & 0 deletions ckanext/xloader/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def xloader_submit_schema():
'id': [ignore_missing],
'set_url_type': [ignore_missing, boolean_validator],
'ignore_hash': [ignore_missing, boolean_validator],
'sync_mode': [ignore_missing, boolean_validator],
'__junk': [empty],
'__before': [dsschema.rename('id', 'resource_id')]
}
Expand Down
35 changes: 35 additions & 0 deletions ckanext/xloader/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import json
import datetime
from rq import get_current_job

from ckan import model
from ckan.lib import search
Expand Down Expand Up @@ -48,6 +49,7 @@ def is_it_an_xloader_format(cls, format_):


def awaiting_validation(res_dict):
# type: (dict) -> bool
"""
Checks the existence of a logic action from the ckanext-validation
plugin, thus supporting any extending of the Validation Plugin class.
Expand Down Expand Up @@ -88,6 +90,39 @@ def awaiting_validation(res_dict):
return False


def do_chain_after_validation(resource_id):
# type: (str) -> bool
if not p.toolkit.asbool(config.get('ckanext.xloader.validation.requires_successful_report', False)):
# we are not requiring resources to pass validation
return False

if not p.toolkit.asbool(config.get('ckanext.xloader.validation.chain_xloader', True)):
# we are not chaining validation to xloader
return False

current_job = get_current_job()

if not current_job:
# we are outside of the job context, thus not running a job
return False

if current_job.func_name != 'ckanext.validation.jobs.run_validation_job':
# the current running job is not the ckanext-validation validate job
#FIXME: how to do a better check for the caller in the stack??
return False

try:
job_rid = current_job.args[0].get('id', None)
except (KeyError):
job_rid = None
if resource_id != job_rid:
# the current running job's Resource ID is not
# the same as the passed Resource ID
return False

return True


def resource_data(id, resource_id, rows=None):

if p.toolkit.request.method == "POST":
Expand Down

0 comments on commit e182eb7

Please sign in to comment.