
Commit e454e90

Merge branch 'master-queensland' into feature/qld-gov-au/remove-unsupported-datastore-tables
Conflicts: ckanext/xloader/plugin.py (resolved)
2 parents: 7fabca4 + afc45b7

10 files changed: +231 −53 lines

README.rst (+5)
@@ -191,6 +191,11 @@ Configuration:
 See the extension's `config_declaration.yaml <ckanext/xloader/config_declaration.yaml>`_ file.
 
+You may also wish to configure the database to use your preferred date input style on COPY.
+For example, to make `PostgreSQL <https://www.postgresql.org/docs/current/runtime-config-client.html#RUNTIME-CONFIG-CLIENT-FORMAT>`_
+expect European (day-first) dates, you could add to ``postgresql.conf``:
+
+    datestyle=ISO,DMY
 
 ------------------------
 Developer installation
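A quick way to confirm the setting took effect is to ask the session for its datestyle. A minimal sketch, assuming a local datastore database named datastore_default (the DSN is an assumption, not part of this commit):

    import psycopg2

    conn = psycopg2.connect("dbname=datastore_default")  # assumed DSN; adjust to your setup
    with conn, conn.cursor() as cur:
        cur.execute("SHOW datestyle")
        print(cur.fetchone()[0])  # expect something like 'ISO, DMY'
        # with DMY, an input like 05/02/2024 parses as 5 February, not 2 May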

ckanext/xloader/db.py (+1 −9)
@@ -191,9 +191,7 @@ def add_pending_job(job_id, job_type, api_key,
     if not metadata:
         metadata = {}
 
-    conn = ENGINE.connect()
-    trans = conn.begin()
-    try:
+    with ENGINE.begin() as conn:
         conn.execute(JOBS_TABLE.insert().values(
             job_id=job_id,
             job_type=job_type,
@@ -225,12 +223,6 @@ def add_pending_job(job_id, job_type, api_key,
             )
         if inserts:
             conn.execute(METADATA_TABLE.insert(), inserts)
-        trans.commit()
-    except Exception:
-        trans.rollback()
-        raise
-    finally:
-        conn.close()
 
 
 class InvalidErrorObjectError(Exception):
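For context, SQLAlchemy's Engine.begin() is the context-manager form of the connect/begin/commit/rollback/close sequence the deleted code spelled out by hand. A minimal sketch of the equivalence, with a hypothetical DSN and table rather than xloader's real schema:

    import sqlalchemy as sa

    engine = sa.create_engine("postgresql:///ckan_jobs")  # hypothetical DSN
    jobs = sa.table("jobs", sa.column("job_id"), sa.column("status"))  # illustrative table

    # Old shape: explicit transaction plumbing.
    conn = engine.connect()
    trans = conn.begin()
    try:
        conn.execute(jobs.insert().values(job_id="abc", status="pending"))
        trans.commit()
    except Exception:
        trans.rollback()
        raise
    finally:
        conn.close()

    # New shape: commits on success, rolls back on any exception, always closes.
    with engine.begin() as conn:
        conn.execute(jobs.insert().values(job_id="def", status="pending"))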

ckanext/xloader/jobs.py (+25 −5)
@@ -11,13 +11,14 @@
 import traceback
 import sys
 
+from psycopg2 import errors
 from six.moves.urllib.parse import urlsplit
 import requests
 from rq import get_current_job
 import sqlalchemy as sa
 
 from ckan import model
-from ckan.plugins.toolkit import get_action, asbool, ObjectNotFound, config
+from ckan.plugins.toolkit import get_action, asbool, enqueue_job, ObjectNotFound, config
 
 from . import db, loader
 from .job_exceptions import JobError, HTTPError, DataTooBigError, FileCouldNotBeLoadedError
@@ -41,6 +42,15 @@
 CHUNK_SIZE = 16 * 1024  # 16kb
 DOWNLOAD_TIMEOUT = 30
 
+RETRYABLE_ERRORS = (
+    errors.DeadlockDetected,
+    errors.LockNotAvailable,
+    errors.ObjectInUse,
+)
+# Retries can only occur in cases where the datastore entry exists,
+# so use the standard timeout
+RETRIED_JOB_TIMEOUT = config.get('ckanext.xloader.job_timeout', '3600')
+
 
 # input = {
 #     'api_key': user['apikey'],
@@ -87,6 +97,19 @@ def xloader_data_into_datastore(input):
         log.error('xloader error: {0}, {1}'.format(e, traceback.format_exc()))
         errored = True
     except Exception as e:
+        if isinstance(e, RETRYABLE_ERRORS):
+            tries = job_dict['metadata'].get('tries', 0)
+            if tries == 0:
+                log.info("Job %s failed due to temporary error [%s], retrying", job_id, e)
+                job_dict['status'] = 'pending'
+                job_dict['metadata']['tries'] = tries + 1
+                enqueue_job(
+                    xloader_data_into_datastore,
+                    [input],
+                    rq_kwargs=dict(timeout=RETRIED_JOB_TIMEOUT)
+                )
+                return None
+
         db.mark_job_as_errored(
             job_id, traceback.format_tb(sys.exc_info()[2])[-1] + repr(e))
         job_dict['status'] = 'error'
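The gate above retries a job at most once: only the transient Postgres lock/deadlock errors in RETRYABLE_ERRORS qualify, and only when the job has never been retried before (tries == 0); anything else still falls through to mark_job_as_errored. A stripped-down sketch of the same control flow, where do_work and enqueue are hypothetical stand-ins rather than xloader APIs:

    from psycopg2 import errors

    RETRYABLE = (errors.DeadlockDetected, errors.LockNotAvailable, errors.ObjectInUse)

    def run_job(job, do_work, enqueue):
        try:
            do_work(job)
        except RETRYABLE:
            if job.setdefault('tries', 0) == 0:
                job['tries'] += 1
                enqueue(job)  # requeue once, then bail out of this run
                return
            raise  # a second transient failure is treated like any other error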
@@ -541,8 +564,7 @@ def __init__(self, task_id, input):
         self.input = input
 
     def emit(self, record):
-        conn = db.ENGINE.connect()
-        try:
+        with db.ENGINE.connect() as conn:
             # Turn strings into unicode to stop SQLAlchemy
             # "Unicode type received non-unicode bind param value" warnings.
             message = str(record.getMessage())
@@ -558,8 +580,6 @@ def emit(self, record):
                 module=module,
                 funcName=funcName,
                 lineno=record.lineno))
-        finally:
-            conn.close()
 
 
 class DatetimeJsonEncoder(json.JSONEncoder):

ckanext/xloader/loader.py (+106 −36)
@@ -78,6 +78,49 @@ def detect_encoding(file_path):
     return detector.result  # e.g. {'encoding': 'EUC-JP', 'confidence': 0.99}
 
 
+def _fields_match(fields, existing_fields, logger):
+    ''' Check whether all columns have the same names and types as previously,
+    independent of ordering.
+    '''
+    # drop the generated '_id' field
+    for index in range(len(existing_fields)):
+        if existing_fields[index]['id'] == '_id':
+            existing_fields.pop(index)
+            break
+
+    # fail fast if number of fields doesn't match
+    field_count = len(fields)
+    if field_count != len(existing_fields):
+        logger.info("Fields do not match; there are now %s fields but previously %s", field_count, len(existing_fields))
+        return False
+
+    # ensure each field is present in both collections with the same type
+    for index in range(field_count):
+        field_id = fields[index]['id']
+        for existing_index in range(field_count):
+            existing_field_id = existing_fields[existing_index]['id']
+            if field_id == existing_field_id:
+                if fields[index]['type'] == existing_fields[existing_index]['type']:
+                    break
+                else:
+                    logger.info("Fields do not match; new type for %s field is %s but existing type is %s",
+                                field_id, fields[index]["type"], existing_fields[existing_index]['type'])
+                    return False
+        else:
+            logger.info("Fields do not match; no existing entry found for %s", field_id)
+            return False
+    return True
+
+
+def _clear_datastore_resource(resource_id):
+    ''' Delete all records from the datastore table, without dropping the table itself.
+    '''
+    engine = get_write_engine()
+    with engine.begin() as conn:
+        conn.execute("SET LOCAL lock_timeout = '5s'")
+        conn.execute('TRUNCATE TABLE "{}"'.format(resource_id))
+
+
 def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None):
     '''Loads a CSV into DataStore. Does not create the indexes.'''
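_fields_match deliberately compares columns by name and type while ignoring ordering and the datastore-generated _id column. The same order-independent check can be phrased with dicts; a compact sketch (not the committed code, and without the mismatch logging the real helper provides):

    def fields_match(fields, existing_fields):
        # Map column name -> type; drop the generated '_id' column.
        new = {f['id']: f['type'] for f in fields}
        old = {f['id']: f['type'] for f in existing_fields if f['id'] != '_id'}
        return new == old

On the companion helper: because _clear_datastore_resource runs TRUNCATE inside engine.begin(), the SET LOCAL lock_timeout applies only to that one transaction. If another session holds the table for more than five seconds, the job fails with LockNotAvailable, which jobs.py now treats as retryable rather than waiting indefinitely.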

@@ -140,34 +183,43 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None):
     existing = datastore_resource_exists(resource_id)
     existing_info = {}
     if existing:
+        existing_fields = existing.get('fields', [])
         existing_info = dict((f['id'], f['info'])
-                             for f in existing.get('fields', [])
+                             for f in existing_fields
                              if 'info' in f)
 
-        '''
-        Delete existing datastore table before proceeding. Otherwise
-        the COPY will append to the existing table. And if
-        the fields have significantly changed, it may also fail.
-        '''
-        logger.info('Deleting "{res_id}" from DataStore.'.format(
-            res_id=resource_id))
-        delete_datastore_resource(resource_id)
-
-    # Columns types are either set (overridden) in the Data Dictionary page
-    # or default to text type (which is robust)
-    fields = [
-        {'id': header_name,
-         'type': existing_info.get(header_name, {})
-         .get('type_override') or 'text',
-         }
-        for header_name in headers]
+        # Column types are either set (overridden) in the Data Dictionary page
+        # or default to text type (which is robust)
+        fields = [
+            {'id': header_name,
+             'type': existing_info.get(header_name, {})
+             .get('type_override') or 'text',
+             }
+            for header_name in headers]
 
-    # Maintain data dictionaries from matching column names
-    if existing_info:
+        # Maintain data dictionaries from matching column names
         for f in fields:
             if f['id'] in existing_info:
                 f['info'] = existing_info[f['id']]
 
+        '''
+        Delete or truncate existing datastore table before proceeding,
+        depending on whether any fields have changed.
+        Otherwise the COPY will append to the existing table.
+        And if the fields have significantly changed, it may also fail.
+        '''
+        if _fields_match(fields, existing_fields, logger):
+            logger.info('Clearing records for "%s" from DataStore.', resource_id)
+            _clear_datastore_resource(resource_id)
+        else:
+            logger.info('Deleting "%s" from DataStore.', resource_id)
+            delete_datastore_resource(resource_id)
+    else:
+        fields = [
+            {'id': header_name,
+             'type': 'text'}
+            for header_name in headers]
+
     logger.info('Fields: %s', fields)
 
     # Create table
@@ -281,6 +333,18 @@ def create_column_indexes(fields, resource_id, logger):
     logger.info('...column indexes created.')
 
 
+def _save_type_overrides(headers_dicts):
+    # copy 'type' to 'type_override' if it's not the default type (text)
+    # and there isn't already an override in place
+    for h in headers_dicts:
+        if h['type'] != 'text':
+            if 'info' in h:
+                if 'type_override' not in h['info']:
+                    h['info']['type_override'] = h['type']
+            else:
+                h['info'] = {'type_override': h['type']}
+
+
 def load_table(table_filepath, resource_id, mimetype='text/csv', logger=None):
     '''Loads an Excel file (or other tabular data recognized by tabulator)
     into Datastore and creates indexes.
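A quick illustration of _save_type_overrides on hypothetical header dicts: sniffed non-text types gain an override, explicit user overrides are left alone, and default text columns are untouched.

    headers_dicts = [
        {'id': 'when', 'type': 'timestamp'},                 # sniffed: gains an override
        {'id': 'price', 'type': 'numeric',
         'info': {'type_override': 'text'}},                 # user override: left alone
        {'id': 'name', 'type': 'text'},                      # default type: nothing recorded
    ]
    _save_type_overrides(headers_dicts)
    assert headers_dicts[0]['info'] == {'type_override': 'timestamp'}
    assert headers_dicts[1]['info'] == {'type_override': 'text'}
    assert 'info' not in headers_dicts[2]

Persisting sniffed types as overrides is what lets a re-upload of the same file pass _fields_match and take the truncate path, keeping the Data Dictionary intact; this is exactly what the new test_loader.py assertions below check.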
@@ -311,9 +375,10 @@ def load_table(table_filepath, resource_id, mimetype='text/csv', logger=None):
     existing = datastore_resource_exists(resource_id)
     existing_info = None
     if existing:
+        existing_fields = existing.get('fields', [])
         existing_info = dict(
             (f['id'], f['info'])
-            for f in existing.get('fields', []) if 'info' in f)
+            for f in existing_fields if 'info' in f)
 
     # Some headers might have been converted from strings to floats and such.
     headers = encode_headers(headers)
@@ -349,16 +414,6 @@ def row_iterator():
             yield data_row
     result = row_iterator()
 
-    '''
-    Delete existing datstore resource before proceeding. Otherwise
-    'datastore_create' will append to the existing datastore. And if
-    the fields have significantly changed, it may also fail.
-    '''
-    if existing:
-        logger.info('Deleting "{res_id}" from datastore.'.format(
-            res_id=resource_id))
-        delete_datastore_resource(resource_id)
-
     headers_dicts = [dict(id=field[0], type=TYPE_MAPPING[str(field[1])])
                      for field in zip(headers, types)]
@@ -372,8 +427,24 @@ def row_iterator():
             if type_override in list(_TYPE_MAPPING.values()):
                 h['type'] = type_override
 
-    logger.info('Determined headers and types: {headers}'.format(
-        headers=headers_dicts))
+    # preserve any types that we have sniffed unless told otherwise
+    _save_type_overrides(headers_dicts)
+
+    logger.info('Determined headers and types: %s', headers_dicts)
+
+    '''
+    Delete or truncate existing datastore table before proceeding,
+    depending on whether any fields have changed.
+    Otherwise 'datastore_create' will append to the existing datastore.
+    And if the fields have significantly changed, it may also fail.
+    '''
+    if existing:
+        if _fields_match(headers_dicts, existing_fields, logger):
+            logger.info('Clearing records for "%s" from DataStore.', resource_id)
+            _clear_datastore_resource(resource_id)
+        else:
+            logger.info('Deleting "%s" from datastore.', resource_id)
+            delete_datastore_resource(resource_id)
 
     logger.info('Copying to database...')
     count = 0
@@ -382,7 +453,7 @@ def row_iterator():
     non_empty_types = ['timestamp', 'numeric']
     for i, records in enumerate(chunky(result, 250)):
         count += len(records)
-        logger.info('Saving chunk {number}'.format(number=i))
+        logger.info('Saving chunk %s', i)
         for row in records:
             for column_index, column_name in enumerate(row):
                 if headers_dicts[column_index]['type'] in non_empty_types and row[column_name] == '':
@@ -391,8 +462,7 @@ def row_iterator():
     logger.info('...copying done')
 
     if count:
-        logger.info('Successfully pushed {n} entries to "{res_id}".'.format(
-            n=count, res_id=resource_id))
+        logger.info('Successfully pushed %s entries to "%s".', count, resource_id)
     else:
         # no datastore table is created
         raise LoaderError('No entries found - nothing to load')

ckanext/xloader/plugin.py (+3)
@@ -95,6 +95,9 @@ def notify(self, entity, operation):
         if _should_remove_unsupported_resource_from_datastore(resource_dict):
             toolkit.enqueue_job(fn=_remove_unsupported_resource_from_datastore, args=[entity.id])
 
+        if not getattr(entity, 'url_changed', False):
+            return
+
         self._submit_to_xloader(resource_dict)
 
     # IResourceController
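This mirrors the guard used by CKAN core's DataPusher plugin: url_changed is a transient attribute set on the Resource during an update when its URL actually changed, so metadata-only edits no longer re-trigger an xloader submission. The unsupported-format cleanup above still runs either way, since it is enqueued before the early return.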
New template: DataStore confirm-delete page (+22; exact path not shown in this capture)
@@ -0,0 +1,22 @@
+{% extends "page.html" %}
+
+{% block subtitle %}{{ _("Confirm Delete") }}{% endblock %}
+
+{% block maintag %}<div class="row" role="main">{% endblock %}
+
+{% block main_content %}
+  <section class="module col-md-6 col-md-offset-3">
+    <div class="module-content">
+      {% block form %}
+      <p>{{ _('Are you sure you want to delete the DataStore and Data Dictionary?') }}</p>
+      <p class="form-actions">
+        <form action="{{ h.url_for('xloader.delete_datastore_table', id=package_id, resource_id=resource_id) }}" method="post">
+          {{ h.csrf_input() if 'csrf_input' in h }}
+          <button class="btn btn-danger" type="submit" name="cancel">{{ _('Cancel') }}</button>
+          <button class="btn btn-primary" type="submit" name="delete">{{ _('Confirm Delete') }}</button>
+        </form>
+      </p>
+      {% endblock %}
+    </div>
+  </section>
+{% endblock %}

ckanext/xloader/templates/xloader/resource_data.html (+17 −2)
@@ -4,11 +4,26 @@
 {% block primary_content_inner %}
 
-  {% set action = h.url_for('xloader.resource_data', id=pkg.name, resource_id=res.id) %}
   {% set show_table = true %}
 
+  {% block delete_ds_button %}
+    {% if res.datastore_active %}
+      {% set delete_action = h.url_for('xloader.delete_datastore_table', id=pkg.id, resource_id=res.id) %}
+      <form method="post" action="{{ delete_action }}" class="mb-3 d-inline-block pull-right">
+        {{ h.csrf_input() if 'csrf_input' in h }}
+        <a href="{{ delete_action }}"
+           class="btn btn-danger pull-left"
+           type="submit"
+           data-module="confirm-action"
+           data-module-with-data=true
+           data-module-content="{{ _('Are you sure you want to delete the DataStore and Data Dictionary?') }}"
+        >{% block delete_datastore_button_text %}<i class="fa fa-remove"></i> {{ _('Delete from DataStore') }}{% endblock %}</a>
+      </form>
+    {% endif %}
+  {% endblock %}
+
 {% block upload_ds_button %}
-  <form method="post" action="{{ action }}" class="datapusher-form">
+  <form method="post" action="{{ h.url_for('xloader.resource_data', id=pkg.name, resource_id=res.id) }}" class="datapusher-form mb-3 d-inline-block">
     {{ h.csrf_input() if 'csrf_input' in h }}
     <button class="btn btn-primary" name="save" type="submit">
       <i class="fa fa-cloud-upload"></i> {{ _('Upload to DataStore') }}

ckanext/xloader/tests/test_loader.py (+8)
@@ -949,6 +949,14 @@ def test_simple(self, Session):
             u"numeric",
             u"text",
         ]
+        # Check that the sniffed types have been recorded as overrides
+        rec = p.toolkit.get_action("datastore_search")(
+            None, {"resource_id": resource_id, "limit": 0}
+        )
+        fields = [f for f in rec["fields"] if not f["id"].startswith("_")]
+        assert fields[0].get("info", {}).get("type_override", "") == "timestamp"
+        assert fields[1].get("info", {}).get("type_override", "") == "numeric"
+        assert fields[2].get("info", {}).get("type_override", "") == ""
 
     # test disabled by default to avoid adding large file to repo and slow test
     @pytest.mark.skip

ckanext/xloader/utils.py (+1)
@@ -113,6 +113,7 @@ def set_resource_metadata(update_dict):
     # better fix
 
     q = model.Session.query(model.Resource). \
+        with_for_update(of=model.Resource). \
         filter(model.Resource.id == update_dict['resource_id'])
     resource = q.one()
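with_for_update() makes the query emit SELECT ... FOR UPDATE, so the resource row stays locked until set_resource_metadata's transaction commits; together with the retryable-error handling in jobs.py, concurrent jobs touching the same resource now serialize (or deadlock and get retried) instead of clobbering each other's metadata. A self-contained sketch of the pattern, using an illustrative mapped class rather than CKAN's model:

    import sqlalchemy as sa
    from sqlalchemy.orm import declarative_base, Session

    Base = declarative_base()

    class Resource(Base):  # illustrative stand-in for CKAN's model.Resource
        __tablename__ = 'resource'
        id = sa.Column(sa.Text, primary_key=True)
        state = sa.Column(sa.Text)

    def set_state(session: Session, resource_id: str, state: str) -> None:
        # with_for_update() emits SELECT ... FOR UPDATE, holding a row lock
        # until this session commits or rolls back.
        resource = (
            session.query(Resource)
            .with_for_update(of=Resource)
            .filter(Resource.id == resource_id)
            .one()
        )
        resource.state = state
        session.commit()  # releases the lock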
