Skip to content

Commit a331538

Browse files
committed
Add management command that reconciles tasks for changes
1 parent a997d0c commit a331538

File tree

1 file changed

+48
-0
lines changed

1 file changed

+48
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import logging
2+
3+
from django.core.management.base import BaseCommand
4+
5+
from contentcuration.celery import app
6+
from contentcuration.models import Change
7+
from contentcuration.models import User
8+
9+
logging.basicConfig()
10+
logger = logging.getLogger('command')
11+
12+
13+
class Command(BaseCommand):
14+
"""
15+
Reconciles that unready tasks are marked as reserved or active according to celery control
16+
"""
17+
18+
def handle(self, *args, **options):
19+
from contentcuration.tasks import apply_channel_changes_task
20+
from contentcuration.tasks import apply_user_changes_task
21+
22+
active_task_ids = []
23+
for worker_name, tasks in app.control.inspect().active().items():
24+
active_task_ids.extend(task['id'] for task in tasks)
25+
for worker_name, tasks in app.control.inspect().reserved().items():
26+
active_task_ids.extend(task['id'] for task in tasks)
27+
28+
channel_changes = Change.objects.filter(channel_id__isnull=False, applied=False, errored=False) \
29+
.order_by('channel_id', 'created_by_id') \
30+
.values('channel_id', 'created_by_id') \
31+
.distinct()
32+
for channel_change in channel_changes:
33+
apply_channel_changes_task.revoke(exclude_task_ids=active_task_ids, channel_id=channel_change['channel_id'])
34+
apply_channel_changes_task.fetch_or_enqueue(
35+
User.objects.get(pk=channel_change['created_by_id']),
36+
channel_id=channel_change['channel_id']
37+
)
38+
39+
user_changes = Change.objects.filter(channel_id__isnull=True, user_id__isnull=False, applied=False, errored=False) \
40+
.order_by('user_id', 'created_by_id') \
41+
.values('user_id', 'created_by_id') \
42+
.distinct()
43+
for user_change in user_changes:
44+
apply_user_changes_task.revoke(exclude_task_ids=active_task_ids, user_id=user_change['user_id'])
45+
apply_user_changes_task.fetch_or_enqueue(
46+
User.objects.get(pk=user_change['created_by_id']),
47+
user_id=user_change['user_id']
48+
)

0 commit comments

Comments
 (0)