Skip to content

Commit 7cc9266

Browse files
authored
Merge pull request #25 from dabapps/priority-queueing
Priority queueing
2 parents a17be76 + b26b0d5 commit 7cc9266

File tree

3 files changed

+69
-3
lines changed

3 files changed

+69
-3
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# -*- coding: utf-8 -*-
2+
# Generated by Django 1.11 on 2018-07-13 10:00
3+
from __future__ import unicode_literals
4+
5+
from django.db import migrations, models
6+
7+
8+
class Migration(migrations.Migration):
9+
10+
dependencies = [
11+
('django_dbq', '0002_auto_20151016_1027'),
12+
]
13+
14+
operations = [
15+
migrations.AlterModelOptions(
16+
name='job',
17+
options={'ordering': ['-priority', 'created']},
18+
),
19+
migrations.AddField(
20+
model_name='job',
21+
name='priority',
22+
field=models.SmallIntegerField(db_index=True, default=0),
23+
),
24+
]

django_dbq/models.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121

2222
class JobManager(models.Manager):
23-
2423
def get_ready_or_none(self, queue_name, max_retries=3):
2524
"""
2625
Get a job in state READY or NEW for a given queue. Supports retrying in case of database deadlock
@@ -40,7 +39,7 @@ def get_ready_or_none(self, queue_name, max_retries=3):
4039
retries_left = max_retries
4140
while True:
4241
try:
43-
return self.select_for_update().filter(queue_name=queue_name, state__in=(Job.STATES.READY, Job.STATES.NEW)).first()
42+
return self.to_process(queue_name).first()
4443
except Exception as e:
4544
if retries_left == 0:
4645
raise
@@ -56,6 +55,9 @@ def delete_old(self):
5655
logger.info("Deleting all job in states %s created before %s", ", ".join(delete_jobs_in_states), delete_jobs_created_before.isoformat())
5756
Job.objects.filter(state__in=delete_jobs_in_states, created__lte=delete_jobs_created_before).delete()
5857

58+
def to_process(self, queue_name):
59+
return self.select_for_update().filter(queue_name=queue_name, state__in=(Job.STATES.READY, Job.STATES.NEW))
60+
5961

6062
class Job(models.Model):
6163

@@ -69,9 +71,10 @@ class Job(models.Model):
6971
next_task = models.CharField(max_length=100, blank=True)
7072
workspace = JSONField(null=True)
7173
queue_name = models.CharField(max_length=20, default='default', db_index=True)
74+
priority = models.SmallIntegerField(default=0, db_index=True)
7275

7376
class Meta:
74-
ordering = ['created']
77+
ordering = ['-priority', 'created']
7578

7679
objects = JobManager()
7780

django_dbq/tests.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,45 @@ def test_get_next_ready_job(self):
138138

139139
self.assertEqual(Job.objects.get_ready_or_none('default'), expected)
140140

141+
def test_gets_jobs_in_priority_order(self):
142+
job_1 = Job.objects.create(name='testjob')
143+
job_2 = Job.objects.create(name='testjob', state=Job.STATES.PROCESSING)
144+
job_3 = Job.objects.create(name='testjob', priority=3)
145+
job_4 = Job.objects.create(name='testjob', priority=2)
146+
self.assertEqual({
147+
job for job in Job.objects.to_process('default')
148+
}, {
149+
job_3, job_4, job_1
150+
})
151+
self.assertEqual(Job.objects.get_ready_or_none('default'), job_3)
152+
self.assertFalse(Job.objects.to_process('default').filter(id=job_2.id).exists())
153+
154+
def test_gets_jobs_in_negative_priority_order(self):
155+
job_1 = Job.objects.create(name='testjob')
156+
job_2 = Job.objects.create(name='testjob', state=Job.STATES.PROCESSING)
157+
job_3 = Job.objects.create(name='testjob', priority=-2)
158+
job_4 = Job.objects.create(name='testjob', priority=1)
159+
self.assertEqual({
160+
job for job in Job.objects.to_process('default')
161+
}, {
162+
job_4, job_3, job_1
163+
})
164+
self.assertEqual(Job.objects.get_ready_or_none('default'), job_4)
165+
self.assertFalse(Job.objects.to_process('default').filter(id=job_2.id).exists())
166+
167+
def test_gets_jobs_in_priority_and_date_order(self):
168+
job_1 = Job.objects.create(name='testjob', priority=3)
169+
job_2 = Job.objects.create(name='testjob', state=Job.STATES.PROCESSING, priority=3)
170+
job_3 = Job.objects.create(name='testjob', priority=3)
171+
job_4 = Job.objects.create(name='testjob', priority=3)
172+
self.assertEqual({
173+
job for job in Job.objects.to_process('default')
174+
}, {
175+
job_1, job_3, job_4
176+
})
177+
self.assertEqual(Job.objects.get_ready_or_none('default'), job_1)
178+
self.assertFalse(Job.objects.to_process('default').filter(id=job_2.id).exists())
179+
141180
def test_get_next_ready_job_created(self):
142181
"""
143182
Created jobs should be picked too.

0 commit comments

Comments
 (0)