Skip to content

Commit

Permalink
Add Job model to separate active from results
Browse files Browse the repository at this point in the history
  • Loading branch information
davegaeddert committed Jan 17, 2024
1 parent f937009 commit a50afb9
Show file tree
Hide file tree
Showing 6 changed files with 269 additions and 100 deletions.
26 changes: 18 additions & 8 deletions bolt-jobs/bolt/jobs/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from bolt.admin.dates import DatetimeRangeAliases
from bolt.http import HttpResponseRedirect

from .models import JobRequest, JobResult
from .models import Job, JobRequest, JobResult


class SuccessfulJobsCard(Card):
Expand Down Expand Up @@ -82,6 +82,22 @@ class DetailView(AdminModelDetailView):
model = JobRequest


@register_viewset
class JobViewset(AdminModelViewset):
    # Admin views for rows of the Job model.

    class ListView(AdminModelListView):
        model = Job
        nav_section = "Jobs"
        actions = ["Delete"]
        fields = [
            "id",
            "job_class",
            "priority",
            "created_at",
            "started_at",
        ]

        def perform_action(self, action: str, target_pks: list):
            # "Delete" is the only action offered; anything else is a no-op.
            if action != "Delete":
                return
            selected_jobs = Job.objects.filter(pk__in=target_pks)
            selected_jobs.delete()

    class DetailView(AdminModelDetailView):
        model = Job


@register_viewset
class JobResultViewset(AdminModelViewset):
class ListView(AdminModelListView):
Expand All @@ -91,7 +107,7 @@ class ListView(AdminModelListView):
"id",
"job_class",
"priority",
"started_at",
"created_at",
"status",
]
cards = [
Expand All @@ -102,11 +118,9 @@ class ListView(AdminModelListView):
]
filters = [
"Successful",
"Processing",
"Errored",
"Lost",
"Retried",
"Unknown",
]
actions = [
"Retry",
Expand All @@ -120,14 +134,10 @@ def get_initial_queryset(self):
return queryset.successful()
if self.filter == "Errored":
return queryset.errored()
if self.filter == "Processing":
return queryset.processing()
if self.filter == "Lost":
return queryset.lost()
if self.filter == "Retried":
return queryset.retried()
if self.filter == "Unknown":
return queryset.unknown()
return queryset

def get_fields(self):
Expand Down
14 changes: 8 additions & 6 deletions bolt-jobs/bolt/jobs/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from bolt.utils import timezone

from .models import JobRequest, JobResult
from .models import Job, JobRequest, JobResult
from .workers import Worker

logger = logging.getLogger("bolt.jobs")
Expand Down Expand Up @@ -62,12 +62,14 @@ def clear_completed(older_than):
@cli.command()
def stats():
    """Show how many jobs are pending, processing, successful, errored, and lost."""
    # Pending work is counted from JobRequest and in-flight work from Job.
    # JobResult no longer has a "processing" state, so it is only queried for
    # the finished statuses below.
    pending = JobRequest.objects.count()
    processing = Job.objects.count()

    successful = JobResult.objects.successful().count()
    errored = JobResult.objects.errored().count()
    lost = JobResult.objects.lost().count()

    click.secho(f"Pending: {pending}", bold=True)
    click.secho(f"Processing: {processing}", bold=True)
    click.secho(f"Successful: {successful}", bold=True, fg="green")
    click.secho(f"Errored: {errored}", bold=True, fg="red")
    click.secho(f"Lost: {lost}", bold=True, fg="yellow")
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Generated by Bolt 5.0.dev20240114170303 on 2024-01-17 18:45

import uuid

from bolt.db import migrations, models


class Migration(migrations.Migration):
    # Creates the Job table, adds a job_uuid column to JobResult, and sets the
    # JobResult status choices to Successful / Errored / Lost.
    dependencies = [
        ("boltqueue", "0011_jobrequest_retries_jobrequest_retry_attempt_and_more"),
    ]

    operations = [
        # New "Job" table. Besides its own uuid/created_at/started_at it
        # carries job_request_uuid, job_class, parameters, priority, source,
        # retries and retry_attempt.
        migrations.CreateModel(
            name="Job",
            fields=[
                (
                    "id",
                    models.BigAutoField(
                        auto_created=True,
                        primary_key=True,
                        serialize=False,
                        verbose_name="ID",
                    ),
                ),
                (
                    "uuid",
                    models.UUIDField(default=uuid.uuid4, editable=False, unique=True),
                ),
                ("created_at", models.DateTimeField(auto_now_add=True)),
                (
                    "started_at",
                    models.DateTimeField(blank=True, db_index=True, null=True),
                ),
                ("job_request_uuid", models.UUIDField(db_index=True)),
                ("job_class", models.CharField(db_index=True, max_length=255)),
                ("parameters", models.JSONField(blank=True, null=True)),
                ("priority", models.IntegerField(db_index=True, default=0)),
                ("source", models.TextField(blank=True)),
                ("retries", models.IntegerField(default=0)),
                ("retry_attempt", models.IntegerField(default=0)),
            ],
        ),
        # Indexed, non-unique job_uuid column on JobResult.
        # NOTE(review): preserve_default=False presumably means the uuid4
        # default is only used to fill existing rows during this migration.
        # If the callable is evaluated once (as in Django), all pre-existing
        # rows would share one job_uuid value -- confirm that is acceptable.
        migrations.AddField(
            model_name="jobresult",
            name="job_uuid",
            field=models.UUIDField(db_index=True, default=uuid.uuid4),
            preserve_default=False,
        ),
        # JobResult.status: choices are now SUCCESSFUL, ERRORED and LOST.
        migrations.AlterField(
            model_name="jobresult",
            name="status",
            field=models.CharField(
                choices=[
                    ("SUCCESSFUL", "Successful"),
                    ("ERRORED", "Errored"),
                    ("LOST", "Lost"),
                ],
                db_index=True,
                max_length=20,
            ),
        ),
    ]
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Generated by Bolt 5.0.dev20240114170303 on 2024-01-17 19:24

from bolt.db import migrations, models


class Migration(migrations.Migration):
    # Orders Job and JobResult by newest created_at first and declares
    # created_at with db_index=True on Job, JobRequest and JobResult.
    dependencies = [
        ("boltqueue", "0012_job_jobresult_job_uuid_alter_jobresult_status"),
    ]

    operations = [
        # Default ordering: descending created_at.
        migrations.AlterModelOptions(
            name="job",
            options={"ordering": ["-created_at"]},
        ),
        migrations.AlterModelOptions(
            name="jobresult",
            options={"ordering": ["-created_at"]},
        ),
        # created_at becomes an indexed column on all three models.
        migrations.AlterField(
            model_name="job",
            name="created_at",
            field=models.DateTimeField(auto_now_add=True, db_index=True),
        ),
        migrations.AlterField(
            model_name="jobrequest",
            name="created_at",
            field=models.DateTimeField(auto_now_add=True, db_index=True),
        ),
        migrations.AlterField(
            model_name="jobresult",
            name="created_at",
            field=models.DateTimeField(auto_now_add=True, db_index=True),
        ),
    ]
134 changes: 91 additions & 43 deletions bolt-jobs/bolt/jobs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class JobRequest(models.Model):
Keep all pending job requests in a single table.
"""

created_at = models.DateTimeField(auto_now_add=True)
created_at = models.DateTimeField(auto_now_add=True, db_index=True)
uuid = models.UUIDField(default=uuid.uuid4, editable=False, unique=True)

job_class = models.CharField(max_length=255, db_index=True)
Expand All @@ -52,12 +52,12 @@ class Meta:
def __str__(self):
return f"{self.job_class} [{self.uuid}]"

def convert_to_result(self):
def convert_to_job(self):
"""
JobRequests are the pending jobs that are waiting to be executed.
We immediately convert them to Jobs when they are picked up.
"""
result = JobResult.objects.create(
result = Job.objects.create(
job_request_uuid=self.uuid,
job_class=self.job_class,
parameters=self.parameters,
Expand All @@ -73,13 +73,91 @@ def convert_to_result(self):
return result


class JobResultQuerySet(models.QuerySet):
def unknown(self):
return self.filter(status=JobResultStatuses.UNKNOWN)
class JobQuerySet(models.QuerySet):
    """QuerySet for Job rows."""

    def mark_lost_jobs(self):
        """
        Convert every job created more than one day ago into a LOST JobResult.
        """
        # Nothing should still be active after more than 24 hours, so consider
        # it lost. The downside is that jobs are marked lost fairly late.
        # In theory we could save a timeout per job and mark them timed-out
        # more quickly, but if they're still running we can't actually send a
        # signal to cancel them.
        one_day_ago = timezone.now() - datetime.timedelta(days=1)
        lost_jobs = self.filter(created_at__lt=one_day_ago)
        for job in lost_jobs:
            # No ended_at is passed: convert_to_result() records the end time
            # itself, and passing an unexpected keyword raised a TypeError.
            job.convert_to_result(
                status=JobResultStatuses.LOST,
                error="",
            )


class Job(models.Model):
    """
    All active jobs are stored in this table.

    Rows are created by JobRequest.convert_to_job() and removed again by
    convert_to_result(), which stores the outcome as a JobResult.
    """

    uuid = models.UUIDField(default=uuid.uuid4, editable=False, unique=True)
    created_at = models.DateTimeField(auto_now_add=True, db_index=True)
    started_at = models.DateTimeField(blank=True, null=True, db_index=True)

    # From the JobRequest
    job_request_uuid = models.UUIDField(db_index=True)
    job_class = models.CharField(max_length=255, db_index=True)
    parameters = models.JSONField(blank=True, null=True)
    priority = models.IntegerField(default=0, db_index=True)
    source = models.TextField(blank=True)
    retries = models.IntegerField(default=0)
    retry_attempt = models.IntegerField(default=0)

    objects = JobQuerySet.as_manager()

    class Meta:
        ordering = ["-created_at"]

    def run(self):
        """
        Execute the job and return the JobResult it is converted into.

        Any exception raised while loading or running the job is caught,
        logged, and recorded as an ERRORED result instead of propagating.
        """
        # This is how we know it has been picked up
        self.started_at = timezone.now()
        self.save(update_fields=["started_at"])

        try:
            job = load_job(self.job_class, self.parameters)
            job.run()
            status = JobResultStatuses.SUCCESSFUL
            error = ""
        except Exception as e:
            status = JobResultStatuses.ERRORED
            # Only the traceback frames are stored on the result; the
            # exception itself goes to the log via logger.exception().
            error = "".join(traceback.format_tb(e.__traceback__))
            logger.exception(e)

        return self.convert_to_result(status=status, error=error)

    def convert_to_result(self, *, status, error="", ended_at=None):
        """
        Convert this Job to a JobResult.

        Creates a JobResult carrying this job's fields plus the outcome,
        deletes the Job row, and returns the new JobResult.

        status: the JobResultStatuses value to record.
        error: error text to store on the result (empty by default).
        ended_at: end time to record; defaults to the current time.
        """
        if ended_at is None:
            ended_at = timezone.now()

        result = JobResult.objects.create(
            ended_at=ended_at,
            error=error,
            status=status,
            # From the Job
            job_uuid=self.uuid,
            started_at=self.started_at,
            # From the JobRequest
            job_request_uuid=self.job_request_uuid,
            job_class=self.job_class,
            parameters=self.parameters,
            priority=self.priority,
            source=self.source,
            retries=self.retries,
            retry_attempt=self.retry_attempt,
        )

        # Delete the Job now
        self.delete()

        return result


class JobResultQuerySet(models.QuerySet):
def successful(self):
    """Return only the results whose status is SUCCESSFUL."""
    return self.filter(status=JobResultStatuses.SUCCESSFUL)

Expand All @@ -95,18 +173,6 @@ def retried(self):
| models.Q(retry_attempt__gt=0)
)

def mark_lost_jobs(self):
# Nothing should be pending after more than a 24 hrs... consider it lost
# Downside to these is that they are mark lost pretty late?
# In theory we could save a timeout per-job and mark them timed-out more quickly,
# but if they're still running, we can't actually send a signal to cancel it...
now = timezone.now()
one_day_ago = now - datetime.timedelta(days=1)
self.filter(
status__in=[JobResultStatuses.PROCESSING, JobResultStatuses.UNKNOWN],
created_at__lt=one_day_ago,
).update(status=JobResultStatuses.LOST, ended_at=now)

def retry_failed_jobs(self):
for result in self.filter(
status__in=[JobResultStatuses.ERRORED, JobResultStatuses.LOST],
Expand All @@ -118,8 +184,6 @@ def retry_failed_jobs(self):


class JobResultStatuses(models.TextChoices):
UNKNOWN = "", "Unknown" # The initial state
PROCESSING = "PROCESSING", "Processing"
SUCCESSFUL = "SUCCESSFUL", "Successful"
ERRORED = "ERRORED", "Errored" # Threw an error
LOST = (
Expand All @@ -134,15 +198,16 @@ class JobResult(models.Model):
"""

uuid = models.UUIDField(default=uuid.uuid4, editable=False, unique=True)
created_at = models.DateTimeField(auto_now_add=True)
created_at = models.DateTimeField(auto_now_add=True, db_index=True)

# From the Job
job_uuid = models.UUIDField(db_index=True)
started_at = models.DateTimeField(blank=True, null=True, db_index=True)
ended_at = models.DateTimeField(blank=True, null=True, db_index=True)
error = models.TextField(blank=True)
status = models.CharField(
max_length=20,
choices=JobResultStatuses.choices,
blank=True,
default=JobResultStatuses.UNKNOWN,
db_index=True,
)

Expand All @@ -161,24 +226,7 @@ class JobResult(models.Model):
objects = JobResultQuerySet.as_manager()

class Meta:
ordering = ["-started_at"]

def process_job(self):
self.started_at = timezone.now()
self.status = JobResultStatuses.PROCESSING
self.save(update_fields=["started_at", "status"])

try:
job = load_job(self.job_class, self.parameters)
job.run()
self.status = JobResultStatuses.SUCCESSFUL
except Exception as e:
self.error = "".join(traceback.format_tb(e.__traceback__))
self.status = JobResultStatuses.ERRORED
logger.exception(e)

self.ended_at = timezone.now()
self.save(update_fields=["ended_at", "error", "status"])
ordering = ["-created_at"]

def retry_job(self, delay: int | None = None):
retry_attempt = self.retry_attempt + 1
Expand Down
Loading

0 comments on commit a50afb9

Please sign in to comment.