
Commit 2a784e6

Merge branch 'develop' into feature/qld-gov-au/validation-support

2 parents 16d1dd6 + ee5760b

21 files changed: +284 -84 lines changed

README.rst (+5)

@@ -191,6 +191,11 @@ Configuration:
 
 See the extension's `config_declaration.yaml <ckanext/xloader/config_declaration.yaml>`_ file.
 
+This plugin also supports the `ckan.download_proxy` setting, to use a proxy server when downloading files.
+This setting is shared with other plugins that download resource files, such as ckanext-archiver. Eg:
+
+    ckan.download_proxy = http://my-proxy:1234/
+
 You may also wish to configure the database to use your preferred date input style on COPY.
 For example, to make [PostgreSQL](https://www.postgresql.org/docs/current/runtime-config-client.html#RUNTIME-CONFIG-CLIENT-FORMAT)
 expect European (day-first) dates, you could add to ``postgresql.conf``:

ckanext/xloader/action.py (+9 -3)

@@ -46,13 +46,13 @@ def xloader_submit(context, data_dict):
 
     :rtype: bool
     '''
+    p.toolkit.check_access('xloader_submit', context, data_dict)
+    custom_queue = data_dict.pop('queue', rq_jobs.DEFAULT_QUEUE_NAME)
     schema = context.get('schema', ckanext.xloader.schema.xloader_submit_schema())
     data_dict, errors = _validate(data_dict, schema, context)
     if errors:
         raise p.toolkit.ValidationError(errors)
 
-    p.toolkit.check_access('xloader_submit', context, data_dict)
-
     res_id = data_dict['resource_id']
     try:
         resource_dict = p.toolkit.get_action('resource_show')(context, {

@@ -152,6 +152,10 @@ def xloader_submit(context, data_dict):
             'original_url': resource_dict.get('url'),
         }
     }
+    if custom_queue != rq_jobs.DEFAULT_QUEUE_NAME:
+        # Don't automatically retry if it's a custom run
+        data['metadata']['tries'] = jobs.MAX_RETRIES
+
     # Expand timeout for resources that have to be type-guessed
     timeout = config.get(
         'ckanext.xloader.job_timeout',

@@ -160,7 +164,9 @@ def xloader_submit(context, data_dict):
 
     try:
         job = enqueue_job(
-            jobs.xloader_data_into_datastore, [data], rq_kwargs=dict(timeout=timeout)
+            jobs.xloader_data_into_datastore, [data], queue=custom_queue,
+            title="xloader_submit: package: {} resource: {}".format(resource_dict.get('package_id'), res_id),
+            rq_kwargs=dict(timeout=timeout)
         )
     except Exception:
         log.exception('Unable to enqueued xloader res_id=%s', res_id)
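Taken together, these hunks mean API callers can route a submission to a non-default rq queue, with automatic retries suppressed for such runs. A minimal sketch of a call, assuming a sysadmin user context and an illustrative 'priority' queue name (neither is defined by this commit):

    import ckan.plugins.toolkit as toolkit

    def submit_to_priority_queue(resource_id):
        # 'queue' is popped from data_dict before schema validation; any
        # non-default value requires sysadmin rights (see auth.py below).
        context = {'user': 'a-sysadmin-name'}  # hypothetical sysadmin
        return toolkit.get_action('xloader_submit')(context, {
            'resource_id': resource_id,
            'queue': 'priority',  # illustrative custom queue name
        })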

ckanext/xloader/auth.py (+7)

@@ -1,7 +1,14 @@
+from ckan import authz
+from ckan.lib import jobs as rq_jobs
+
 import ckanext.datastore.logic.auth as auth
 
 
 def xloader_submit(context, data_dict):
+    # only sysadmins can specify a custom processing queue
+    custom_queue = data_dict.get('queue')
+    if custom_queue and custom_queue != rq_jobs.DEFAULT_QUEUE_NAME:
+        return authz.is_authorized('config_option_update', context, data_dict)
     return auth.datastore_auth(context, data_dict)
 
 
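Reusing the `config_option_update` auth function is a compact way to say "sysadmins only". A sketch of the resulting behaviour (user and resource ids are placeholders):

    import ckan.plugins.toolkit as toolkit

    # Submitting on the default queue still follows the normal datastore
    # auth rules, so a user with rights on the package passes:
    toolkit.check_access('xloader_submit', {'user': 'some-editor'},
                         {'resource_id': 'some-resource-id'})

    # Naming a custom queue raises NotAuthorized for non-sysadmins:
    try:
        toolkit.check_access('xloader_submit', {'user': 'some-editor'},
                             {'resource_id': 'some-resource-id',
                              'queue': 'priority'})
    except toolkit.NotAuthorized:
        pass  # expected unless the user is a sysadmin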

ckanext/xloader/cli.py (+7 -4)

@@ -26,23 +26,26 @@ def status():
 @click.argument(u'dataset-spec')
 @click.option('-y', is_flag=True, default=False, help='Always answer yes to questions')
 @click.option('--dry-run', is_flag=True, default=False, help='Don\'t actually submit any resources')
-def submit(dataset_spec, y, dry_run):
+@click.option('--queue', help='Queue name for asynchronous processing, unused if executing immediately')
+@click.option('--sync', is_flag=True, default=False,
+              help='Execute immediately instead of enqueueing for asynchronous processing')
+def submit(dataset_spec, y, dry_run, queue, sync):
     """
     xloader submit [options] <dataset-spec>
     """
     cmd = XloaderCmd(dry_run)
 
     if dataset_spec == 'all':
         cmd._setup_xloader_logger()
-        cmd._submit_all()
+        cmd._submit_all(sync=sync, queue=queue)
     elif dataset_spec == 'all-existing':
         _confirm_or_abort(y, dry_run)
         cmd._setup_xloader_logger()
-        cmd._submit_all_existing()
+        cmd._submit_all_existing(sync=sync, queue=queue)
     else:
         pkg_name_or_id = dataset_spec
         cmd._setup_xloader_logger()
-        cmd._submit_package(pkg_name_or_id)
+        cmd._submit_package(pkg_name_or_id, sync=sync, queue=queue)
 
     if cmd.error_occured:
         print('Finished but saw errors - see above for details')
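With these options, `xloader submit` can either target a named rq queue or run the load in the foreground. Hypothetical invocations (the ini path and queue name are placeholders):

    ckan -c /etc/ckan/default/ckan.ini xloader submit all --queue bulk
    ckan -c /etc/ckan/default/ckan.ini xloader submit my-dataset --sync --dry-run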

ckanext/xloader/command.py (+32 -18)

@@ -3,6 +3,8 @@
 import sys
 import logging
 import ckan.plugins.toolkit as tk
+
+from ckanext.xloader.jobs import xloader_data_into_datastore_
 from ckanext.xloader.utils import XLoaderFormats
 
 

@@ -23,7 +25,7 @@ def _setup_xloader_logger(self):
         logger.setLevel(logging.DEBUG)
         logger.propagate = False  # in case the config
 
-    def _submit_all_existing(self):
+    def _submit_all_existing(self, sync=False, queue=None):
         from ckanext.datastore.backend \
             import get_all_resources_ids_in_datastore
         resource_ids = get_all_resources_ids_in_datastore()

@@ -38,9 +40,9 @@ def _submit_all_existing(self):
                 print(' Skipping resource {} found in datastore but not in '
                       'metadata'.format(resource_id))
                 continue
-            self._submit_resource(resource_dict, user, indent=2)
+            self._submit_resource(resource_dict, user, indent=2, sync=sync, queue=queue)
 
-    def _submit_all(self):
+    def _submit_all(self, sync=False, queue=None):
         # submit every package
         # for each package in the package list,
         # submit each resource w/ _submit_package

@@ -51,9 +53,9 @@ def _submit_all(self):
         user = tk.get_action('get_site_user')(
             {'ignore_auth': True}, {})
         for p_id in package_list:
-            self._submit_package(p_id, user, indent=2)
+            self._submit_package(p_id, user, indent=2, sync=sync, queue=queue)
 
-    def _submit_package(self, pkg_id, user=None, indent=0):
+    def _submit_package(self, pkg_id, user=None, indent=0, sync=False, queue=None):
         indentation = ' ' * indent
         if not user:
             user = tk.get_action('get_site_user')(

@@ -73,15 +75,15 @@ def _submit_package(self, pkg_id, user=None, indent=0):
         for resource in pkg['resources']:
             try:
                 resource['package_name'] = pkg['name']  # for debug output
-                self._submit_resource(resource, user, indent=indent + 2)
+                self._submit_resource(resource, user, indent=indent + 2, sync=sync, queue=queue)
             except Exception as e:
                 self.error_occured = True
-                print(e)
+                print(str(e))
                 print(indentation + 'ERROR submitting resource "{}" '.format(
                     resource['id']))
                 continue
 
-    def _submit_resource(self, resource, user, indent=0):
+    def _submit_resource(self, resource, user, indent=0, sync=False, queue=None):
         '''resource: resource dictionary
         '''
         indentation = ' ' * indent

@@ -99,23 +101,35 @@ def _submit_resource(self, resource, user, indent=0):
                 r=resource))
             return
         dataset_ref = resource.get('package_name', resource['package_id'])
-        print('{indent}Submitting /dataset/{dataset}/resource/{r[id]}\n'
+        print('{indent}{sync_style} /dataset/{dataset}/resource/{r[id]}\n'
               '{indent} url={r[url]}\n'
               '{indent} format={r[format]}'
-              .format(dataset=dataset_ref, r=resource, indent=indentation))
+              .format(sync_style='Processing' if sync else 'Submitting',
+                      dataset=dataset_ref, r=resource, indent=indentation))
+        if self.dry_run:
+            print(indentation + '(not submitted - dry-run)')
+            return
         data_dict = {
             'resource_id': resource['id'],
             'ignore_hash': True,
         }
-        if self.dry_run:
-            print(indentation + '(not submitted - dry-run)')
-            return
-        success = tk.get_action('xloader_submit')({'user': user['name']}, data_dict)
-        if success:
-            print(indentation + '...ok')
+        if sync:
+            data_dict['ckan_url'] = tk.config.get('ckan.site_url')
+            input_dict = {
+                'metadata': data_dict,
+                'api_key': 'TODO'
+            }
+            logger = logging.getLogger('ckanext.xloader.cli')
+            xloader_data_into_datastore_(input_dict, None, logger)
         else:
-            print(indentation + 'ERROR submitting resource')
-            self.error_occured = True
+            if queue:
+                data_dict['queue'] = queue
+            success = tk.get_action('xloader_submit')({'user': user['name']}, data_dict)
+            if success:
+                print(indentation + '...ok')
+            else:
+                print(indentation + 'ERROR submitting resource')
+                self.error_occured = True
 
     def print_status(self):
         import ckan.lib.jobs as rq_jobs
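The new keyword arguments thread through every `_submit_*` method. A hypothetical driver (not part of the commit) exercising the reworked signatures:

    from ckanext.xloader.command import XloaderCmd

    cmd = XloaderCmd(dry_run=True)  # dry-run avoids touching the datastore
    cmd._setup_xloader_logger()
    # 'my-dataset' and 'bulk' are illustrative names
    cmd._submit_package('my-dataset', sync=False, queue='bulk')
    if cmd.error_occured:
        print('Finished but saw errors - see above for details')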

ckanext/xloader/config_declaration.yaml (+15)

@@ -46,6 +46,15 @@ groups:
         type: bool
         required: false
         legacy_key: ckanext.xloader.just_load_with_messytables
+      - key: ckanext.xloader.strict_type_guessing
+        default: True
+        example: False
+        description: |
+          Use with ckanext.xloader.use_type_guessing to set strict true or false
+          for type guessing. If set to False, the types will always fallback to string type.
+
+          Strict means that a type will not be guessed if parsing fails for a single cell in the column.
+        type: bool
       - key: ckanext.xloader.max_type_guessing_length
         default: 0
         example: 100000

@@ -138,6 +147,12 @@ groups:
 
           See https://github.com/frictionlessdata/ckanext-validation?tab=readme-ov-file#data-schema
           for more details.
+      - key: ckanext.xloader.clean_datastore_tables
+        default: False
+        example: True
+        description: |
+          Enqueue jobs to remove Datastore tables from Resources that have a format
+          that is not in ckanext.xloader.formats after a Resource is updated.
         type: bool
         required: false
 
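Both declared options are booleans that can be set in the CKAN ini file; the values below simply mirror the declared examples:

    ckanext.xloader.strict_type_guessing = False
    ckanext.xloader.clean_datastore_tables = True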

ckanext/xloader/helpers.py (+2 -4)

@@ -31,10 +31,8 @@ def xloader_status_description(status):
 def is_resource_supported_by_xloader(res_dict, check_access=True):
     is_supported_format = XLoaderFormats.is_it_an_xloader_format(res_dict.get('format'))
     is_datastore_active = res_dict.get('datastore_active', False)
-    if check_access:
-        user_has_access = toolkit.h.check_access('package_update', {'id': res_dict.get('package_id')})
-    else:
-        user_has_access = True
+    user_has_access = not check_access or toolkit.h.check_access('package_update',
+                                                                 {'id':res_dict.get('package_id')})
     url_type = res_dict.get('url_type')
     if url_type:
         try:
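The branch collapses into a short-circuit, so `toolkit.h.check_access` is only consulted when `check_access` is true. A hypothetical call site (the resource dict is illustrative):

    from ckanext.xloader.helpers import is_resource_supported_by_xloader

    res_dict = {'format': 'CSV', 'package_id': 'my-dataset',
                'url_type': 'upload', 'datastore_active': False}
    # check_access=False skips the package_update permission lookup,
    # e.g. where no user context is available.
    if is_resource_supported_by_xloader(res_dict, check_access=False):
        print('xloader can handle this resource')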

ckanext/xloader/jobs.py (+36 -30)

@@ -42,6 +42,7 @@
 CHUNK_SIZE = 16 * 1024  # 16kb
 DOWNLOAD_TIMEOUT = 30
 
+MAX_RETRIES = 1
 RETRYABLE_ERRORS = (
     errors.DeadlockDetected,
     errors.LockNotAvailable,

@@ -86,26 +87,50 @@ def xloader_data_into_datastore(input):
 
     job_id = get_current_job().id
     errored = False
+
+    # Set-up logging to the db
+    handler = StoringHandler(job_id, input)
+    level = logging.DEBUG
+    handler.setLevel(level)
+    logger = logging.getLogger(job_id)
+    handler.setFormatter(logging.Formatter('%(message)s'))
+    logger.addHandler(handler)
+    # also show logs on stderr
+    logger.addHandler(logging.StreamHandler())
+    logger.setLevel(logging.DEBUG)
+
+    db.init(config)
     try:
-        xloader_data_into_datastore_(input, job_dict)
+        # Store details of the job in the db
+        db.add_pending_job(job_id, **input)
+        xloader_data_into_datastore_(input, job_dict, logger)
         job_dict['status'] = 'complete'
         db.mark_job_as_completed(job_id, job_dict)
+    except sa.exc.IntegrityError as e:
+        db.mark_job_as_errored(job_id, str(e))
+        job_dict['status'] = 'error'
+        job_dict['error'] = str(e)
+        log.error('xloader error: job_id %s already exists', job_id)
+        errored = True
     except JobError as e:
         db.mark_job_as_errored(job_id, str(e))
         job_dict['status'] = 'error'
         job_dict['error'] = str(e)
-        log.error('xloader error: {0}, {1}'.format(e, traceback.format_exc()))
+        log.error('xloader error: %s, %s', e, traceback.format_exc())
         errored = True
     except Exception as e:
         if isinstance(e, RETRYABLE_ERRORS):
             tries = job_dict['metadata'].get('tries', 0)
-            if tries == 0:
+            if tries < MAX_RETRIES:
+                tries = tries + 1
                 log.info("Job %s failed due to temporary error [%s], retrying", job_id, e)
                 job_dict['status'] = 'pending'
-                job_dict['metadata']['tries'] = tries + 1
+                job_dict['metadata']['tries'] = tries
                 enqueue_job(
                     xloader_data_into_datastore,
                     [input],
+                    title="retry xloader_data_into_datastore: resource: {} attempt {}".format(
+                        job_dict['metadata']['resource_id'], tries),
                     rq_kwargs=dict(timeout=RETRIED_JOB_TIMEOUT)
                 )
                 return None

@@ -114,7 +139,7 @@ def xloader_data_into_datastore(input):
             job_id, traceback.format_tb(sys.exc_info()[2])[-1] + repr(e))
         job_dict['status'] = 'error'
         job_dict['error'] = str(e)
-        log.error('xloader error: {0}, {1}'.format(e, traceback.format_exc()))
+        log.error('xloader error: %s, %s', e, traceback.format_exc())
         errored = True
     finally:
         # job_dict is defined in xloader_hook's docstring

@@ -125,7 +150,7 @@ def xloader_data_into_datastore(input):
     return 'error' if errored else None
 
 
-def xloader_data_into_datastore_(input, job_dict):
+def xloader_data_into_datastore_(input, job_dict, logger):
     '''This function:
     * downloads the resource (metadata) from CKAN
     * downloads the data

@@ -134,26 +159,6 @@ def xloader_data_into_datastore_(input, job_dict):
 
     (datapusher called this function 'push_to_datastore')
     '''
-    job_id = get_current_job().id
-    db.init(config)
-
-    # Store details of the job in the db
-    try:
-        db.add_pending_job(job_id, **input)
-    except sa.exc.IntegrityError:
-        raise JobError('job_id {} already exists'.format(job_id))
-
-    # Set-up logging to the db
-    handler = StoringHandler(job_id, input)
-    level = logging.DEBUG
-    handler.setLevel(level)
-    logger = logging.getLogger(job_id)
-    handler.setFormatter(logging.Formatter('%(message)s'))
-    logger.addHandler(handler)
-    # also show logs on stderr
-    logger.addHandler(logging.StreamHandler())
-    logger.setLevel(logging.DEBUG)
-
     validate_input(input)
 
     data = input['metadata']

@@ -197,10 +202,11 @@ def direct_load():
         loader.calculate_record_count(
             resource_id=resource['id'], logger=logger)
         set_datastore_active(data, resource, logger)
-        job_dict['status'] = 'running_but_viewable'
-        callback_xloader_hook(result_url=input['result_url'],
-                              api_key=api_key,
-                              job_dict=job_dict)
+        if 'result_url' in input:
+            job_dict['status'] = 'running_but_viewable'
+            callback_xloader_hook(result_url=input['result_url'],
+                                  api_key=api_key,
+                                  job_dict=job_dict)
         logger.info('Data now available to users: %s', resource_ckan_url)
         loader.create_column_indexes(
             fields=fields,
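A standalone sketch (assumptions only, not code from this commit) of the retry bookkeeping the hunks above implement: a retryable failure re-enqueues at most MAX_RETRIES times, and xloader_submit pre-sets tries to MAX_RETRIES for custom-queue runs so they never retry automatically:

    MAX_RETRIES = 1

    def should_retry(metadata):
        # mirrors: tries = metadata.get('tries', 0); tries < MAX_RETRIES
        return metadata.get('tries', 0) < MAX_RETRIES

    assert should_retry({})                          # default run: one retry allowed
    assert not should_retry({'tries': MAX_RETRIES})  # custom queue: no retry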
