
Commit

Merge branch 'master' into feature/different-site-url
JVickery-TBS committed May 3, 2024
2 parents 928d770 + 67fabfc commit 7d715b9
Showing 33 changed files with 1,219 additions and 310 deletions.
4 changes: 0 additions & 4 deletions .flake8
@@ -17,8 +17,4 @@ max-line-length=127

# List ignore rules one per line.
ignore =
E501
C901
W503
F401
F403
18 changes: 18 additions & 0 deletions .github/dependabot.yml
@@ -0,0 +1,18 @@
version: 2
registries:
python-index-pypi-org:
type: python-index
url: https://pypi.org/
replaces-base: true
username: "${{secrets.PYTHON_INDEX_PYPI_ORG_USERNAME}}"
password: "${{secrets.PYTHON_INDEX_PYPI_ORG_PASSWORD}}"

updates:
- package-ecosystem: pip
directory: "/"
schedule:
interval: daily
time: "19:00"
open-pull-requests-limit: 10
registries:
- python-index-pypi-org
10 changes: 10 additions & 0 deletions README.rst
@@ -191,6 +191,16 @@ Configuration:

See the extension's `config_declaration.yaml <ckanext/xloader/config_declaration.yaml>`_ file.

This plugin also supports the `ckan.download_proxy` setting, to use a proxy server when downloading files.
This setting is shared with other plugins that download resource files, such as ckanext-archiver. For example:

ckan.download_proxy = http://my-proxy:1234/

You may also wish to configure the database to use your preferred date input style on COPY.
For example, to make `PostgreSQL <https://www.postgresql.org/docs/current/runtime-config-client.html#RUNTIME-CONFIG-CLIENT-FORMAT>`_
expect European (day-first) dates, you could add to ``postgresql.conf``:

datestyle=ISO,DMY

------------------------
Developer installation
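Editorial note on the README addition above: ckan.download_proxy points resource downloads at a proxy server. The following is an illustrative sketch only, not part of this commit, showing how such a setting could be applied to a requests download; the function name download_resource and the config lookup are assumptions for the example.

import requests
from ckan.plugins.toolkit import config


def download_resource(url, timeout=30):
    # Read the shared proxy setting, e.g. http://my-proxy:1234/
    proxy = config.get('ckan.download_proxy')
    proxies = {'http': proxy, 'https': proxy} if proxy else None
    # Stream the response so large files are not held in memory all at once.
    response = requests.get(url, proxies=proxies, stream=True, timeout=timeout)
    response.raise_for_status()
    return response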
11 changes: 9 additions & 2 deletions ckanext/xloader/action.py
@@ -152,10 +152,17 @@ def xloader_submit(context, data_dict):
'original_url': resource_dict.get('url'),
}
}
timeout = config.get('ckanext.xloader.job_timeout', '3600')
# Expand timeout for resources that have to be type-guessed
timeout = config.get(
'ckanext.xloader.job_timeout',
'3600' if utils.datastore_resource_exists(res_id) else '10800')
log.debug("Timeout for XLoading resource %s is %s", res_id, timeout)

try:
job = enqueue_job(
jobs.xloader_data_into_datastore, [data], rq_kwargs=dict(timeout=timeout)
jobs.xloader_data_into_datastore, [data],
title="xloader_submit: package: {} resource: {}".format(resource_dict.get('package_id'), res_id),
rq_kwargs=dict(timeout=timeout)
)
except Exception:
log.exception('Unable to enqueue xloader res_id=%s', res_id)
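A condensed sketch of the timeout resolution introduced in action.py above, for illustration only and not part of the commit. datastore_resource_exists is the helper used in the diff; the wrapper name effective_timeout is hypothetical.

from ckan.plugins.toolkit import config
from ckanext.xloader import utils


def effective_timeout(res_id):
    # An explicit ckanext.xloader.job_timeout always wins; otherwise resources
    # that still need type guessing get a longer default (10800s vs 3600s).
    default = '3600' if utils.datastore_resource_exists(res_id) else '10800'
    return config.get('ckanext.xloader.job_timeout', default)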
3 changes: 1 addition & 2 deletions ckanext/xloader/command.py
@@ -3,6 +3,7 @@
import sys
import logging
import ckan.plugins.toolkit as tk
from ckanext.xloader.utils import XLoaderFormats


class XloaderCmd:
@@ -84,8 +85,6 @@ def _submit_resource(self, resource, user, indent=0):
'''resource: resource dictionary
'''
indentation = ' ' * indent
# import here, so that that loggers are setup
from ckanext.xloader.plugin import XLoaderFormats

if not XLoaderFormats.is_it_an_xloader_format(resource['format']):
print(indentation
21 changes: 18 additions & 3 deletions ckanext/xloader/config_declaration.yaml
@@ -29,9 +29,7 @@ groups:
default: 1_000_000_000
example: 100000
description: |
The connection string for the jobs database used by XLoader. The
default of an sqlite file is fine for development. For production use a
Postgresql database.
The maximum file size that XLoader will attempt to load.
type: int
required: false
- key: ckanext.xloader.use_type_guessing
@@ -48,6 +46,15 @@
type: bool
required: false
legacy_key: ckanext.xloader.just_load_with_messytables
- key: ckanext.xloader.max_type_guessing_length
default: 0
example: 100000
description: |
The maximum file size that will be passed to Tabulator if the
use_type_guessing flag is enabled. Larger files will use COPY even if
the flag is set. Defaults to 1/10 of the maximum content length.
type: int
required: false
- key: ckanext.xloader.parse_dates_dayfirst
default: False
example: False
@@ -112,5 +119,13 @@
to True.
type: bool
required: false
- key: ckanext.xloader.clean_datastore_tables
default: False
example: True
description: |
Enqueue jobs to remove Datastore tables from Resources that have a format
that is not in ckanext.xloader.formats after a Resource is updated.
type: bool
required: false


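For illustration only (not part of the commit): the two settings declared above, ckanext.xloader.max_type_guessing_length and ckanext.xloader.clean_datastore_tables, are read at runtime roughly as follows. The fallback to one tenth of max_content_length mirrors the constants added to jobs.py further down; the constant names here are assumptions.

from ckan.plugins.toolkit import asbool, config

# Hard cap on file size loaded at all (default 1 GB).
MAX_CONTENT_LENGTH = int(config.get('ckanext.xloader.max_content_length') or 1e9)
# Files larger than this skip Tabulator type guessing and fall back to COPY.
MAX_TYPE_GUESSING_LENGTH = int(
    config.get('ckanext.xloader.max_type_guessing_length') or MAX_CONTENT_LENGTH / 10)
# Whether to enqueue cleanup jobs for Datastore tables of non-xloader formats.
CLEAN_DATASTORE_TABLES = asbool(
    config.get('ckanext.xloader.clean_datastore_tables', False))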
10 changes: 1 addition & 9 deletions ckanext/xloader/db.py
@@ -191,9 +191,7 @@ def add_pending_job(job_id, job_type, api_key,
if not metadata:
metadata = {}

conn = ENGINE.connect()
trans = conn.begin()
try:
with ENGINE.begin() as conn:
conn.execute(JOBS_TABLE.insert().values(
job_id=job_id,
job_type=job_type,
@@ -225,12 +223,6 @@
)
if inserts:
conn.execute(METADATA_TABLE.insert(), inserts)
trans.commit()
except Exception:
trans.rollback()
raise
finally:
conn.close()


class InvalidErrorObjectError(Exception):
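The db.py change above replaces a manual connect/begin/commit/rollback/close sequence with SQLAlchemy's Engine.begin() context manager, which commits when the block completes and rolls back if it raises. A self-contained sketch of the pattern, not part of the commit; engine and jobs_table are stand-ins for db.ENGINE and db.JOBS_TABLE.

import sqlalchemy as sa

engine = sa.create_engine('sqlite://')
metadata = sa.MetaData()
jobs_table = sa.Table(
    'jobs', metadata,
    sa.Column('job_id', sa.UnicodeText, primary_key=True),
    sa.Column('job_type', sa.UnicodeText),
)
metadata.create_all(engine)

# Engine.begin() opens a connection and a transaction, commits if the block
# finishes normally, rolls back if it raises, and always closes the connection.
with engine.begin() as conn:
    conn.execute(jobs_table.insert().values(job_id='abc', job_type='xloader'))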
17 changes: 17 additions & 0 deletions ckanext/xloader/helpers.py
@@ -1,4 +1,5 @@
import ckan.plugins.toolkit as toolkit
from ckanext.xloader.utils import XLoaderFormats


def xloader_status(resource_id):
@@ -25,3 +26,19 @@ def xloader_status_description(status):
return captions.get(status['status'], status['status'].capitalize())
else:
return _('Not Uploaded Yet')


def is_resource_supported_by_xloader(res_dict, check_access=True):
is_supported_format = XLoaderFormats.is_it_an_xloader_format(res_dict.get('format'))
is_datastore_active = res_dict.get('datastore_active', False)
user_has_access = not check_access or toolkit.h.check_access('package_update',
{'id':res_dict.get('package_id')})
url_type = res_dict.get('url_type')
if url_type:
try:
is_supported_url_type = url_type not in toolkit.h.datastore_rw_resource_url_types()
except AttributeError:
is_supported_url_type = (url_type == 'upload')
else:
is_supported_url_type = True
return (is_supported_format or is_datastore_active) and user_has_access and is_supported_url_type
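An illustrative call to the new helper above, not part of the commit; the resource dict values are made up. Passing check_access=False skips the package_update permission check, so the call does not depend on a signed-in user.

from ckanext.xloader.helpers import is_resource_supported_by_xloader

res_dict = {
    'format': 'CSV',
    'datastore_active': False,
    'package_id': 'some-dataset-id',
    'url_type': 'upload',
}

if is_resource_supported_by_xloader(res_dict, check_access=False):
    print('xloader would accept this resource')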
59 changes: 42 additions & 17 deletions ckanext/xloader/jobs.py
@@ -7,36 +7,49 @@
import tempfile
import json
import datetime
import os
import traceback
import sys

from psycopg2 import errors
from six.moves.urllib.parse import urlsplit
import requests
from rq import get_current_job
import sqlalchemy as sa

from ckan import model
from ckan.plugins.toolkit import get_action, asbool, ObjectNotFound, config, check_ckan_version
from ckan.plugins.toolkit import get_action, asbool, enqueue_job, ObjectNotFound, config

from . import loader
from . import db
from . import db, loader
from .job_exceptions import JobError, HTTPError, DataTooBigError, FileCouldNotBeLoadedError
from .utils import set_resource_metadata
from .utils import datastore_resource_exists, set_resource_metadata

try:
from ckan.lib.api_token import get_user_from_token
except ImportError:
get_user_from_token = None

log = logging.getLogger(__name__)

SSL_VERIFY = asbool(config.get('ckanext.xloader.ssl_verify', True))
if not SSL_VERIFY:
requests.packages.urllib3.disable_warnings()

MAX_CONTENT_LENGTH = int(config.get('ckanext.xloader.max_content_length') or 1e9)
# Don't try Tabulator load on large files
MAX_TYPE_GUESSING_LENGTH = int(config.get('ckanext.xloader.max_type_guessing_length') or MAX_CONTENT_LENGTH / 10)
MAX_EXCERPT_LINES = int(config.get('ckanext.xloader.max_excerpt_lines') or 0)
CHUNK_SIZE = 16 * 1024 # 16kb
DOWNLOAD_TIMEOUT = 30

MAX_RETRIES = 1
RETRYABLE_ERRORS = (
errors.DeadlockDetected,
errors.LockNotAvailable,
errors.ObjectInUse,
)
RETRIED_JOB_TIMEOUT = config.get('ckanext.xloader.job_timeout', '3600')


# input = {
# 'api_key': user['apikey'],
@@ -80,16 +93,30 @@ def xloader_data_into_datastore(input):
db.mark_job_as_errored(job_id, str(e))
job_dict['status'] = 'error'
job_dict['error'] = str(e)
log = logging.getLogger(__name__)
log.error('xloader error: {0}, {1}'.format(e, traceback.format_exc()))
log.error('xloader error: %s, %s', e, traceback.format_exc())
errored = True
except Exception as e:
if isinstance(e, RETRYABLE_ERRORS):
tries = job_dict['metadata'].get('tries', 0)
if tries < MAX_RETRIES:
tries = tries + 1
log.info("Job %s failed due to temporary error [%s], retrying", job_id, e)
job_dict['status'] = 'pending'
job_dict['metadata']['tries'] = tries
enqueue_job(
xloader_data_into_datastore,
[input],
title="retry xloader_data_into_datastore: resource: {} attempt {}".format(
job_dict['metadata']['resource_id'], tries),
rq_kwargs=dict(timeout=RETRIED_JOB_TIMEOUT)
)
return None

db.mark_job_as_errored(
job_id, traceback.format_tb(sys.exc_info()[2])[-1] + repr(e))
job_dict['status'] = 'error'
job_dict['error'] = str(e)
log = logging.getLogger(__name__)
log.error('xloader error: {0}, {1}'.format(e, traceback.format_exc()))
log.error('xloader error: %s, %s', e, traceback.format_exc())
errored = True
finally:
# job_dict is defined in xloader_hook's docstring
@@ -206,11 +233,12 @@ def tabulator_load():
logger.info('Loading CSV')
# If ckanext.xloader.use_type_guessing is not configured, fall back to
# deprecated ckanext.xloader.just_load_with_messytables
use_type_guessing = asbool(config.get(
'ckanext.xloader.use_type_guessing', config.get(
'ckanext.xloader.just_load_with_messytables', False)))
logger.info("'use_type_guessing' mode is: %s",
use_type_guessing)
use_type_guessing = asbool(
config.get('ckanext.xloader.use_type_guessing', config.get(
'ckanext.xloader.just_load_with_messytables', False))) \
and not datastore_resource_exists(resource['id']) \
and os.path.getsize(tmp_file.name) <= MAX_TYPE_GUESSING_LENGTH
logger.info("'use_type_guessing' mode is: %s", use_type_guessing)
try:
if use_type_guessing:
tabulator_load()
@@ -539,8 +567,7 @@ def __init__(self, task_id, input):
self.input = input

def emit(self, record):
conn = db.ENGINE.connect()
try:
with db.ENGINE.connect() as conn:
# Turn strings into unicode to stop SQLAlchemy
# "Unicode type received non-unicode bind param value" warnings.
message = str(record.getMessage())
@@ -556,8 +583,6 @@
module=module,
funcName=funcName,
lineno=record.lineno))
finally:
conn.close()


class DatetimeJsonEncoder(json.JSONEncoder):
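The jobs.py changes above re-enqueue a job once when it fails with a transient Postgres error (deadlock, lock not available, object in use) instead of marking it as errored. A generic sketch of that pattern, for illustration only; run_with_retry, run_job and requeue are hypothetical stand-ins for xloader_data_into_datastore and enqueue_job.

from psycopg2 import errors

MAX_RETRIES = 1
RETRYABLE_ERRORS = (
    errors.DeadlockDetected,
    errors.LockNotAvailable,
    errors.ObjectInUse,
)


def run_with_retry(run_job, requeue, job_metadata):
    try:
        run_job()
    except RETRYABLE_ERRORS:
        tries = job_metadata.get('tries', 0)
        if tries < MAX_RETRIES:
            # Transient database contention: put the same job back on the
            # queue with an incremented attempt count instead of erroring.
            job_metadata['tries'] = tries + 1
            requeue(job_metadata)
            return
        raise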