7
7
import tempfile
8
8
import json
9
9
import datetime
10
+ import os
10
11
import traceback
11
12
import sys
12
13
14
+ from psycopg2 import errors
13
15
from six .moves .urllib .parse import urlsplit
14
16
import requests
15
17
from rq import get_current_job
16
18
import sqlalchemy as sa
17
19
18
20
from ckan import model
19
- from ckan .plugins .toolkit import get_action , asbool , ObjectNotFound , config
21
+ from ckan .plugins .toolkit import get_action , asbool , enqueue_job , ObjectNotFound , config
20
22
21
- from . import loader
22
- from . import db
23
+ from . import db , loader
23
24
from .job_exceptions import JobError , HTTPError , DataTooBigError , FileCouldNotBeLoadedError
24
- from .utils import set_resource_metadata
25
+ from .utils import datastore_resource_exists , set_resource_metadata
25
26
26
27
try :
27
28
from ckan .lib .api_token import get_user_from_token
28
29
except ImportError :
29
30
get_user_from_token = None
30
31
32
+ log = logging .getLogger (__name__ )
33
+
31
34
SSL_VERIFY = asbool (config .get ('ckanext.xloader.ssl_verify' , True ))
32
35
if not SSL_VERIFY :
33
36
requests .packages .urllib3 .disable_warnings ()
34
37
35
38
MAX_CONTENT_LENGTH = int (config .get ('ckanext.xloader.max_content_length' ) or 1e9 )
39
+ # Maximum file size (in bytes) for which the Tabulator (type-guessing) load is attempted; larger files skip it
40
+ MAX_TYPE_GUESSING_LENGTH = int (config .get ('ckanext.xloader.max_type_guessing_length' ) or MAX_CONTENT_LENGTH / 10 )
36
41
MAX_EXCERPT_LINES = int (config .get ('ckanext.xloader.max_excerpt_lines' ) or 0 )
37
42
CHUNK_SIZE = 16 * 1024 # 16kb
38
43
DOWNLOAD_TIMEOUT = 30
39
44
45
+ MAX_RETRIES = 1
46
+ RETRYABLE_ERRORS = (
47
+ errors .DeadlockDetected ,
48
+ errors .LockNotAvailable ,
49
+ errors .ObjectInUse ,
50
+ )
51
+ RETRIED_JOB_TIMEOUT = config .get ('ckanext.xloader.job_timeout' , '3600' )
52
+
40
53
41
54
# input = {
42
55
# 'api_key': user['apikey'],
@@ -80,16 +93,30 @@ def xloader_data_into_datastore(input):
80
93
db .mark_job_as_errored (job_id , str (e ))
81
94
job_dict ['status' ] = 'error'
82
95
job_dict ['error' ] = str (e )
83
- log = logging .getLogger (__name__ )
84
- log .error ('xloader error: {0}, {1}' .format (e , traceback .format_exc ()))
96
+ log .error ('xloader error: %s, %s' , e , traceback .format_exc ())
85
97
errored = True
86
98
except Exception as e :
99
+ if isinstance (e , RETRYABLE_ERRORS ):
100
+ tries = job_dict ['metadata' ].get ('tries' , 0 )
101
+ if tries < MAX_RETRIES :
102
+ tries = tries + 1
103
+ log .info ("Job %s failed due to temporary error [%s], retrying" , job_id , e )
104
+ job_dict ['status' ] = 'pending'
105
+ job_dict ['metadata' ]['tries' ] = tries
106
+ enqueue_job (
107
+ xloader_data_into_datastore ,
108
+ [input ],
109
+ title = "retry xloader_data_into_datastore: resource: {} attempt {}" .format (
110
+ job_dict ['metadata' ]['resource_id' ], tries ),
111
+ rq_kwargs = dict (timeout = RETRIED_JOB_TIMEOUT )
112
+ )
113
+ return None
114
+
87
115
db .mark_job_as_errored (
88
116
job_id , traceback .format_tb (sys .exc_info ()[2 ])[- 1 ] + repr (e ))
89
117
job_dict ['status' ] = 'error'
90
118
job_dict ['error' ] = str (e )
91
- log = logging .getLogger (__name__ )
92
- log .error ('xloader error: {0}, {1}' .format (e , traceback .format_exc ()))
119
+ log .error ('xloader error: %s, %s' , e , traceback .format_exc ())
93
120
errored = True
94
121
finally :
95
122
# job_dict is defined in xloader_hook's docstring
@@ -206,11 +233,12 @@ def tabulator_load():
206
233
logger .info ('Loading CSV' )
207
234
# If ckanext.xloader.use_type_guessing is not configured, fall back to
208
235
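# the deprecated ckanext.xloader.just_load_with_messytables setting. Type guessing is
# also skipped when the resource already exists in the DataStore or when the
# downloaded file is larger than MAX_TYPE_GUESSING_LENGTH.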
209
- use_type_guessing = asbool (config .get (
210
- 'ckanext.xloader.use_type_guessing' , config .get (
211
- 'ckanext.xloader.just_load_with_messytables' , False )))
212
- logger .info ("'use_type_guessing' mode is: %s" ,
213
- use_type_guessing )
236
+ use_type_guessing = asbool (
237
+ config .get ('ckanext.xloader.use_type_guessing' , config .get (
238
+ 'ckanext.xloader.just_load_with_messytables' , False ))) \
239
+ and not datastore_resource_exists (resource ['id' ]) \
240
+ and os .path .getsize (tmp_file .name ) <= MAX_TYPE_GUESSING_LENGTH
241
+ logger .info ("'use_type_guessing' mode is: %s" , use_type_guessing )
214
242
try :
215
243
if use_type_guessing :
216
244
tabulator_load ()
@@ -538,8 +566,7 @@ def __init__(self, task_id, input):
538
566
self .input = input
539
567
540
568
def emit (self , record ):
541
- conn = db .ENGINE .connect ()
542
- try :
569
+ with db .ENGINE .connect () as conn :
543
570
# Turn strings into unicode to stop SQLAlchemy
544
571
# "Unicode type received non-unicode bind param value" warnings.
545
572
message = str (record .getMessage ())
@@ -555,8 +582,6 @@ def emit(self, record):
555
582
module = module ,
556
583
funcName = funcName ,
557
584
lineno = record .lineno ))
558
- finally :
559
- conn .close ()
560
585
561
586
562
587
class DatetimeJsonEncoder (json .JSONEncoder ):
0 commit comments