Skip to content

Commit 9d84374

Browse files
authored
Merge pull request #3852 from bjester/angry-tasks
Add task helpers and stop enqueuing storage calculation tasks for admins
2 parents 7f7e675 + a331538 commit 9d84374

File tree

9 files changed

+112
-7
lines changed

9 files changed

+112
-7
lines changed

contentcuration/contentcuration/decorators.py

+4
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ class DelayUserStorageCalculation(ContextDecorator):
7676
def is_active(self):
7777
return self.depth > 0
7878

79+
def add(self, user_id):
80+
if user_id not in self.queue:
81+
self.queue.append(user_id)
82+
7983
def __enter__(self):
8084
self.depth += 1
8185

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import logging
2+
3+
from django.core.management.base import BaseCommand
4+
5+
from contentcuration.celery import app
6+
from contentcuration.models import Change
7+
from contentcuration.models import User
8+
9+
logging.basicConfig()
10+
logger = logging.getLogger('command')
11+
12+
13+
class Command(BaseCommand):
14+
"""
15+
Reconciles that unready tasks are marked as reserved or active according to celery control
16+
"""
17+
18+
def handle(self, *args, **options):
19+
from contentcuration.tasks import apply_channel_changes_task
20+
from contentcuration.tasks import apply_user_changes_task
21+
22+
active_task_ids = []
23+
for worker_name, tasks in app.control.inspect().active().items():
24+
active_task_ids.extend(task['id'] for task in tasks)
25+
for worker_name, tasks in app.control.inspect().reserved().items():
26+
active_task_ids.extend(task['id'] for task in tasks)
27+
28+
channel_changes = Change.objects.filter(channel_id__isnull=False, applied=False, errored=False) \
29+
.order_by('channel_id', 'created_by_id') \
30+
.values('channel_id', 'created_by_id') \
31+
.distinct()
32+
for channel_change in channel_changes:
33+
apply_channel_changes_task.revoke(exclude_task_ids=active_task_ids, channel_id=channel_change['channel_id'])
34+
apply_channel_changes_task.fetch_or_enqueue(
35+
User.objects.get(pk=channel_change['created_by_id']),
36+
channel_id=channel_change['channel_id']
37+
)
38+
39+
user_changes = Change.objects.filter(channel_id__isnull=True, user_id__isnull=False, applied=False, errored=False) \
40+
.order_by('user_id', 'created_by_id') \
41+
.values('user_id', 'created_by_id') \
42+
.distinct()
43+
for user_change in user_changes:
44+
apply_user_changes_task.revoke(exclude_task_ids=active_task_ids, user_id=user_change['user_id'])
45+
apply_user_changes_task.fetch_or_enqueue(
46+
User.objects.get(pk=user_change['created_by_id']),
47+
user_id=user_change['user_id']
48+
)

contentcuration/contentcuration/tests/helpers.py

+2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from importlib import import_module
33

44
import mock
5+
from celery import states
56

67
from contentcuration.models import TaskResult
78

@@ -20,6 +21,7 @@ def clear_tasks(except_task_id=None):
2021
qs = qs.exclude(task_id=except_task_id)
2122
for task_id in qs.values_list("task_id", flat=True):
2223
app.control.revoke(task_id, terminate=True)
24+
qs.update(status=states.REVOKED)
2325

2426

2527
def mock_class_instance(target):

contentcuration/contentcuration/tests/test_asynctask.py

+17
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,9 @@ def _wait_for(self, async_result, timeout=30):
142142
with allow_join_result():
143143
return async_result.get(timeout=timeout)
144144

145+
def test_app_count_queued_tasks(self):
146+
self.assertIsInstance(app.count_queued_tasks(), int)
147+
145148
def test_asynctask_reports_success(self):
146149
"""
147150
Tests that when an async task is created and completed, the Task object has a status of 'SUCCESS' and
@@ -245,3 +248,17 @@ def test_requeue_task(self):
245248
second_result = self._wait_for(second_async_result)
246249
self.assertIsNone(second_result)
247250
self.assertTrue(second_async_result.successful())
251+
252+
def test_revoke_task(self):
253+
channel_id = uuid.uuid4()
254+
async_result = test_task.enqueue(self.user, channel_id=channel_id)
255+
test_task.revoke(channel_id=channel_id)
256+
257+
# this should raise an exception, even though revoked, because the task is in ready state but not success
258+
with self.assertRaises(Exception):
259+
self._wait_for(async_result)
260+
261+
try:
262+
TaskResult.objects.get(task_id=async_result.task_id, status=states.REVOKED)
263+
except TaskResult.DoesNotExist:
264+
self.fail('Missing revoked task result')

contentcuration/contentcuration/tests/test_decorators.py

+8-3
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,22 @@
22

33
from contentcuration.decorators import delay_user_storage_calculation
44
from contentcuration.tests.base import StudioTestCase
5+
from contentcuration.tests.base import testdata
56
from contentcuration.utils.user import calculate_user_storage
67

78

89
class DecoratorsTestCase(StudioTestCase):
10+
def setUp(self):
11+
super(DecoratorsTestCase, self).setUp()
12+
self.user = testdata.user()
13+
914
@mock.patch("contentcuration.utils.user.calculate_user_storage_task")
1015
def test_delay_storage_calculation(self, mock_task):
1116
@delay_user_storage_calculation
1217
def do_test():
13-
calculate_user_storage(self.admin_user.id)
14-
calculate_user_storage(self.admin_user.id)
18+
calculate_user_storage(self.user.id)
19+
calculate_user_storage(self.user.id)
1520
mock_task.fetch_or_enqueue.assert_not_called()
1621

1722
do_test()
18-
mock_task.fetch_or_enqueue.assert_called_once_with(self.admin_user, user_id=self.admin_user.id)
23+
mock_task.fetch_or_enqueue.assert_called_once_with(self.user, user_id=self.user.id)

contentcuration/contentcuration/utils/celery/app.py

+9
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,15 @@ def get_queued_tasks(self, queue_name="celery"):
4545

4646
return decoded_tasks
4747

48+
def count_queued_tasks(self, queue_name="celery"):
49+
"""
50+
:param queue_name: The queue name, defaults to the default "celery" queue
51+
:return: int
52+
"""
53+
with self.pool.acquire(block=True) as conn:
54+
count = conn.default_channel.client.llen(queue_name)
55+
return count
56+
4857
def decode_result(self, result, status=None):
4958
"""
5059
Decodes the celery result, like the raw result from the database, using celery tools

contentcuration/contentcuration/utils/celery/tasks.py

+19
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,25 @@ def requeue(self, **kwargs):
246246
logging.info(f"Re-queuing task {self.name} for user {task_result.user.pk} from {request.id} | {task_kwargs}")
247247
return self.enqueue(task_result.user, **task_kwargs)
248248

249+
def revoke(self, exclude_task_ids=None, **kwargs):
250+
"""
251+
Revokes and terminates all unready tasks matching the kwargs
252+
:param exclude_task_ids: Any task ids to exclude from this behavior
253+
:param kwargs: Task keyword arguments that will be used to match against tasks
254+
:return: The number of tasks revoked
255+
"""
256+
task_ids = self.find_incomplete_ids(**self.backend.decode(self._prepare_kwargs(kwargs)))
257+
if exclude_task_ids is not None:
258+
task_ids = task_ids.exclude(task_id__in=task_ids)
259+
count = 0
260+
for task_id in task_ids:
261+
logging.info(f"Revoking task {task_id}")
262+
self.app.control.revoke(task_id, terminate=True)
263+
count += 1
264+
# be sure the database backend has these marked appropriately
265+
self.TaskModel.objects.filter(task_id__in=task_ids).update(status=states.REVOKED)
266+
return count
267+
249268

250269
class CeleryAsyncResult(AsyncResult):
251270
"""

contentcuration/contentcuration/utils/user.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,14 @@ def calculate_user_storage(user_id):
99
from contentcuration.decorators import delay_user_storage_calculation
1010

1111
if delay_user_storage_calculation.is_active:
12-
delay_user_storage_calculation.queue.append(user_id)
12+
delay_user_storage_calculation.add(user_id)
1313
return
1414

1515
try:
1616
if user_id is None:
1717
raise User.DoesNotExist
1818
user = User.objects.get(pk=user_id)
19-
calculate_user_storage_task.fetch_or_enqueue(user, user_id=user_id)
19+
if not user.is_admin:
20+
calculate_user_storage_task.fetch_or_enqueue(user, user_id=user_id)
2021
except User.DoesNotExist:
2122
logging.error("Tried to calculate user storage for user with id {} but they do not exist".format(user_id))

contentcuration/contentcuration/views/internal.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
from contentcuration.api import activate_channel
3838
from contentcuration.api import write_file_to_storage
3939
from contentcuration.constants import completion_criteria
40+
from contentcuration.decorators import delay_user_storage_calculation
4041
from contentcuration.models import AssessmentItem
4142
from contentcuration.models import Change
4243
from contentcuration.models import Channel
@@ -54,7 +55,6 @@
5455
from contentcuration.utils.nodes import map_files_to_node
5556
from contentcuration.utils.nodes import map_files_to_slideshow_slide_item
5657
from contentcuration.utils.sentry import report_exception
57-
from contentcuration.utils.tracing import trace
5858
from contentcuration.viewsets.sync.constants import CHANNEL
5959
from contentcuration.viewsets.sync.utils import generate_publish_event
6060
from contentcuration.viewsets.sync.utils import generate_update_event
@@ -565,7 +565,7 @@ def __init__(self, node, errors):
565565
super(IncompleteNodeError, self).__init__(self.message)
566566

567567

568-
@trace
568+
@delay_user_storage_calculation
569569
def convert_data_to_nodes(user, content_data, parent_node):
570570
""" Parse dict and create nodes accordingly """
571571
try:

0 commit comments

Comments
 (0)