Commit 1f901b3

Add a job cancelled status, shutdown behavior, some atomic usage

1 parent 42cc20d · commit 1f901b3

5 files changed (+187, -78 lines)

bolt-jobs/bolt/jobs/admin.py (+4, -1)

@@ -30,7 +30,7 @@ def _td_format(td_object):
         if seconds > period_seconds:
             period_value, seconds = divmod(seconds, period_seconds)
             has_s = "s" if period_value > 1 else ""
-            strings.append("%s %s%s" % (period_value, period_name, has_s))
+            strings.append(f"{period_value} {period_name}{has_s}")

     return ", ".join(strings)

@@ -147,6 +147,7 @@ class ListView(AdminModelListView):
     filters = [
         "Successful",
         "Errored",
+        "Cancelled",
         "Lost",
         "Retried",
     ]

@@ -166,6 +167,8 @@ def get_initial_queryset(self):
             return queryset.successful()
         if self.filter == "Errored":
             return queryset.errored()
+        if self.filter == "Cancelled":
+            return queryset.cancelled()
         if self.filter == "Lost":
             return queryset.lost()
         if self.filter == "Retried":
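For context, the _td_format helper touched in the first hunk builds a human-readable duration string with exactly this divmod-and-pluralize pattern. A minimal, self-contained sketch of the same idea (the periods list below is an assumption for illustration, not copied from the repository):

import datetime

def format_timedelta(td_object):
    # Illustrative stand-in for _td_format: consume the largest whole units first.
    seconds = int(td_object.total_seconds())
    periods = [("day", 86400), ("hour", 3600), ("minute", 60), ("second", 1)]

    strings = []
    for period_name, period_seconds in periods:
        if seconds > period_seconds:
            period_value, seconds = divmod(seconds, period_seconds)
            has_s = "s" if period_value > 1 else ""
            strings.append(f"{period_value} {period_name}{has_s}")

    return ", ".join(strings)

print(format_timedelta(datetime.timedelta(hours=3, minutes=5)))  # "3 hours, 5 minutes"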

bolt-jobs/bolt/jobs/cli.py (+14, -2)

@@ -1,5 +1,6 @@
 import datetime
 import logging
+import signal

 import click

@@ -40,11 +41,22 @@ def cli():
     envvar="BOLT_JOBS_STATS_EVERY",
 )
 def worker(max_processes, max_jobs_per_process, stats_every):
-    Worker(
+    worker = Worker(
         max_processes=max_processes,
         max_jobs_per_process=max_jobs_per_process,
         stats_every=stats_every,
-    ).run()
+    )
+
+    def _shutdown(signalnum, _):
+        logger.info("Job worker shutdown signal received signalnum=%s", signalnum)
+        worker.shutdown()
+
+    # Allow the worker to be stopped gracefully on SIGTERM
+    signal.signal(signal.SIGTERM, _shutdown)
+    signal.signal(signal.SIGINT, _shutdown)
+
+    # Start processing jobs
+    worker.run()


 @cli.command()
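The wiring above follows the usual Python pattern for graceful shutdown: build the long-running object first, register signal handlers that ask it to stop, then enter the loop. A Bolt-independent sketch of that pattern (the Worker class here is a hypothetical stand-in, not the one from this package):

import signal
import time

class Worker:
    # Hypothetical stand-in for a long-running job loop.
    def __init__(self):
        self._running = True

    def shutdown(self):
        # Safe to call from a signal handler: just flip a flag.
        self._running = False

    def run(self):
        while self._running:
            time.sleep(1)  # poll for work here

worker = Worker()

def _shutdown(signalnum, _frame):
    print(f"shutdown signal received signalnum={signalnum}")
    worker.shutdown()

# SIGTERM is what most supervisors and deploy systems send first; SIGINT covers Ctrl+C.
signal.signal(signal.SIGTERM, _shutdown)
signal.signal(signal.SIGINT, _shutdown)

worker.run()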
New migration file (+36, -0)

@@ -0,0 +1,36 @@
+# Generated by Bolt 5.0.dev20240118212224 on 2024-01-18 21:53
+
+from bolt.db import migrations, models
+
+
+class Migration(migrations.Migration):
+    dependencies = [
+        ("boltqueue", "0014_job_unique_key_jobrequest_unique_key_and_more"),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name="job",
+            name="worker_uuid",
+            field=models.UUIDField(blank=True, db_index=True, null=True),
+        ),
+        migrations.AddField(
+            model_name="jobresult",
+            name="worker_uuid",
+            field=models.UUIDField(blank=True, db_index=True, null=True),
+        ),
+        migrations.AlterField(
+            model_name="jobresult",
+            name="status",
+            field=models.CharField(
+                choices=[
+                    ("SUCCESSFUL", "Successful"),
+                    ("ERRORED", "Errored"),
+                    ("CANCELLED", "Cancelled"),
+                    ("LOST", "Lost"),
+                ],
+                db_index=True,
+                max_length=20,
+            ),
+        ),
+    ]

bolt-jobs/bolt/jobs/models.py (+62, -36)

@@ -61,24 +61,26 @@ class Meta:
     def __str__(self):
         return f"{self.job_class} [{self.uuid}]"

-    def convert_to_job(self):
+    def convert_to_job(self, *, worker_uuid=None):
         """
         JobRequests are the pending jobs that are waiting to be executed.
         We immediately convert them to JobResults when they are picked up.
         """
-        result = Job.objects.create(
-            job_request_uuid=self.uuid,
-            job_class=self.job_class,
-            parameters=self.parameters,
-            priority=self.priority,
-            source=self.source,
-            retries=self.retries,
-            retry_attempt=self.retry_attempt,
-            unique_key=self.unique_key,
-        )
+        with transaction.atomic():
+            result = Job.objects.create(
+                job_request_uuid=self.uuid,
+                job_class=self.job_class,
+                parameters=self.parameters,
+                priority=self.priority,
+                source=self.source,
+                retries=self.retries,
+                retry_attempt=self.retry_attempt,
+                unique_key=self.unique_key,
+                worker_uuid=worker_uuid,
+            )

-        # Delete the pending JobRequest now
-        self.delete()
+            # Delete the pending JobRequest now
+            self.delete()

         return result

@@ -110,6 +112,9 @@ class Job(models.Model):
     created_at = models.DateTimeField(auto_now_add=True, db_index=True)
     started_at = models.DateTimeField(blank=True, null=True, db_index=True)

+    # To associate with a worker
+    worker_uuid = models.UUIDField(blank=True, null=True, db_index=True)
+
     # From the JobRequest
     job_request_uuid = models.UUIDField(db_index=True)
     job_class = models.CharField(max_length=255, db_index=True)

@@ -152,26 +157,29 @@ def convert_to_result(self, *, status, error=""):
         """
         Convert this Job to a JobResult.
         """
-        result = JobResult.objects.create(
-            ended_at=timezone.now(),
-            error=error,
-            status=status,
-            # From the Job
-            job_uuid=self.uuid,
-            started_at=self.started_at,
-            # From the JobRequest
-            job_request_uuid=self.job_request_uuid,
-            job_class=self.job_class,
-            parameters=self.parameters,
-            priority=self.priority,
-            source=self.source,
-            retries=self.retries,
-            retry_attempt=self.retry_attempt,
-            unique_key=self.unique_key,
-        )
+        with transaction.atomic():
+            result = JobResult.objects.create(
+                ended_at=timezone.now(),
+                error=error,
+                status=status,
+                # From the worker
+                worker_uuid=self.worker_uuid,
+                # From the Job
+                job_uuid=self.uuid,
+                started_at=self.started_at,
+                # From the JobRequest
+                job_request_uuid=self.job_request_uuid,
+                job_class=self.job_class,
+                parameters=self.parameters,
+                priority=self.priority,
+                source=self.source,
+                retries=self.retries,
+                retry_attempt=self.retry_attempt,
+                unique_key=self.unique_key,
+            )

-        # Delete the Job now
-        self.delete()
+            # Delete the Job now
+            self.delete()

         return result

@@ -180,6 +188,9 @@ class JobResultQuerySet(models.QuerySet):
     def successful(self):
         return self.filter(status=JobResultStatuses.SUCCESSFUL)

+    def cancelled(self):
+        return self.filter(status=JobResultStatuses.CANCELLED)
+
     def lost(self):
         return self.filter(status=JobResultStatuses.LOST)

@@ -192,19 +203,31 @@ def retried(self):
             | models.Q(retry_attempt__gt=0)
         )

-    def retry_failed_jobs(self):
-        for result in self.filter(
-            status__in=[JobResultStatuses.ERRORED, JobResultStatuses.LOST],
+    def failed(self):
+        return self.filter(
+            status__in=[
+                JobResultStatuses.ERRORED,
+                JobResultStatuses.LOST,
+                JobResultStatuses.CANCELLED,
+            ]
+        )
+
+    def retryable(self):
+        return self.failed().filter(
             retry_job_request_uuid__isnull=True,
             retries__gt=0,
             retry_attempt__lt=models.F("retries"),
-        ):
+        )
+
+    def retry_failed_jobs(self):
+        for result in self.retryable():
             result.retry_job()


 class JobResultStatuses(models.TextChoices):
     SUCCESSFUL = "SUCCESSFUL", "Successful"
     ERRORED = "ERRORED", "Errored"  # Threw an error
+    CANCELLED = "CANCELLED", "Cancelled"  # Cancelled (probably by deploy)
     LOST = (
         "LOST",
         "Lost",

@@ -219,6 +242,9 @@ class JobResult(models.Model):
     uuid = models.UUIDField(default=uuid.uuid4, editable=False, unique=True)
     created_at = models.DateTimeField(auto_now_add=True, db_index=True)

+    # To associate with a worker
+    worker_uuid = models.UUIDField(blank=True, null=True, db_index=True)
+
     # From the Job
     job_uuid = models.UUIDField(db_index=True)
     started_at = models.DateTimeField(blank=True, null=True, db_index=True)
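With the queryset changes above, cancelled results count as failed and can therefore be retried. A short usage sketch, assuming the models are importable from bolt.jobs.models as the file path suggests:

from bolt.jobs.models import JobResult

# Everything that did not finish successfully, now including CANCELLED
failed = JobResult.objects.failed()

# The failed results that still have retries left and have not
# already spawned a retry JobRequest
retryable = JobResult.objects.retryable()

# Enqueue a retry for each of them
JobResult.objects.retry_failed_jobs()

# The admin's new "Cancelled" filter maps to this
cancelled = JobResult.objects.cancelled()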

bolt-jobs/bolt/jobs/workers.py (+71, -39)

@@ -2,13 +2,14 @@
 import multiprocessing
 import os
 import time
+import uuid
 from concurrent.futures import ProcessPoolExecutor

 from bolt.db import transaction
 from bolt.logs import app_logger
 from bolt.signals import request_finished, request_started

-from .models import Job, JobRequest, JobResult
+from .models import Job, JobRequest, JobResult, JobResultStatuses

 logger = logging.getLogger("bolt.jobs")

@@ -27,49 +28,71 @@ def __init__(self, max_processes=None, max_jobs_per_process=None, stats_every=No
         self.max_processes = self.executor._max_workers
         self.max_jobs_per_process = max_jobs_per_process

+        self.uuid = uuid.uuid4()
+
     def run(self):
         logger.info(
             "Starting job worker with %s max processes",
             self.max_processes,
         )

-        try:
-            while True:
-                try:
-                    self.maybe_log_stats()
-                    self.maybe_check_job_results()
-                except Exception as e:
-                    # Log the issue, but don't stop the worker
-                    # (these tasks are kind of ancillary to the main job processing)
-                    logger.exception(e)
-
-                with transaction.atomic():
-                    job_request = JobRequest.objects.next_up()
-                    if not job_request:
-                        # Potentially no jobs to process (who knows for how long)
-                        # but sleep for a second to give the CPU and DB a break
-                        time.sleep(1)
-                        continue
-
-                    logger.info(
-                        'Preparing to execute job job_class=%s job_request_uuid=%s job_priority=%s job_source="%s"',
-                        job_request.job_class,
-                        job_request.uuid,
-                        job_request.priority,
-                        job_request.source,
-                    )
-
-                    job = job_request.convert_to_job()
-
-                job_uuid = str(job.uuid)  # Make a str copy
-
-                # Release these now
-                del job_request
-                del job
-
-                self.executor.submit(process_job, job_uuid)
-        except (KeyboardInterrupt, SystemExit):
-            self.executor.shutdown(wait=True, cancel_futures=True)
+        while True:
+            try:
+                self.maybe_log_stats()
+                self.maybe_check_job_results()
+            except Exception as e:
+                # Log the issue, but don't stop the worker
+                # (these tasks are kind of ancillary to the main job processing)
+                logger.exception(e)
+
+            with transaction.atomic():
+                job_request = JobRequest.objects.next_up()
+                if not job_request:
+                    # Potentially no jobs to process (who knows for how long)
+                    # but sleep for a second to give the CPU and DB a break
+                    time.sleep(1)
+                    continue
+
+                logger.info(
+                    'Preparing to execute job job_class=%s job_request_uuid=%s job_priority=%s job_source="%s"',
+                    job_request.job_class,
+                    job_request.uuid,
+                    job_request.priority,
+                    job_request.source,
+                )
+
+                job = job_request.convert_to_job(worker_uuid=self.uuid)
+
+            job_uuid = str(job.uuid)  # Make a str copy
+
+            # Release these now
+            del job_request
+            del job
+
+            self.executor.submit(process_job, job_uuid)
+
+    def shutdown(self):
+        # Prevent duplicate keyboard and sigterm calls
+        # (the way this works also only lets us shutdown once per instance)
+        if getattr(self, "_is_shutting_down", False):
+            return
+        self._is_shutting_down = True
+
+        logger.info("Job worker shutdown started")
+
+        # Make an attempt to immediately move any unstarted jobs from this worker
+        # to the JobResults as cancelled. If they have a retry, they'll be picked
+        # up by the next worker process.
+        for job in Job.objects.filter(worker_uuid=self.uuid, started_at__isnull=True):
+            job.convert_to_result(status=JobResultStatuses.CANCELLED)
+
+        # Now shutdown the process pool.
+        # There's still some chance of a race condition here where we
+        # just deleted a Job and it was still picked up, but we'll log
+        # missing jobs as warnings instead of exceptions.
+        self.executor.shutdown(wait=True, cancel_futures=True)
+
+        logger.info("Job worker shutdown complete")

     def maybe_log_stats(self):
         if not self.stats_every:

@@ -124,7 +147,16 @@ def process_job(job_uuid):
     worker_pid = os.getpid()

     request_started.send(sender=None)
-    job = Job.objects.get(uuid=job_uuid)
+
+    try:
+        job = Job.objects.get(uuid=job_uuid)
+    except Job.DoesNotExist:
+        logger.warning(
+            "Job not found worker_pid=%s job_uuid=%s",
+            worker_pid,
+            job_uuid,
+        )
+        return

     logger.info(
         'Executing job worker_pid=%s job_class=%s job_request_uuid=%s job_priority=%s job_source="%s"',
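The shutdown path leans on concurrent.futures semantics: shutdown(wait=True, cancel_futures=True) cancels work items that are still queued and then waits for anything already running to finish (cancel_futures requires Python 3.9+). A standalone sketch of that behavior, with an illustrative slow task:

import time
from concurrent.futures import ProcessPoolExecutor

def slow_task(n):
    time.sleep(1)
    return n

if __name__ == "__main__":
    executor = ProcessPoolExecutor(max_workers=2)

    # Queue more work than the pool can start right away.
    futures = [executor.submit(slow_task, n) for n in range(6)]

    # Cancels the queued futures, waits for the running ones to finish.
    executor.shutdown(wait=True, cancel_futures=True)

    for future in futures:
        print(future.cancelled() or future.result())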
