Skip to content

Commit a997d0c

Browse files
committed
Reorganize task revocation logic
1 parent 94ae2ac commit a997d0c

File tree

4 files changed

+38
-31
lines changed

4 files changed

+38
-31
lines changed

contentcuration/contentcuration/tests/helpers.py

+2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from importlib import import_module
33

44
import mock
5+
from celery import states
56

67
from contentcuration.models import TaskResult
78

@@ -20,6 +21,7 @@ def clear_tasks(except_task_id=None):
2021
qs = qs.exclude(task_id=except_task_id)
2122
for task_id in qs.values_list("task_id", flat=True):
2223
app.control.revoke(task_id, terminate=True)
24+
qs.update(status=states.REVOKED)
2325

2426

2527
def mock_class_instance(target):

contentcuration/contentcuration/tests/test_asynctask.py

+17
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,9 @@ def _wait_for(self, async_result, timeout=30):
142142
with allow_join_result():
143143
return async_result.get(timeout=timeout)
144144

145+
def test_app_count_queued_tasks(self):
146+
self.assertIsInstance(app.count_queued_tasks(), int)
147+
145148
def test_asynctask_reports_success(self):
146149
"""
147150
Tests that when an async task is created and completed, the Task object has a status of 'SUCCESS' and
@@ -245,3 +248,17 @@ def test_requeue_task(self):
245248
second_result = self._wait_for(second_async_result)
246249
self.assertIsNone(second_result)
247250
self.assertTrue(second_async_result.successful())
251+
252+
def test_revoke_task(self):
253+
channel_id = uuid.uuid4()
254+
async_result = test_task.enqueue(self.user, channel_id=channel_id)
255+
test_task.revoke(channel_id=channel_id)
256+
257+
# this should raise an exception, even though revoked, because the task is in ready state but not success
258+
with self.assertRaises(Exception):
259+
self._wait_for(async_result)
260+
261+
try:
262+
TaskResult.objects.get(task_id=async_result.task_id, status=states.REVOKED)
263+
except TaskResult.DoesNotExist:
264+
self.fail('Missing revoked task result')

contentcuration/contentcuration/utils/celery/app.py

-31
Original file line numberDiff line numberDiff line change
@@ -54,37 +54,6 @@ def count_queued_tasks(self, queue_name="celery"):
5454
count = conn.default_channel.client.llen(queue_name)
5555
return count
5656

57-
def _revoke_matching(self, tasks, task_name, matching_kwargs=None):
58-
count = 0
59-
for task in tasks:
60-
if task['name'] == task_name and (matching_kwargs is None or task['kwargs'] == matching_kwargs):
61-
print('Revoking task {}'.format(task))
62-
count += 1
63-
self.control.revoke(task['id'], terminate=True)
64-
return count
65-
66-
def revoke_reserved_tasks_by_name(self, task_name, matching_kwargs=None):
67-
"""
68-
:param task_name: The string name of the task
69-
:param matching_kwargs: A dict of kwargs to match to the task
70-
:return: The count of revoked tasks
71-
"""
72-
count = 0
73-
for worker_name, tasks in self.control.inspect().reserved().items():
74-
count += self._revoke_matching(tasks, task_name, matching_kwargs)
75-
return count
76-
77-
def terminate_active_tasks_by_name(self, task_name, matching_kwargs=None):
78-
"""
79-
:param task_name: The string name of the task
80-
:param matching_kwargs: A dict of kwargs to match to the task
81-
:return: The count of terminated tasks
82-
"""
83-
count = 0
84-
for worker_name, tasks in self.control.inspect().active().items():
85-
count += self._revoke_matching(tasks, task_name, matching_kwargs)
86-
return count
87-
8857
def decode_result(self, result, status=None):
8958
"""
9059
Decodes the celery result, like the raw result from the database, using celery tools

contentcuration/contentcuration/utils/celery/tasks.py

+19
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,25 @@ def requeue(self, **kwargs):
246246
logging.info(f"Re-queuing task {self.name} for user {task_result.user.pk} from {request.id} | {task_kwargs}")
247247
return self.enqueue(task_result.user, **task_kwargs)
248248

249+
def revoke(self, exclude_task_ids=None, **kwargs):
250+
"""
251+
Revokes and terminates all unready tasks matching the kwargs
252+
:param exclude_task_ids: Any task ids to exclude from this behavior
253+
:param kwargs: Task keyword arguments that will be used to match against tasks
254+
:return: The number of tasks revoked
255+
"""
256+
task_ids = self.find_incomplete_ids(**self.backend.decode(self._prepare_kwargs(kwargs)))
257+
if exclude_task_ids is not None:
258+
task_ids = task_ids.exclude(task_id__in=task_ids)
259+
count = 0
260+
for task_id in task_ids:
261+
logging.info(f"Revoking task {task_id}")
262+
self.app.control.revoke(task_id, terminate=True)
263+
count += 1
264+
# be sure the database backend has these marked appropriately
265+
self.TaskModel.objects.filter(task_id__in=task_ids).update(status=states.REVOKED)
266+
return count
267+
249268

250269
class CeleryAsyncResult(AsyncResult):
251270
"""

0 commit comments

Comments
 (0)