Skip to content

Commit 1678dff

Browse files
authored
Merge pull request #1054 from scitran/job-improvements
Job improvements
2 parents 568f286 + fe080e8 commit 1678dff

File tree

4 files changed

+78
-16
lines changed

4 files changed

+78
-16
lines changed

api/api.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ def prefix(path, routes):
132132
prefix('/jobs', [
133133
route('/next', JobsHandler, h='next', m=['GET']),
134134
route('/stats', JobsHandler, h='stats', m=['GET']),
135+
route('/pending', JobsHandler, h='pending', m=['GET']),
135136
route('/reap', JobsHandler, h='reap_stale', m=['POST']),
136137
route('/add', JobsHandler, h='add', m=['POST']),
137138
route('/<:[^/]+>', JobHandler),

api/jobs/handlers.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,29 @@ def stats(self):
287287
if not self.superuser_request and not self.user_is_admin:
288288
self.abort(403, 'Request requires admin')
289289

290-
return Queue.get_statistics()
290+
all_flag = self.is_true('all')
291+
unique = self.is_true('unique')
292+
tags = self.request.GET.getall('tags')
293+
last = self.request.GET.get('last')
294+
295+
# Allow for tags to be specified multiple times, or just comma-deliminated
296+
if len(tags) == 1:
297+
tags = tags[0].split(',')
298+
299+
if last is not None:
300+
last = int(last)
301+
302+
return Queue.get_statistics(tags=tags, last=last, unique=unique, all_flag=all_flag)
303+
304+
def pending(self):
305+
if not self.superuser_request and not self.user_is_admin:
306+
self.abort(403, 'Request requires admin')
307+
308+
tags = self.request.GET.getall('tags')
309+
if len(tags) == 1:
310+
tags = tags[0].split(',')
311+
312+
return Queue.get_pending(tags=tags)
291313

292314
def next(self):
293315

@@ -325,7 +347,7 @@ def get(self, _id):
325347

326348
def get_config(self, _id):
327349
"""Get a job's config"""
328-
if not self.superuser_request:
350+
if not self.superuser_request and not self.user_is_admin:
329351
self.abort(403, 'Request requires superuser')
330352

331353
j = Job.get(_id)
@@ -373,10 +395,12 @@ def get_config(self, _id):
373395

374396
encoded = pseudo_consistent_json_encode(c)
375397
self.response.app_iter = StringIO.StringIO(encoded)
398+
self.response.headers['Content-Length'] = str(len(encoded.encode('utf-8'))) # must be set after app_iter
376399
else:
377400
# Legacy behavior
378401
encoded = pseudo_consistent_json_encode({"config": c})
379402
self.response.app_iter = StringIO.StringIO(encoded)
403+
self.response.headers['Content-Length'] = str(len(encoded.encode('utf-8'))) # must be set after app_iter
380404

381405
@require_login
382406
def put(self, _id):

api/jobs/queue.py

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -360,30 +360,53 @@ def search(containers, states=None, tags=None):
360360
])
361361

362362
@staticmethod
363-
def get_statistics():
363+
def get_statistics(tags=None, last=None, unique=False, all_flag=False):
364364
"""
365365
Return a variety of interesting information about the job queue.
366366
"""
367367

368-
# Count jobs by state
369-
result = config.db.jobs.aggregate([{"$group": {"_id": "$state", "count": {"$sum": 1}}}])
370-
# Map mongo result to a useful object
368+
if all_flag:
369+
unique = True
370+
if last is None:
371+
last = 3
372+
373+
results = { }
374+
match = { } # match all jobs
375+
376+
if tags is not None and len(tags) > 0:
377+
match = { 'tags': {'$in': tags } } # match only jobs with given tags
378+
379+
# Count jobs by state, mapping the mongo result to a useful object
380+
result = list(config.db.jobs.aggregate([{'$match': match }, {'$group': {'_id': '$state', 'count': {'$sum': 1}}}]))
371381
by_state = {s: 0 for s in JOB_STATES}
372382
by_state.update({r['_id']: r['count'] for r in result})
383+
results['states'] = by_state
384+
385+
# List unique tags
386+
if unique:
387+
results['unique'] = sorted(config.db.jobs.distinct('tags'))
388+
389+
# List recently modified jobs for each state
390+
if last is not None:
391+
results['recent'] = {s: config.db.jobs.find({'$and': [match, {'state': s}]}, {'modified':1}).sort([('modified', pymongo.DESCENDING)]).limit(last) for s in JOB_STATES}
373392

374-
# Count jobs by tag grouping
375-
result = list(config.db.jobs.aggregate([{"$group": {"_id": "$tags", "count": {"$sum": 1}}}]))
376-
by_tag = []
377-
for r in result:
378-
by_tag.append({'tags': r['_id'], 'count': r['count']})
393+
return results
379394

380-
# Count jobs that will not be retried
381-
permafailed = config.db.jobs.count({"attempt": {"$gte": max_attempts()}, "state":"failed"})
395+
@staticmethod
396+
def get_pending(tags=None):
397+
"""
398+
Returns the same format as get_statistics, but only the pending number.
399+
Designed to be as efficient as possible for frequent polling :(
400+
"""
401+
402+
match = { } # match all jobs
403+
if tags is not None and len(tags) > 0:
404+
match = { 'tags': {'$in': tags } } # match only jobs with given tags
382405

383406
return {
384-
'by-state': by_state,
385-
'by-tag': by_tag,
386-
'permafailed': permafailed
407+
'states': {
408+
'pending': config.db.jobs.count({'$and': [match, {'state': 'pending'}]})
409+
}
387410
}
388411

389412
@staticmethod

tests/integration_tests/python/test_jobs.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ def test_jobs_access(as_user):
1010
r = as_user.get('/jobs/stats')
1111
assert r.status_code == 403
1212

13+
r = as_user.get('/jobs/pending')
14+
assert r.status_code == 403
15+
1316
r = as_user.post('/jobs/reap')
1417
assert r.status_code == 403
1518

@@ -328,6 +331,17 @@ def test_jobs(data_builder, default_payload, as_public, as_user, as_admin, as_ro
328331
assert "The job did not report in for a long time and was canceled." in [log["msg"] for log in r.json()['logs']]
329332
api_db.jobs.delete_one({"_id": bson.ObjectId("5a007cdb0f352600d94c845f")})
330333

334+
r = as_admin.get('/jobs/stats')
335+
assert r.ok
336+
r = as_admin.get('/jobs/pending')
337+
assert r.ok
338+
r = as_admin.get('/jobs/pending', params={'tags': 'auto,unused'})
339+
assert r.ok
340+
r = as_admin.get('/jobs/stats', params={'all': '1'})
341+
assert r.ok
342+
r = as_admin.get('/jobs/stats', params={'tags': 'auto,unused', 'last': '2'})
343+
assert r.ok
344+
331345

332346
def test_failed_job_output(data_builder, default_payload, as_user, as_admin, as_drone, api_db, file_form):
333347
# create gear

0 commit comments

Comments
 (0)