@@ -30,13 +30,15 @@ def __init__(self, max_processes=None, max_jobs_per_process=None, stats_every=No

        self.uuid = uuid.uuid4()

+        self._is_shutting_down = False
+
    def run(self):
        logger.info(
            "Starting job worker with %s max processes",
            self.max_processes,
        )

-        while True:
+        while not self._is_shutting_down:
            try:
                self.maybe_log_stats()
                self.maybe_check_job_results()
@@ -72,10 +74,10 @@ def run(self):
                    self.executor.submit(process_job, job_uuid)

    def shutdown(self):
-        # Prevent duplicate keyboard and sigterm calls
-        # (the way this works also only lets us shutdown once per instance)
-        if getattr(self, "_is_shutting_down", False):
+        if self._is_shutting_down:
+            # Already shutting down somewhere else
            return
+
        self._is_shutting_down = True

        logger.info("Job worker shutdown started")
@@ -120,7 +122,12 @@ def maybe_check_job_results(self):
            self.check_job_results()

    def log_stats(self):
-        num_proccesses = len(self.executor._processes)
+        try:
+            num_proccesses = len(self.executor._processes)
+        except (AttributeError, TypeError):
+            # Depending on shutdown timing and internal behavior, this might not work
+            num_proccesses = 0
+

        num_backlog_jobs = (
            JobRequest.objects.count()
            + Job.objects.filter(started_at__isnull=True).count()
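For context, here is a minimal standalone sketch of the shutdown-flag pattern this diff relies on: shutdown() flips _is_shutting_down, the main loop checks the flag each iteration, and repeated shutdown calls become no-ops. The removed comment mentions keyboard and SIGTERM handling, but the actual handler registration is not shown in these hunks, so the signal wiring, the Worker class, and its loop body below are assumptions for illustration only.

# Illustrative sketch, not the PR's code: a shutdown flag wired to signal
# handlers so the main loop can exit cleanly. Names here are hypothetical.
import logging
import signal
import time

logger = logging.getLogger(__name__)


class Worker:
    def __init__(self):
        self._is_shutting_down = False

    def run(self):
        # SIGINT (Ctrl+C) and SIGTERM both route into shutdown(); calling it
        # twice is safe because the flag is checked before any work is done.
        signal.signal(signal.SIGINT, lambda signum, frame: self.shutdown())
        signal.signal(signal.SIGTERM, lambda signum, frame: self.shutdown())

        while not self._is_shutting_down:
            # Do one unit of work per iteration, then sleep briefly.
            time.sleep(0.5)

        logger.info("Worker loop exited cleanly")

    def shutdown(self):
        if self._is_shutting_down:
            # Already shutting down somewhere else
            return

        self._is_shutting_down = True
        logger.info("Shutdown started")


if __name__ == "__main__":
    Worker().run()

Checking the flag at the top of the loop, instead of breaking out of a bare while True, lets the current iteration finish before the worker stops, which is what makes the shutdown graceful.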