File tree 2 files changed +50
-0
lines changed
styleguide_example/blog_examples
2 files changed +50
-0
lines changed Original file line number Diff line number Diff line change
1
+ import logging
2
+ from functools import wraps
3
+
4
+ from celery import shared_task
5
+
6
+ from styleguide_example .tasks import celery_app
7
+
8
+ inspect = celery_app .control .inspect
9
+
10
+ logger = logging .getLogger (__name__ )
11
+
12
+
13
+ def non_concurrent_task (_func = None , * args , ** kwargs ):
14
+ def wrapper (func ):
15
+ @wraps (func )
16
+ def inner (_bound_self , * _func_args , ** _func_kwargs ):
17
+ running_task_count = 0
18
+
19
+ queues = inspect ().active ()
20
+
21
+ if queues is None :
22
+ queues = {}
23
+
24
+ for running_tasks in queues .values ():
25
+ for task in running_tasks :
26
+ if task ["name" ] == _bound_self .name :
27
+ running_task_count += 1
28
+
29
+ if running_task_count > 1 :
30
+ logger .warning (f"[non_concurrent_task] Task { _bound_self .name } is already running" )
31
+ return
32
+
33
+ return func (* _func_args , ** _func_kwargs )
34
+
35
+ return shared_task (bind = True , * args , ** kwargs )(inner )
36
+
37
+ if _func is None :
38
+ return wrapper
39
+
40
+ return wrapper (_func )
41
+
42
+
43
+ @non_concurrent_task
44
+ def test_non_concurrent_task ():
45
+ logger .info ("A non-concurrent task is running" )
46
+ import time
47
+ time .sleep (10 )
48
+ logger .info ("A non-concurrent task finished" )
Original file line number Diff line number Diff line change
1
+ # noqa
2
+ from styleguide_example .blog_examples .celery_non_concurrent .tasks import test_non_concurrent_task
You can’t perform that action at this time.
0 commit comments