Skip to content

Commit b26b0d5

Browse files
authored
Merge branch 'master' into priority-queueing
2 parents 083a05e + a17be76 commit b26b0d5

File tree

6 files changed

+60
-7
lines changed

6 files changed

+60
-7
lines changed

.python-version

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
3.7.4

.travis.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
language: python
22
sudo: false
3+
services:
4+
- mysql
35
python:
4-
- '2.7'
56
- '3.4'
67
- '3.6'
8+
- '3.7'
79
env:
810
- DJANGO_VERSION=1.8
911
- DJANGO_VERSION=1.9

README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,9 +181,10 @@ The `workspace` flag is optional. If supplied, it must be a valid JSON string.
181181

182182
To start a worker:
183183

184-
manage.py worker [queue_name]
184+
manage.py worker [queue_name] [--rate_limit]
185185

186-
`queue_name` is optional, and will default to `default`
186+
- `queue_name` is optional, and will default to `default`
187+
- The `--rate_limit` flag is optional, and will default to `1`. It is the minimum number of seconds that must have elapsed before a subsequent job can be run.
187188

188189
## Testing
189190

django_dbq/management/commands/worker.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from django.db import transaction
22
from django.core.management.base import BaseCommand, CommandError
3+
from django.utils import timezone
34
from django.utils.module_loading import import_string
45
from django_dbq.models import Job
56
from simplesignals.process import WorkerProcessBase
@@ -58,13 +59,19 @@ class Worker(WorkerProcessBase):
5859

5960
process_title = "jobworker"
6061

61-
def __init__(self, name):
62+
def __init__(self, name, rate_limit_in_seconds):
6263
self.queue_name = name
64+
self.rate_limit_in_seconds = rate_limit_in_seconds
65+
self.last_job_finished = None
6366
super(Worker, self).__init__()
6467

6568
def do_work(self):
6669
sleep(1)
70+
if self.last_job_finished and (timezone.now() - self.last_job_finished).total_seconds() < self.rate_limit_in_seconds:
71+
return
72+
6773
process_job(self.queue_name)
74+
self.last_job_finished = timezone.now()
6875

6976

7077
class Command(BaseCommand):
@@ -73,6 +80,7 @@ class Command(BaseCommand):
7380

7481
def add_arguments(self, parser):
7582
parser.add_argument('queue_name', nargs='?', default='default', type=str)
83+
parser.add_argument('rate_limit', help='The rate limit in seconds. The default rate limit is 1 job per second.', nargs='?', default=1, type=int)
7684
parser.add_argument(
7785
'--dry-run',
7886
action='store_true',
@@ -89,10 +97,11 @@ def handle(self, *args, **options):
8997
raise CommandError("Please supply a single queue job name")
9098

9199
queue_name = options['queue_name']
100+
rate_limit_in_seconds = options['rate_limit']
92101

93-
self.stdout.write("Starting job worker for queue \"%s\"" % queue_name)
102+
self.stdout.write("Starting job worker for queue \"%s\" with rate limit %s/s" % (queue_name, rate_limit_in_seconds))
94103

95-
worker = Worker(queue_name)
104+
worker = Worker(queue_name, rate_limit_in_seconds)
96105

97106
if options['dry_run']:
98107
return

django_dbq/tests.py

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
from datetime import datetime, timedelta
2+
import mock
3+
4+
import freezegun
25
from django.core.management import call_command, CommandError
36
from django.test import TestCase
47
from django.test.utils import override_settings
5-
from django_dbq.management.commands.worker import process_job
8+
from django.utils import timezone
9+
10+
from django_dbq.management.commands.worker import process_job, Worker
611
from django_dbq.models import Job
712
try:
813
from StringIO import StringIO
@@ -77,6 +82,39 @@ def test_worker_with_queue_name(self):
7782
self.assertTrue('test_queue' in output)
7883

7984

85+
@freezegun.freeze_time()
86+
@mock.patch('django_dbq.management.commands.worker.sleep')
87+
@mock.patch('django_dbq.management.commands.worker.process_job')
88+
class WorkerProcessDoWorkTestCase(TestCase):
89+
90+
def setUp(self):
91+
super().setUp()
92+
self.MockWorker = mock.MagicMock()
93+
self.MockWorker.queue_name = 'default'
94+
self.MockWorker.rate_limit_in_seconds = 5
95+
self.MockWorker.last_job_finished = None
96+
97+
def test_do_work_no_previous_job_run(self, mock_process_job, mock_sleep):
98+
Worker.do_work(self.MockWorker)
99+
self.assertEqual(mock_sleep.call_count, 1)
100+
self.assertEqual(mock_process_job.call_count, 1)
101+
self.assertEqual(self.MockWorker.last_job_finished, timezone.now())
102+
103+
def test_do_work_previous_job_too_soon(self, mock_process_job, mock_sleep):
104+
self.MockWorker.last_job_finished = timezone.now() - timezone.timedelta(seconds=2)
105+
Worker.do_work(self.MockWorker)
106+
self.assertEqual(mock_sleep.call_count, 1)
107+
self.assertEqual(mock_process_job.call_count, 0)
108+
self.assertEqual(self.MockWorker.last_job_finished, timezone.now() - timezone.timedelta(seconds=2))
109+
110+
def test_do_work_previous_job_long_time_ago(self, mock_process_job, mock_sleep):
111+
self.MockWorker.last_job_finished = timezone.now() - timezone.timedelta(seconds=7)
112+
Worker.do_work(self.MockWorker)
113+
self.assertEqual(mock_sleep.call_count, 1)
114+
self.assertEqual(mock_process_job.call_count, 1)
115+
self.assertEqual(self.MockWorker.last_job_finished, timezone.now())
116+
117+
80118
@override_settings(JOBS={'testjob': {'tasks': ['a']}})
81119
class JobTestCase(TestCase):
82120

test-requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
11
-r requirements.txt
22
pymysql==0.7.11
3+
freezegun==0.3.12
4+
mock==3.0.5

0 commit comments

Comments
 (0)