Skip to content

Commit 5f1cc8c

Browse files
committed
Use the future callback to convert jobs to cancelled
1 parent 79ca9c6 commit 5f1cc8c

File tree

3 files changed

+36
-30
lines changed

3 files changed

+36
-30
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Generated by Bolt 5.0.dev20240121220125 on 2024-01-22 16:53
2+
3+
from bolt.db import migrations
4+
5+
6+
class Migration(migrations.Migration):
7+
dependencies = [
8+
("boltqueue", "0015_job_worker_uuid_jobresult_worker_uuid_and_more"),
9+
]
10+
11+
operations = [
12+
migrations.RemoveField(
13+
model_name="job",
14+
name="worker_uuid",
15+
),
16+
migrations.RemoveField(
17+
model_name="jobresult",
18+
name="worker_uuid",
19+
),
20+
]

bolt-jobs/bolt/jobs/models.py

+1-10
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ class Meta:
6161
def __str__(self):
6262
return f"{self.job_class} [{self.uuid}]"
6363

64-
def convert_to_job(self, *, worker_uuid=None):
64+
def convert_to_job(self):
6565
"""
6666
JobRequests are the pending jobs that are waiting to be executed.
6767
We immediately convert them to JobResults when they are picked up.
@@ -76,7 +76,6 @@ def convert_to_job(self, *, worker_uuid=None):
7676
retries=self.retries,
7777
retry_attempt=self.retry_attempt,
7878
unique_key=self.unique_key,
79-
worker_uuid=worker_uuid,
8079
)
8180

8281
# Delete the pending JobRequest now
@@ -118,9 +117,6 @@ class Job(models.Model):
118117
created_at = models.DateTimeField(auto_now_add=True, db_index=True)
119118
started_at = models.DateTimeField(blank=True, null=True, db_index=True)
120119

121-
# To associate with a worker
122-
worker_uuid = models.UUIDField(blank=True, null=True, db_index=True)
123-
124120
# From the JobRequest
125121
job_request_uuid = models.UUIDField(db_index=True)
126122
job_class = models.CharField(max_length=255, db_index=True)
@@ -168,8 +164,6 @@ def convert_to_result(self, *, status, error=""):
168164
ended_at=timezone.now(),
169165
error=error,
170166
status=status,
171-
# From the worker
172-
worker_uuid=self.worker_uuid,
173167
# From the Job
174168
job_uuid=self.uuid,
175169
started_at=self.started_at,
@@ -248,9 +242,6 @@ class JobResult(models.Model):
248242
uuid = models.UUIDField(default=uuid.uuid4, editable=False, unique=True)
249243
created_at = models.DateTimeField(auto_now_add=True, db_index=True)
250244

251-
# To associate with a worker
252-
worker_uuid = models.UUIDField(blank=True, null=True, db_index=True)
253-
254245
# From the Job
255246
job_uuid = models.UUIDField(db_index=True)
256247
started_at = models.DateTimeField(blank=True, null=True, db_index=True)

bolt-jobs/bolt/jobs/workers.py

+15-20
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
import multiprocessing
33
import os
44
import time
5-
import uuid
6-
from concurrent.futures import ProcessPoolExecutor
5+
from concurrent.futures import Future, ProcessPoolExecutor
6+
from functools import partial
77

88
from bolt.db import transaction
99
from bolt.logs import app_logger
@@ -28,8 +28,6 @@ def __init__(self, max_processes=None, max_jobs_per_process=None, stats_every=No
2828
self.max_processes = self.executor._max_workers
2929
self.max_jobs_per_process = max_jobs_per_process
3030

31-
self.uuid = uuid.uuid4()
32-
3331
self._is_shutting_down = False
3432

3533
def run(self):
@@ -63,37 +61,25 @@ def run(self):
6361
job_request.source,
6462
)
6563

66-
job = job_request.convert_to_job(worker_uuid=self.uuid)
64+
job = job_request.convert_to_job()
6765

6866
job_uuid = str(job.uuid) # Make a str copy
6967

7068
# Release these now
7169
del job_request
7270
del job
7371

74-
self.executor.submit(process_job, job_uuid)
72+
future = self.executor.submit(process_job, job_uuid)
73+
future.add_done_callback(partial(future_finished_callback, job_uuid))
7574

7675
def shutdown(self):
7776
if self._is_shutting_down:
7877
# Already shutting down somewhere else
7978
return
8079

81-
self._is_shutting_down = True
82-
8380
logger.info("Job worker shutdown started")
84-
85-
# Make an attept to immediatelly move any unstarted jobs from this worker
86-
# to the JobResults as cancelled. If they have a retry, they'll be picked
87-
# up by the next worker process.
88-
for job in Job.objects.filter(worker_uuid=self.uuid, started_at__isnull=True):
89-
job.convert_to_result(status=JobResultStatuses.CANCELLED)
90-
91-
# Now shutdown the process pool.
92-
# There's still some chance of a race condition here where we
93-
# just deleted a Job and it was still picked up, but we'll log
94-
# missing jobs as warnings instead of exceptions.
81+
self._is_shutting_down = True
9582
self.executor.shutdown(wait=True, cancel_futures=True)
96-
9783
logger.info("Job worker shutdown complete")
9884

9985
def maybe_log_stats(self):
@@ -149,6 +135,15 @@ def check_job_results(self):
149135
JobResult.objects.retry_failed_jobs()
150136

151137

138+
def future_finished_callback(job_uuid: str, future: Future):
139+
if future.cancelled():
140+
logger.warning("Job cancelled job_uuid=%s", job_uuid)
141+
job = Job.objects.get(uuid=job_uuid)
142+
job.convert_to_result(status=JobResultStatuses.CANCELLED)
143+
else:
144+
logger.debug("Job finished job_uuid=%s", job_uuid)
145+
146+
152147
def process_job(job_uuid):
153148
try:
154149
worker_pid = os.getpid()

0 commit comments

Comments
 (0)