Skip to content

Commit c52941f

Browse files
committed
Refactor to pull task and hook running into Job model
1 parent dba1e91 commit c52941f

File tree

2 files changed

+17
-17
lines changed

2 files changed

+17
-17
lines changed

django_dbq/management/commands/worker.py

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -74,29 +74,18 @@ def _process_job(self):
7474
self.current_job = job
7575

7676
try:
77-
task_function = import_string(job.next_task)
78-
7977
job.run_pre_task_hook()
80-
task_function(job)
81-
78+
job.run_next_task()
8279
job.update_next_task()
80+
8381
if not job.next_task:
8482
job.state = Job.STATES.COMPLETE
8583
else:
8684
job.state = Job.STATES.READY
8785
except Exception as exception:
8886
logger.exception("Job id=%s failed", job.pk)
8987
job.state = Job.STATES.FAILED
90-
91-
failure_hook_name = job.get_failure_hook_name()
92-
if failure_hook_name:
93-
logger.info(
94-
"Running failure hook %s for job id=%s", failure_hook_name, job.pk
95-
)
96-
failure_hook_function = import_string(failure_hook_name)
97-
failure_hook_function(job, exception)
98-
else:
99-
logger.info("No failure hook for job id=%s", job.pk)
88+
job.run_failure_hook(exception)
10089
finally:
10190
try:
10291
job.run_post_task_hook()

django_dbq/models.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,10 @@ def save(self, *args, **kwargs):
128128
def update_next_task(self):
129129
self.next_task = get_next_task_name(self.name, self.next_task) or ""
130130

131+
def run_next_task(self):
132+
next_task_function = import_string(self.next_task)
133+
next_task_function(self)
134+
131135
def get_pre_task_hook_name(self):
132136
return get_pre_task_hook_name(self.name)
133137

@@ -143,21 +147,28 @@ def get_creation_hook_name(self):
143147
def run_pre_task_hook(self):
144148
pre_task_hook_name = self.get_pre_task_hook_name()
145149
if pre_task_hook_name:
146-
logger.info("Running pre_task hook %s for new job", pre_task_hook_name)
150+
logger.info("Running pre_task hook %s for job", pre_task_hook_name)
147151
pre_task_hook_function = import_string(pre_task_hook_name)
148152
pre_task_hook_function(self)
149153

150154
def run_post_task_hook(self):
151155
post_task_hook_name = self.get_post_task_hook_name()
152156
if post_task_hook_name:
153-
logger.info("Running post_task hook %s for new job", post_task_hook_name)
157+
logger.info("Running post_task hook %s for job", post_task_hook_name)
154158
post_task_hook_function = import_string(post_task_hook_name)
155159
post_task_hook_function(self)
156160

161+
def run_failure_hook(self, exception):
162+
failure_hook_name = self.get_failure_hook_name()
163+
if failure_hook_name:
164+
logger.info("Running failure hook %s for job", failure_hook_name)
165+
failure_hook_function = import_string(failure_hook_name)
166+
failure_hook_function(self, exception)
167+
157168
def run_creation_hook(self):
158169
creation_hook_name = self.get_creation_hook_name()
159170
if creation_hook_name:
160-
logger.info("Running creation hook %s for new job", creation_hook_name)
171+
logger.info("Running creation hook %s for job", creation_hook_name)
161172
creation_hook_function = import_string(creation_hook_name)
162173
creation_hook_function(self)
163174

0 commit comments

Comments
 (0)