From cd81b0f459e6a4980fda23263c8fe562d01d14a7 Mon Sep 17 00:00:00 2001 From: Dave Gaeddert Date: Thu, 18 Jan 2024 17:30:38 -0600 Subject: [PATCH] More shutdown cleanup --- bolt-jobs/bolt/jobs/workers.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/bolt-jobs/bolt/jobs/workers.py b/bolt-jobs/bolt/jobs/workers.py index 71fe366d0c..f69b9b88e6 100644 --- a/bolt-jobs/bolt/jobs/workers.py +++ b/bolt-jobs/bolt/jobs/workers.py @@ -30,13 +30,15 @@ def __init__(self, max_processes=None, max_jobs_per_process=None, stats_every=No self.uuid = uuid.uuid4() + self._is_shutting_down = False + def run(self): logger.info( "Starting job worker with %s max processes", self.max_processes, ) - while True: + while not self._is_shutting_down: try: self.maybe_log_stats() self.maybe_check_job_results() @@ -72,10 +74,10 @@ def run(self): self.executor.submit(process_job, job_uuid) def shutdown(self): - # Prevent duplicate keyboard and sigterm calls - # (the way this works also only lets us shutdown once per instance) - if getattr(self, "_is_shutting_down", False): + if self._is_shutting_down: + # Already shutting down somewhere else return + self._is_shutting_down = True logger.info("Job worker shutdown started") @@ -120,7 +122,12 @@ def maybe_check_job_results(self): self.check_job_results() def log_stats(self): - num_proccesses = len(self.executor._processes) + try: + num_proccesses = len(self.executor._processes) + except (AttributeError, TypeError): + # Depending on shutdown timing and internal behavior, this might not work + num_proccesses = 0 + num_backlog_jobs = ( JobRequest.objects.count() + Job.objects.filter(started_at__isnull=True).count()