Skip to content

Commit 84d3c9d

Browse files
committed
Unique jobs key
1 parent 79830e0 commit 84d3c9d

File tree

4 files changed

+145
-40
lines changed

4 files changed

+145
-40
lines changed

bolt-jobs/bolt/jobs/gid.py

-22
This file was deleted.

bolt-jobs/bolt/jobs/jobs.py

+86-18
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44

55
from bolt.db.models import Model
66

7-
from .gid import GlobalID
8-
97

108
def load_job(job_class_path, parameters):
119
module_path, class_name = job_class_path.rsplit(".", 1)
@@ -21,14 +19,14 @@ def to_json(args, kwargs):
2119
serialized_args = []
2220
for arg in args:
2321
if isinstance(arg, Model):
24-
serialized_args.append(GlobalID.from_instance(arg))
22+
serialized_args.append(ModelInstanceParameter.from_instance(arg))
2523
else:
2624
serialized_args.append(arg)
2725

2826
serialized_kwargs = {}
2927
for key, value in kwargs.items():
3028
if isinstance(value, Model):
31-
serialized_kwargs[key] = GlobalID.from_instance(value)
29+
serialized_kwargs[key] = ModelInstanceParameter.from_instance(value)
3230
else:
3331
serialized_kwargs[key] = value
3432

@@ -38,21 +36,49 @@ def to_json(args, kwargs):
3836
def from_json(data):
3937
args = []
4038
for arg in data["args"]:
41-
if GlobalID.is_gid(arg):
42-
args.append(GlobalID.to_instance(arg))
39+
if ModelInstanceParameter.is_gid(arg):
40+
args.append(ModelInstanceParameter.to_instance(arg))
4341
else:
4442
args.append(arg)
4543

4644
kwargs = {}
4745
for key, value in data["kwargs"].items():
48-
if GlobalID.is_gid(value):
49-
kwargs[key] = GlobalID.to_instance(value)
46+
if ModelInstanceParameter.is_gid(value):
47+
kwargs[key] = ModelInstanceParameter.to_instance(value)
5048
else:
5149
kwargs[key] = value
5250

5351
return args, kwargs
5452

5553

54+
class ModelInstanceParameter:
    """
    A string representation of a model instance,
    so we can convert a single parameter (model instance itself)
    into a string that can be serialized and stored in the database.

    The string has the form ``gid://<package_label>/<model_name>/<pk>``.
    """

    @staticmethod
    def from_instance(instance):
        """Build the ``gid://`` string from the instance's ``_meta`` labels and ``pk``."""
        return f"gid://{instance._meta.package_label}/{instance._meta.model_name}/{instance.pk}"

    @staticmethod
    def to_instance(s):
        """
        Look up and return the model instance that a ``gid://`` string refers to.

        Raises ValueError if ``s`` is not a ``gid://`` string or does not have
        the three expected ``package/model/pk`` parts. Whatever the model
        lookup itself raises (e.g. when no row matches) propagates unchanged.
        """
        if not ModelInstanceParameter.is_gid(s):
            raise ValueError("Invalid ModelInstanceParameter string")

        # Split at most twice so a primary key that itself contains "/"
        # survives the round trip through from_instance().
        parts = s[len("gid://") :].split("/", 2)
        if len(parts) != 3:
            raise ValueError("Invalid ModelInstanceParameter string")
        package, model_name, pk = parts

        # Imported at call time, as in the original implementation
        # (presumably to avoid importing the package registry at module load).
        from bolt.packages import packages

        model = packages.get_model(package, model_name)
        return model.objects.get(pk=pk)

    @staticmethod
    def is_gid(x):
        """Return True only for strings that start with the ``gid://`` prefix."""
        return isinstance(x, str) and x.startswith("gid://")
80+
81+
5682
class JobType(type):
5783
"""
5884
Metaclass allows us to capture the original args/kwargs
@@ -68,7 +94,15 @@ def __call__(self, *args, **kwargs):
6894

6995

7096
class Job(metaclass=JobType):
97+
def run(self):
98+
raise NotImplementedError
99+
71100
def run_in_background(self, start_at: datetime.datetime | None = None):
101+
from .models import JobRequest
102+
103+
if unique_existing := self._get_existing_unique_job_or_request():
104+
return unique_existing
105+
72106
try:
73107
# Try to automatically annotate the source of the job
74108
caller = inspect.stack()[1]
@@ -78,22 +112,56 @@ def run_in_background(self, start_at: datetime.datetime | None = None):
78112

79113
parameters = JobParameters.to_json(self._init_args, self._init_kwargs)
80114

81-
from .models import JobRequest
82-
83-
priority = self.get_priority()
84-
retries = self.get_retries()
85-
86115
return JobRequest.objects.create(
87-
job_class=f"{self.__module__}.{self.__class__.__name__}",
116+
job_class=self._job_class_str(),
88117
parameters=parameters,
89-
priority=priority,
118+
priority=self.get_priority(),
90119
source=source,
91-
retries=retries,
120+
retries=self.get_retries(),
92121
start_at=start_at,
93122
)
94123

95-
def run(self):
96-
raise NotImplementedError
124+
def _job_class_str(self):
125+
return f"{self.__module__}.{self.__class__.__name__}"
126+
127+
def _get_existing_unique_job_or_request(self):
128+
"""
129+
Find pending or running versions of this job that already exist.
130+
Note this doesn't include instances that may have failed and are
131+
not yet queued for retry.
132+
"""
133+
from .models import Job, JobRequest
134+
135+
job_class = self._job_class_str()
136+
unique_key = self.get_unique_key()
137+
138+
if not unique_key:
139+
return None
140+
141+
try:
142+
return JobRequest.objects.get(
143+
job_class=job_class,
144+
unique_key=unique_key,
145+
)
146+
except JobRequest.DoesNotExist:
147+
pass
148+
149+
try:
150+
return Job.objects.get(
151+
job_class=job_class,
152+
unique_key=unique_key,
153+
)
154+
except Job.DoesNotExist:
155+
pass
156+
157+
return None
158+
159+
def get_unique_key(self) -> str:
    """
    A unique key to prevent duplicate jobs from being queued.
    Enabled by returning a non-empty string.

    The default is the empty string, which leaves uniqueness disabled;
    subclasses override this to opt in.
    """
    # Must be a return: `raise ""` is a TypeError, because only
    # BaseException subclasses or instances can be raised.
    return ""
97165

98166
def get_priority(self) -> int:
99167
return 0
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# Generated by Bolt 5.0.dev20240117193239 on 2024-01-17 19:41
2+
3+
from bolt.db import migrations, models
4+
5+
6+
class Migration(migrations.Migration):
    """
    Add a `unique_key` column to the job, jobrequest and jobresult models,
    plus composite (job_class, unique_key) indexes on job and jobrequest.
    """

    dependencies = [
        ("boltqueue", "0013_alter_job_options_alter_jobresult_options_and_more"),
    ]

    operations = [
        # The new column is added to all three models; blank=True allows
        # an empty value.
        migrations.AddField(
            model_name="job",
            name="unique_key",
            field=models.CharField(blank=True, db_index=True, max_length=255),
        ),
        migrations.AddField(
            model_name="jobrequest",
            name="unique_key",
            field=models.CharField(blank=True, db_index=True, max_length=255),
        ),
        migrations.AddField(
            model_name="jobresult",
            name="unique_key",
            field=models.CharField(blank=True, db_index=True, max_length=255),
        ),
        # Composite indexes come after the AddField operations for the
        # columns they reference. jobresult gets no composite index here.
        migrations.AddIndex(
            model_name="job",
            index=models.Index(
                fields=["job_class", "unique_key"], name="job_class_unique_key"
            ),
        ),
        migrations.AddIndex(
            model_name="jobrequest",
            index=models.Index(
                fields=["job_class", "unique_key"], name="job_request_class_unique_key"
            ),
        ),
    ]

bolt-jobs/bolt/jobs/models.py

+20
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ class JobRequest(models.Model):
3939
retries = models.IntegerField(default=0)
4040
retry_attempt = models.IntegerField(default=0)
4141

42+
unique_key = models.CharField(max_length=255, blank=True, db_index=True)
43+
4244
start_at = models.DateTimeField(blank=True, null=True, db_index=True)
4345

4446
# context
@@ -48,6 +50,12 @@ class JobRequest(models.Model):
4850

4951
class Meta:
5052
ordering = ["priority", "-created_at"]
53+
indexes = [
54+
# Used to dedupe unique in-process jobs
55+
models.Index(
56+
name="job_request_class_unique_key", fields=["job_class", "unique_key"]
57+
),
58+
]
5159

5260
def __str__(self):
5361
return f"{self.job_class} [{self.uuid}]"
@@ -65,6 +73,7 @@ def convert_to_job(self):
6573
source=self.source,
6674
retries=self.retries,
6775
retry_attempt=self.retry_attempt,
76+
unique_key=self.unique_key,
6877
)
6978

7079
# Delete the pending JobRequest now
@@ -107,11 +116,18 @@ class Job(models.Model):
107116
source = models.TextField(blank=True)
108117
retries = models.IntegerField(default=0)
109118
retry_attempt = models.IntegerField(default=0)
119+
unique_key = models.CharField(max_length=255, blank=True, db_index=True)
110120

111121
objects = JobQuerySet.as_manager()
112122

113123
class Meta:
114124
ordering = ["-created_at"]
125+
indexes = [
126+
# Used to dedupe unique in-process jobs
127+
models.Index(
128+
name="job_class_unique_key", fields=["job_class", "unique_key"]
129+
),
130+
]
115131

116132
def run(self):
117133
# This is how we know it has been picked up
@@ -149,6 +165,7 @@ def convert_to_result(self, *, status, error=""):
149165
source=self.source,
150166
retries=self.retries,
151167
retry_attempt=self.retry_attempt,
168+
unique_key=self.unique_key,
152169
)
153170

154171
# Delete the Job now
@@ -219,6 +236,7 @@ class JobResult(models.Model):
219236
source = models.TextField(blank=True)
220237
retries = models.IntegerField(default=0)
221238
retry_attempt = models.IntegerField(default=0)
239+
unique_key = models.CharField(max_length=255, blank=True, db_index=True)
222240

223241
# Retries
224242
retry_job_request_uuid = models.UUIDField(blank=True, null=True)
@@ -250,6 +268,8 @@ def retry_job(self, delay: int | None = None):
250268
priority=self.priority,
251269
source=self.source,
252270
retries=self.retries,
271+
unique_key=self.unique_key,
272+
# For the retry
253273
retry_attempt=retry_attempt,
254274
start_at=start_at,
255275
)

0 commit comments

Comments
 (0)