Skip to content

Commit 1378347

Browse files
committed
bolt-jobs
1 parent 878b1d4 commit 1378347

24 files changed

+1135
-0
lines changed

bolt-jobs/.gitignore

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Local development files
2+
/.env
3+
/.bolt
4+
*.sqlite3
5+
6+
# Publishing
7+
/dist
8+
9+
# Python
10+
/.venv
11+
__pycache__/
12+
*.py[cod]
13+
*$py.class
14+
15+
# OS files
16+
.DS_Store

bolt-jobs/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# bolt-jobs

bolt-jobs/bolt/jobs/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .jobs import Job
2+
3+
__all__ = ["Job"]

bolt-jobs/bolt/jobs/admin.py

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
from bolt.admin import (
2+
AdminModelDetailView,
3+
AdminModelListView,
4+
AdminModelViewset,
5+
register_viewset,
6+
)
7+
from bolt.admin.cards import Card
8+
from bolt.admin.dates import DatetimeRangeAliases
9+
from bolt.http import HttpResponseRedirect
10+
11+
from .models import JobRequest, JobResult
12+
13+
14+
class SuccessfulJobsCard(Card):
15+
title = "Successful Jobs"
16+
text = "View"
17+
18+
def get_number(self):
19+
return (
20+
JobResult.objects.successful()
21+
.filter(created_at__range=self.datetime_range.as_tuple())
22+
.count()
23+
)
24+
25+
def get_link(self):
26+
return JobResultViewset.ListView.get_absolute_url() + "?filter=Successful"
27+
28+
29+
class ErroredJobsCard(Card):
30+
title = "Errored Jobs"
31+
text = "View"
32+
33+
def get_number(self):
34+
return (
35+
JobResult.objects.errored()
36+
.filter(created_at__range=self.datetime_range.as_tuple())
37+
.count()
38+
)
39+
40+
def get_link(self):
41+
return JobResultViewset.ListView.get_absolute_url() + "?filter=Errored"
42+
43+
44+
class LostJobsCard(Card):
45+
title = "Lost Jobs"
46+
text = "View" # TODO make not required - just an icon?
47+
48+
def get_number(self):
49+
return (
50+
JobResult.objects.lost()
51+
.filter(created_at__range=self.datetime_range.as_tuple())
52+
.count()
53+
)
54+
55+
def get_link(self):
56+
return JobResultViewset.ListView.get_absolute_url() + "?filter=Lost"
57+
58+
59+
class RetriedJobsCard(Card):
60+
title = "Retried Jobs"
61+
text = "View" # TODO make not required - just an icon?
62+
63+
def get_number(self):
64+
return (
65+
JobResult.objects.retried()
66+
.filter(created_at__range=self.datetime_range.as_tuple())
67+
.count()
68+
)
69+
70+
def get_link(self):
71+
return JobResultViewset.ListView.get_absolute_url() + "?filter=Retried"
72+
73+
74+
@register_viewset
75+
class JobRequestViewset(AdminModelViewset):
76+
class ListView(AdminModelListView):
77+
nav_section = "Jobs"
78+
model = JobRequest
79+
fields = ["id", "job_class", "priority", "created_at"]
80+
81+
class DetailView(AdminModelDetailView):
82+
model = JobRequest
83+
84+
85+
@register_viewset
86+
class JobResultViewset(AdminModelViewset):
87+
class ListView(AdminModelListView):
88+
nav_section = "Jobs"
89+
model = JobResult
90+
fields = [
91+
"id",
92+
"job_class",
93+
"priority",
94+
"started_at",
95+
"status",
96+
]
97+
cards = [
98+
SuccessfulJobsCard,
99+
ErroredJobsCard,
100+
LostJobsCard,
101+
RetriedJobsCard,
102+
]
103+
filters = [
104+
"Successful",
105+
"Processing",
106+
"Errored",
107+
"Lost",
108+
"Retried",
109+
"Unknown",
110+
]
111+
actions = [
112+
"Retry",
113+
]
114+
allow_global_search = False
115+
default_datetime_range = DatetimeRangeAliases.LAST_7_DAYS
116+
117+
def get_initial_queryset(self):
118+
queryset = super().get_initial_queryset()
119+
if self.filter == "Successful":
120+
return queryset.successful()
121+
if self.filter == "Errored":
122+
return queryset.errored()
123+
if self.filter == "Processing":
124+
return queryset.processing()
125+
if self.filter == "Lost":
126+
return queryset.lost()
127+
if self.filter == "Retried":
128+
return queryset.retried()
129+
if self.filter == "Unknown":
130+
return queryset.unknown()
131+
return queryset
132+
133+
def get_fields(self):
134+
fields = super().get_fields()
135+
if self.filter == "Retried":
136+
fields.append("retries")
137+
fields.append("retry_attempt")
138+
return fields
139+
140+
def perform_action(self, action: str, target_pks: list):
141+
if action == "Retry":
142+
for result in JobResult.objects.filter(pk__in=target_pks):
143+
result.retry_job(delay=0)
144+
else:
145+
raise ValueError("Invalid action")
146+
147+
class DetailView(AdminModelDetailView):
148+
model = JobResult
149+
150+
def post(self):
151+
self.load_object()
152+
self.object.retry_job(delay=0)
153+
return HttpResponseRedirect(".")

bolt-jobs/bolt/jobs/cli.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import datetime
2+
import logging
3+
4+
import click
5+
6+
from bolt.utils import timezone
7+
8+
from .models import JobRequest, JobResult
9+
from .workers import Worker
10+
11+
logger = logging.getLogger("bolt.jobs")
12+
13+
14+
@click.group()
15+
def cli():
16+
pass
17+
18+
19+
@cli.command()
20+
@click.option(
21+
"--max-processes",
22+
"max_processes",
23+
default=None,
24+
type=int,
25+
envvar="BOLT_JOBS_MAX_PROCESSES",
26+
)
27+
@click.option(
28+
"--max-jobs-per-process",
29+
"max_jobs_per_process",
30+
default=None,
31+
type=int,
32+
envvar="BOLT_JOBS_MAX_JOBS_PER_PROCESS",
33+
)
34+
@click.option(
35+
"--stats-every",
36+
"stats_every",
37+
default=60,
38+
type=int,
39+
envvar="BOLT_JOBS_STATS_EVERY",
40+
)
41+
def worker(max_processes, max_jobs_per_process, stats_every):
42+
Worker(
43+
max_processes=max_processes,
44+
max_jobs_per_process=max_jobs_per_process,
45+
stats_every=stats_every,
46+
).run()
47+
48+
49+
@cli.command()
50+
@click.option("--older-than", type=int, default=60 * 60 * 24 * 7)
51+
def clear_completed(older_than):
52+
cutoff = timezone.now() - datetime.timedelta(seconds=older_than)
53+
click.echo(f"Clearing jobs finished before {cutoff}")
54+
results = (
55+
JobResult.objects.exclude(ended_at__isnull=True)
56+
.filter(ended_at__lt=cutoff)
57+
.delete()
58+
)
59+
click.echo(f"Deleted {results[0]} jobs")
60+
61+
62+
@cli.command()
63+
def stats():
64+
pending = JobRequest.objects.count()
65+
66+
processing = JobResult.objects.processing().count()
67+
successful = JobResult.objects.successful().count()
68+
errored = JobResult.objects.errored().count()
69+
70+
click.echo(f"Pending: {click.style(pending, bold=True)}")
71+
click.echo(f"Processing: {click.style(processing, bold=True)}")
72+
click.echo(f"Successful: {click.style(successful, bold=True)}")
73+
click.echo(f"Errored: {click.style(errored, bold=True)}")

bolt-jobs/bolt/jobs/config.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
from importlib import import_module
2+
3+
from bolt.packages import PackageConfig, packages
4+
5+
MODULE_NAME = "jobs"
6+
7+
8+
class BoltJobsConfig(PackageConfig):
9+
default_auto_field = "bolt.db.models.BigAutoField"
10+
name = "bolt.jobs"
11+
label = "boltqueue"
12+
13+
def ready(self):
14+
# Trigger register calls to fire by importing the modules
15+
for package_config in packages.get_package_configs():
16+
try:
17+
import_module(f"{package_config.name}.{MODULE_NAME}")
18+
except ModuleNotFoundError:
19+
pass
20+
21+
# Also trigger for the root app/admin.py module
22+
try:
23+
import_module(MODULE_NAME)
24+
except ModuleNotFoundError:
25+
pass

bolt-jobs/bolt/jobs/gid.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
class GlobalID:
2+
"""A global identifier for a model instance."""
3+
4+
@staticmethod
5+
def from_instance(instance):
6+
return f"gid://{instance._meta.package_label}/{instance._meta.model_name}/{instance.pk}"
7+
8+
@staticmethod
9+
def to_instance(s):
10+
if not s.startswith("gid://"):
11+
raise ValueError("Invalid GlobalID string")
12+
package, model, pk = s[6:].split("/")
13+
from bolt.packages import packages
14+
15+
model = packages.get_model(package, model)
16+
return model.objects.get(pk=pk)
17+
18+
@staticmethod
19+
def is_gid(x):
20+
if not isinstance(x, str):
21+
return False
22+
return x.startswith("gid://")

0 commit comments

Comments
 (0)