Skip to content

Commit 82403a1

Browse files
committed
feat: add option for challenging/removing lock on task start
add challenge_lock_on_startup setting on Singleton add celery_app setting as passing Celery application is mandatory for inspection of celery workers
2 parents c2cf1cd + c5265cd commit 82403a1

File tree

5 files changed

+271
-11
lines changed

5 files changed

+271
-11
lines changed

README.md

+39-11
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,19 @@ This is a baseclass for celery tasks that ensures only one instance of the task
77
**Table of Contents**
88

99
- [Celery-Singleton](#celery-singleton)
10-
- [Prerequisites](#prerequisites)
11-
- [Quick start](#quick-start)
12-
- [How does it work?](#how-does-it-work)
13-
- [Handling deadlocks](#handling-deadlocks)
14-
- [Backends](#backends)
15-
- [Task configuration](#task-configuration)
16-
- [unique\_on](#uniqueon)
17-
- [raise\_on\_duplicate](#raiseonduplicate)
18-
- [App Configuration](#app-configuration)
19-
- [Testing](#testing)
20-
- [Contribute](#contribute)
10+
- [Prerequisites](#prerequisites)
11+
- [Quick start](#quick-start)
12+
- [How does it work?](#how-does-it-work)
13+
- [Handling deadlocks](#handling-deadlocks)
14+
- [Backends](#backends)
15+
- [Task configuration](#task-configuration)
16+
- [unique\_on](#uniqueon)
17+
- [raise\_on\_duplicate](#raiseonduplicate)
18+
- [lock\_expiry](#lockexpiry)
19+
- [callenge_lock_on_startup](#callengelockonstartup)
20+
- [App Configuration](#app-configuration)
21+
- [Testing](#testing)
22+
- [Contribute](#contribute)
2123

2224
<!-- markdown-toc end -->
2325

@@ -182,6 +184,32 @@ assert task1 != task2 # These are two separate task instances
182184

183185
This option can be applied globally in the [app config](#app-configuration) with `singleton_lock_expiry`. Task option supersedes the app config.
184186

187+
### callenge_lock_on_startup
188+
189+
Some tasks may be stucked after unexpected shuttdown of a worker.
190+
If this situation should be avoided, the lock can be challenged on task startup.
191+
192+
> **Use with caution** : this configuration may lead to unexpected behaviour. Try using classic [deadlocks handling](#markdown-header-handling-deadlocks) if possible.
193+
> this may lead to unexpected behaviour if celery is in a transition state at task startup
194+
> ex: Tasks beeing scheduled while no worker online will all start on worker startup
195+
196+
```python
197+
from celery_singleton import Singleton
198+
from .local_path import app
199+
200+
201+
@app.task(
202+
base=Singleton,
203+
challenge_lock_on_startup=True,
204+
celery_app=app,
205+
)
206+
def do_something(username):
207+
time.sleep(5)
208+
209+
task1 = do_something.delay('bob')
210+
# Worker cold shuttdown causes lock to remain
211+
task2 = do_something.delay('bob') # lock should be removed
212+
```
185213

186214
## App Configuration
187215

celery_singleton/inspect_celery.py

+35
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from celery import Celery
2+
3+
class CeleryTask:
4+
def __init__(self, name, args, kwargs):
5+
self.name = name
6+
self.args = args
7+
self.kwargs = kwargs
8+
9+
def list_celery_tasks_with_name(app: Celery, name: str) -> [CeleryTask]:
10+
active_tasks = app.control.inspect().active()
11+
if not active_tasks:
12+
return []
13+
global_task_list = []
14+
try:
15+
for worker, task_list in active_tasks.items():
16+
for task in task_list:
17+
if not task['name'] == name:
18+
continue
19+
celery_task = CeleryTask(task["name"], task["args"], task["kwargs"])
20+
global_task_list.append(celery_task)
21+
except Exception:
22+
pass
23+
return global_task_list
24+
25+
def are_worker_active(app: Celery) -> bool:
26+
worke_pong = app.control.inspect().ping()
27+
if not worke_pong:
28+
return False
29+
try:
30+
for worker, response in worke_pong.items():
31+
if response['ok']:
32+
return True
33+
except Exception:
34+
pass
35+
return False

celery_singleton/singleton.py

+18
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from .config import Config
77
from .exceptions import DuplicateTaskError
88
from . import util
9+
from . import inspect_celery
910

1011

1112
def clear_locks(app):
@@ -21,6 +22,8 @@ class Singleton(BaseTask):
2122
unique_on = None
2223
raise_on_duplicate = None
2324
lock_expiry = None
25+
challenge_lock_on_startup = False
26+
celery_app = None
2427

2528
@property
2629
def _raise_on_duplicate(self):
@@ -54,6 +57,10 @@ def get_existing_task_id(self, lock):
5457
return self.singleton_backend.get(lock)
5558

5659
def generate_lock(self, task_name, task_args=None, task_kwargs=None):
60+
unique_on = self.unique_on
61+
return self._generate_lock_filtered_by_unique_on(unique_on, task_name, task_args, task_kwargs)
62+
63+
def _generate_lock_filtered_by_unique_on(self, unique_on, task_name, task_args=None, task_kwargs=None):
5764
unique_on = self.unique_on
5865
task_args = task_args or []
5966
task_kwargs = task_kwargs or {}
@@ -103,6 +110,9 @@ def apply_async(
103110
**options
104111
)
105112

113+
if self.challenge_lock_on_startup:
114+
self.challenge_lock(lock, args, kwargs)
115+
106116
task = self.lock_and_run(**run_args)
107117
if task:
108118
return task
@@ -126,6 +136,14 @@ def lock_and_run(self, lock, *args, task_id=None, **kwargs):
126136
# Clear the lock if apply_async fails
127137
self.unlock(lock)
128138
raise
139+
140+
def challenge_lock(self, lock, args, kwargs):
141+
current_task_lock = self.generate_lock(self.name, args, kwargs)
142+
active_tasks = inspect_celery.list_celery_tasks_with_name(self.celery_app, self.name)
143+
active_tasks_lock = [self.generate_lock(task.name, task.args, task.kwargs) for task in active_tasks]
144+
if lock in active_tasks_lock or not inspect_celery.are_worker_active(self.celery_app):
145+
return
146+
self.unlock(lock)
129147

130148
def release_lock(self, task_args=None, task_kwargs=None):
131149
lock = self.generate_lock(self.name, task_args, task_kwargs)

tests/test_inspect_celery.py

+121
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
import pytest
2+
import sys
3+
from unittest import mock
4+
from celery_singleton.inspect_celery import list_celery_tasks_with_name, are_worker_active, CeleryTask
5+
6+
7+
class TestInspectCelery:
8+
def test__list_celery_tasks_with_name__should_return_matching_task(self):
9+
# Given
10+
active_tasks = {'celery@worker_host': [{'id': '262a1cf9-2c4f-4680-8261-7498fb39756c', 'name': 'simple_task', 'args': [1, 2, 3], 'kwargs': {}, 'type': 'simple_task', 'hostname': 'celery@worker_host', 'time_start': 1588508284.207397, 'acknowledged': True, 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'worker_pid': 45895}]}
11+
inspect_mock = mock.MagicMock()
12+
inspect_mock.active.return_value = active_tasks
13+
control_mock = mock.MagicMock()
14+
control_mock.inspect.return_value = inspect_mock
15+
mock_app = mock.MagicMock()
16+
mock_app.control = control_mock
17+
18+
# When
19+
task_list = list_celery_tasks_with_name(mock_app, "simple_task")
20+
21+
# Then
22+
assert len(task_list) == 1
23+
actual = task_list[0]
24+
assert actual.name == 'simple_task'
25+
assert actual.args == [1, 2, 3]
26+
assert actual.kwargs == {}
27+
28+
def test__list_celery_tasks_with_name__should_return_empty_list_when_no_matching_taskname(self):
29+
# Given
30+
active_tasks = {'celery@worker_host': [{'id': '262a1cf9-2c4f-4680-8261-7498fb39756c', 'name': 'simple_task', 'args': [1, 2, 3], 'kwargs': {}, 'type': 'simple_task', 'hostname': 'celery@worker_host', 'time_start': 1588508284.207397, 'acknowledged': True, 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'worker_pid': 45895}]}
31+
inspect_mock = mock.MagicMock()
32+
inspect_mock.active.return_value = active_tasks
33+
control_mock = mock.MagicMock()
34+
control_mock.inspect.return_value = inspect_mock
35+
mock_app = mock.MagicMock()
36+
mock_app.control = control_mock
37+
38+
# When
39+
task_list = list_celery_tasks_with_name(mock_app, "bad_task_name")
40+
41+
# Then
42+
assert len(task_list) == 0
43+
44+
def test__list_celery_tasks_with_name__should_return_empty_list_when_no_active_tasts(self):
45+
# Given
46+
inspect_mock = mock.MagicMock()
47+
inspect_mock.active.return_value = None
48+
control_mock = mock.MagicMock()
49+
control_mock.inspect.return_value = inspect_mock
50+
mock_app = mock.MagicMock()
51+
mock_app.control = control_mock
52+
53+
# When
54+
task_list = list_celery_tasks_with_name(mock_app, "any_name")
55+
56+
# Then
57+
assert len(task_list) == 0
58+
59+
def test__list_celery_tasks_with_name__should_return_empty_list_when_worker_answer_cannot_be_parsed(self):
60+
# Given
61+
inspect_mock = mock.MagicMock()
62+
inspect_mock.active.return_value = {'worker': ['bad_task_definition']}
63+
control_mock = mock.MagicMock()
64+
control_mock.inspect.return_value = inspect_mock
65+
mock_app = mock.MagicMock()
66+
mock_app.control = control_mock
67+
68+
# When
69+
task_list = list_celery_tasks_with_name(mock_app, "bad_task_name")
70+
71+
# Then
72+
assert len(task_list) == 0
73+
74+
def test__are_worker_active__should_return_true_if_worker_responds_to_ping(self):
75+
# Given
76+
active_workers = {u'celery@host': {u'ok': u'pong'}}
77+
inspect_mock = mock.MagicMock()
78+
inspect_mock.ping.return_value = active_workers
79+
control_mock = mock.MagicMock()
80+
control_mock.inspect.return_value = inspect_mock
81+
mock_app = mock.MagicMock()
82+
mock_app.control = control_mock
83+
84+
# When
85+
active_workers_found = are_worker_active(mock_app)
86+
87+
# Then
88+
assert active_workers_found
89+
90+
def test__are_worker_active__should_return_false_if_worker_does_not_respond_to_ping(self):
91+
# Given
92+
active_workers = None
93+
inspect_mock = mock.MagicMock()
94+
inspect_mock.ping.return_value = active_workers
95+
control_mock = mock.MagicMock()
96+
control_mock.inspect.return_value = inspect_mock
97+
mock_app = mock.MagicMock()
98+
mock_app.control = control_mock
99+
100+
# When
101+
active_workers_found = are_worker_active(mock_app)
102+
103+
# Then
104+
assert not active_workers_found
105+
106+
def test__are_worker_active__should_return_false_if_worker_respond_ko_to_ping(self):
107+
# Given
108+
active_workers = {u'celery@host': {u'not_ok': u'pong'}}
109+
inspect_mock = mock.MagicMock()
110+
inspect_mock.ping.return_value = active_workers
111+
control_mock = mock.MagicMock()
112+
control_mock.inspect.return_value = inspect_mock
113+
mock_app = mock.MagicMock()
114+
mock_app.control = control_mock
115+
116+
# When
117+
active_workers_found = are_worker_active(mock_app)
118+
119+
# Then
120+
assert not active_workers_found
121+

tests/test_singleton.py

+58
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from celery_singleton.backends.redis import RedisBackend
1111
from celery_singleton.backends import get_backend
1212
from celery_singleton.config import Config
13+
from celery_singleton import inspect_celery
1314

1415

1516
@pytest.fixture(scope="session")
@@ -313,3 +314,60 @@ def simple_task(*args):
313314
mock_lock.assert_called_once_with(
314315
simple_task.singleton_backend, lock, result.task_id, expiry=60
315316
)
317+
318+
class TestChallengeLockOnStartup:
319+
@mock.patch.object(inspect_celery, "list_celery_tasks_with_name", return_value=[])
320+
@mock.patch.object(inspect_celery, "are_worker_active", return_value=True)
321+
def test__lock_invalidation__remove_lock_if_no_task_running(self, mock_celery_inspect, mock_celery_ping, scoped_app):
322+
with scoped_app as app:
323+
# Given
324+
task_args=[1, 2, 3]
325+
@app.task(name="simple_task", base=Singleton, challenge_lock_on_startup=True, celery_app=app)
326+
def simple_task(*args):
327+
return args
328+
lock = simple_task.generate_lock(simple_task.name, task_args=task_args)
329+
simple_task.unlock = mock.MagicMock()
330+
simple_task.release_lock = mock.MagicMock() # avoid call to unlock on task completion
331+
332+
# When
333+
simple_task.delay(1, 2, 3)
334+
335+
# Then
336+
simple_task.unlock.assert_called_once_with(lock)
337+
338+
@mock.patch.object(inspect_celery, "list_celery_tasks_with_name", return_value=[inspect_celery.CeleryTask('simple_task', args=[1, 2, 3], kwargs=None)])
339+
@mock.patch.object(inspect_celery, "are_worker_active", return_value=True)
340+
def test__lock_invalidation__do_not_remove_lock_if_task_running(self, mock_celery_inspect, mock_celery_ping, scoped_app):
341+
with scoped_app as app:
342+
# Given
343+
task_name = "simple_task"
344+
task_args=[1, 2, 3]
345+
@app.task(name="simple_task", base=Singleton, challenge_lock_on_startup=True, celery_app=app)
346+
def simple_task(*args):
347+
return args
348+
lock = simple_task.generate_lock(simple_task.name, task_args, None)
349+
simple_task.unlock = mock.MagicMock()
350+
simple_task.release_lock = mock.MagicMock() # avoid call to unlock on task completion
351+
352+
# When
353+
simple_task.delay(1, 2, 3)
354+
355+
# Then
356+
simple_task.unlock.assert_not_called()
357+
358+
@mock.patch.object(inspect_celery, "list_celery_tasks_with_name", return_value=[])
359+
@mock.patch.object(inspect_celery, "are_worker_active", return_value=False)
360+
def test__lock_invalidation__do_not_remove_lock_if_no_worker_running(self, mock_celery_inspect, mock_celery_ping, scoped_app):
361+
with scoped_app as app:
362+
# Given
363+
@app.task(name="simple_task", base=Singleton, challenge_lock_on_startup=True, celery_app=app)
364+
def simple_task(*args):
365+
return args
366+
simple_task.unlock = mock.MagicMock()
367+
simple_task.release_lock = mock.MagicMock() # avoid call to unlock on task completion
368+
369+
# When
370+
simple_task.delay(1, 2, 3)
371+
372+
# Then
373+
simple_task.unlock.assert_not_called()

0 commit comments

Comments
 (0)