Skip to content

Commit 25f1950

Browse files
committed
Add option for challenging/removing lock on task start
1 parent c2cf1cd commit 25f1950

File tree

5 files changed

+371
-11
lines changed

5 files changed

+371
-11
lines changed

README.md

+49-11
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,19 @@ This is a baseclass for celery tasks that ensures only one instance of the task
77
**Table of Contents**
88

99
- [Celery-Singleton](#celery-singleton)
10-
- [Prerequisites](#prerequisites)
11-
- [Quick start](#quick-start)
12-
- [How does it work?](#how-does-it-work)
13-
- [Handling deadlocks](#handling-deadlocks)
14-
- [Backends](#backends)
15-
- [Task configuration](#task-configuration)
16-
- [unique\_on](#uniqueon)
17-
- [raise\_on\_duplicate](#raiseonduplicate)
18-
- [App Configuration](#app-configuration)
19-
- [Testing](#testing)
20-
- [Contribute](#contribute)
10+
- [Prerequisites](#prerequisites)
11+
- [Quick start](#quick-start)
12+
- [How does it work?](#how-does-it-work)
13+
- [Handling deadlocks](#handling-deadlocks)
14+
- [Backends](#backends)
15+
- [Task configuration](#task-configuration)
16+
- [unique\_on](#unique_on)
17+
- [raise\_on\_duplicate](#raise_on_duplicate)
18+
- [lock\_expiry](#lock_expiry)
19+
- [callenge_lock_on_startup](#callenge_lock_on_startup)
20+
- [App Configuration](#app-configuration)
21+
- [Testing](#testing)
22+
- [Contribute](#contribute)
2123

2224
<!-- markdown-toc end -->
2325

@@ -182,6 +184,42 @@ assert task1 != task2 # These are two separate task instances
182184

183185
This option can be applied globally in the [app config](#app-configuration) with `singleton_lock_expiry`. Task option supersedes the app config.
184186

187+
### callenge_lock_on_startup
188+
189+
Some tasks may be stucked after unexpected shuttdown of a worker.
190+
If this situation should be avoided, the lock can be challenged on task startup.
191+
Setting `challenge_lock_on_startup` to true will cause every submission of this task to check, before trying to acquire a lock :
192+
193+
1. check if a lock matching this task is already there
194+
2. check if at least one worker responds to ping
195+
3. inspect workers for finding owner of the lock (existing task with id matching the one from le lock)
196+
1. inspect active tasks (tasks currently being processed by a worker) : check that task id is not present
197+
2. inspect reserved tasks (tasks claimed by a worker, but not yet running) : check that task id is not present
198+
3. inspect scheduled tasks (tasks received by , but scheduled at some point in the future) : check that task id is not present
199+
200+
If any of previous check fails, the lock remains.
201+
202+
> **Use with caution** : this configuration may lead to unexpected behaviour. Try using classic [deadlocks handling](#handling-deadlocks) if possible.
203+
> Intermittent failure to communicate with workers or submission of a task before any worker receives the task owning the lock may lead to having multiple instances of the task running simultaneously.
204+
> The extensive use of the [`celery worker inspection`](https://docs.celeryproject.org/en/stable/userguide/workers.html#inspecting-workers) API may also lead to performances issues when working with large number of workers and/or large number of tasks.
205+
206+
```python
207+
from celery_singleton import Singleton
208+
from .local_path import app
209+
210+
211+
@app.task(
212+
base=Singleton,
213+
challenge_lock_on_startup=True,
214+
celery_app=app,
215+
)
216+
def do_something(username):
217+
time.sleep(5)
218+
219+
task1 = do_something.delay('bob')
220+
# Worker cold shuttdown causes lock to remain
221+
task2 = do_something.delay('bob') # lock should be removed
222+
```
185223

186224
## App Configuration
187225

celery_singleton/inspect_celery.py

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
from celery import Celery
2+
3+
class CeleryTask:
4+
def __init__(self, id, name, args, kwargs):
5+
self.id = id
6+
self.name = name
7+
self.args = args
8+
self.kwargs = kwargs
9+
10+
def get_task_by_id(app, id):
11+
inspector = app.control.inspect()
12+
try:
13+
active_task_found = _get_task(id, inspector.active())
14+
if active_task_found:
15+
return active_task_found
16+
reserved_task_found = _get_task(id, inspector.reserved())
17+
if reserved_task_found:
18+
return reserved_task_found
19+
scheduled_task_found = _get_scheduled_task(id, inspector.scheduled())
20+
if scheduled_task_found:
21+
return scheduled_task_found
22+
except Exception:
23+
pass
24+
return None
25+
26+
def _get_task(id, active_task_dict):
27+
for worker, task_list in active_task_dict.items():
28+
for task in task_list:
29+
if task['id'] == id:
30+
return CeleryTask(task['id'], task["name"], task["args"], task["kwargs"])
31+
return None
32+
33+
def _get_scheduled_task(id, scheduled_task_dict):
34+
for worker, task_scheduled_list in scheduled_task_dict.items():
35+
for scheduled in task_scheduled_list:
36+
task = scheduled['request']
37+
if task['id'] == id:
38+
return CeleryTask(task["id"], task["name"], task["args"], task["kwargs"])
39+
return None
40+
41+
def are_worker_active(app):
42+
worke_pong = app.control.inspect().ping()
43+
if not worke_pong:
44+
return False
45+
try:
46+
for worker, response in worke_pong.items():
47+
if response['ok']:
48+
return True
49+
except Exception:
50+
pass
51+
return False

celery_singleton/singleton.py

+17
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from .config import Config
77
from .exceptions import DuplicateTaskError
88
from . import util
9+
from . import inspect_celery
910

1011

1112
def clear_locks(app):
@@ -21,6 +22,8 @@ class Singleton(BaseTask):
2122
unique_on = None
2223
raise_on_duplicate = None
2324
lock_expiry = None
25+
challenge_lock_on_startup = False
26+
celery_app = None
2427

2528
@property
2629
def _raise_on_duplicate(self):
@@ -103,6 +106,9 @@ def apply_async(
103106
**options
104107
)
105108

109+
if self.challenge_lock_on_startup:
110+
self.challenge_lock(lock, args, kwargs)
111+
106112
task = self.lock_and_run(**run_args)
107113
if task:
108114
return task
@@ -126,6 +132,17 @@ def lock_and_run(self, lock, *args, task_id=None, **kwargs):
126132
# Clear the lock if apply_async fails
127133
self.unlock(lock)
128134
raise
135+
136+
def challenge_lock(self, lock, args, kwargs):
137+
current_lock_id = self.singleton_backend.get(lock)
138+
if not current_lock_id:
139+
return
140+
if not inspect_celery.are_worker_active(self.celery_app):
141+
return
142+
task_found = inspect_celery.get_task_by_id(self.celery_app, current_lock_id)
143+
if task_found:
144+
return
145+
self.unlock(lock)
129146

130147
def release_lock(self, task_args=None, task_kwargs=None):
131148
lock = self.generate_lock(self.name, task_args, task_kwargs)

tests/test_inspect_celery.py

+170
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
import pytest
2+
import sys
3+
from unittest import mock
4+
from celery_singleton.inspect_celery import get_task_by_id, are_worker_active, CeleryTask
5+
6+
7+
class TestInspectCelery:
8+
def test__get_task_by_id__should_return_matching_task__when_task_active(self):
9+
# Given
10+
task_id = '262a1cf9-2c4f-4680-8261-7498fb39756c'
11+
active_tasks = {'celery@worker_host': [{'id': task_id, 'name': 'simple_task', 'args': [1, 2, 3], 'kwargs': {}, 'type': 'simple_task', 'hostname': 'celery@worker_host', 'time_start': 1588508284.207397, 'acknowledged': True, 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'worker_pid': 45895}]}
12+
inspect_mock = mock.MagicMock()
13+
inspect_mock.active.return_value = active_tasks
14+
control_mock = mock.MagicMock()
15+
control_mock.inspect.return_value = inspect_mock
16+
mock_app = mock.MagicMock()
17+
mock_app.control = control_mock
18+
19+
# When
20+
active_task = get_task_by_id(mock_app, task_id)
21+
22+
# Then
23+
assert active_task is not None
24+
assert active_task.id == task_id
25+
assert active_task.name == 'simple_task'
26+
assert active_task.args == [1, 2, 3]
27+
assert active_task.kwargs == {}
28+
29+
def test__get_task_by_id__should_return_matching_task__when_task_scheduled(self):
30+
# Given
31+
task_id = '262a1cf9-2c4f-4680-8261-7498fb39756c'
32+
active_tasks = {'celery@worker_host': []}
33+
scheduled_tasks = {'celery@worker_host': [{'eta': '2020-05-12T17:31:32.886704+00:00', 'priority': 6, 'request': {'id': task_id, 'name': 'simple_task', 'args': [1, 2, 3], 'kwargs': {}, 'type': 'simple_task', 'hostname': 'celery@worker_host', 'time_start': None, 'acknowledged': False, 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'worker_pid': None}}]}
34+
inspect_mock = mock.MagicMock()
35+
inspect_mock.active.return_value = active_tasks
36+
inspect_mock.scheduled.return_value = scheduled_tasks
37+
control_mock = mock.MagicMock()
38+
control_mock.inspect.return_value = inspect_mock
39+
mock_app = mock.MagicMock()
40+
mock_app.control = control_mock
41+
42+
# When
43+
active_task = get_task_by_id(mock_app, task_id)
44+
45+
# Then
46+
assert active_task is not None
47+
assert active_task.id == task_id
48+
assert active_task.name == 'simple_task'
49+
assert active_task.args == [1, 2, 3]
50+
assert active_task.kwargs == {}
51+
52+
def test__get_task_by_id__should_return_matching_task__when_task_reserved(self):
53+
# Given
54+
task_id = '262a1cf9-2c4f-4680-8261-7498fb39756c'
55+
active_tasks = {'celery@worker_host': []}
56+
scheduled_tasks = {'celery@worker_host': []}
57+
reserved_tasks = {'celery@worker_host': [{'id': task_id, 'name': 'simple_task', 'args': [1, 2, 3], 'kwargs': {}, 'type': 'simple_task', 'hostname': 'celery@worker_host', 'time_start': 1588508284.207397, 'acknowledged': True, 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'worker_pid': 45895}]}
58+
inspect_mock = mock.MagicMock()
59+
inspect_mock.active.return_value = active_tasks
60+
inspect_mock.scheduled.return_value = scheduled_tasks
61+
inspect_mock.reserved.return_value = reserved_tasks
62+
control_mock = mock.MagicMock()
63+
control_mock.inspect.return_value = inspect_mock
64+
mock_app = mock.MagicMock()
65+
mock_app.control = control_mock
66+
67+
# When
68+
active_task = get_task_by_id(mock_app, task_id)
69+
70+
# Then
71+
assert active_task is not None
72+
assert active_task.id == task_id
73+
assert active_task.name == 'simple_task'
74+
assert active_task.args == [1, 2, 3]
75+
assert active_task.kwargs == {}
76+
77+
def test__get_task_by_id__should_return_none_when_no_matching_task_id(self):
78+
# Given
79+
active_tasks = {'celery@worker_host': [{'id': '262a1cf9-2c4f-4680-8261-7498fb39756c', 'name': 'simple_task', 'args': [1, 2, 3], 'kwargs': {}, 'type': 'simple_task', 'hostname': 'celery@worker_host', 'time_start': 1588508284.207397, 'acknowledged': True, 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'worker_pid': 45895}]}
80+
inspect_mock = mock.MagicMock()
81+
inspect_mock.active.return_value = active_tasks
82+
control_mock = mock.MagicMock()
83+
control_mock.inspect.return_value = inspect_mock
84+
mock_app = mock.MagicMock()
85+
mock_app.control = control_mock
86+
87+
# When
88+
active_task = get_task_by_id(mock_app, "bad_task_name")
89+
90+
# Then
91+
assert active_task is None
92+
93+
def test__get_task_by_id__should_return_none_when_no_active_tasks(self):
94+
# Given
95+
inspect_mock = mock.MagicMock()
96+
inspect_mock.active.return_value = None
97+
control_mock = mock.MagicMock()
98+
control_mock.inspect.return_value = inspect_mock
99+
mock_app = mock.MagicMock()
100+
mock_app.control = control_mock
101+
102+
# When
103+
active_task = get_task_by_id(mock_app, "any_name")
104+
105+
# Then
106+
assert active_task is None
107+
108+
def test__get_task_by_id__should_return_none_when_worker_answer_cannot_be_parsed(self):
109+
# Given
110+
inspect_mock = mock.MagicMock()
111+
inspect_mock.active.return_value = {'worker': ['bad_task_definition']}
112+
control_mock = mock.MagicMock()
113+
control_mock.inspect.return_value = inspect_mock
114+
mock_app = mock.MagicMock()
115+
mock_app.control = control_mock
116+
117+
# When
118+
active_task = get_task_by_id(mock_app, "bad_task_name")
119+
120+
# Then
121+
assert active_task is None
122+
123+
def test__are_worker_active__should_return_true_if_worker_responds_to_ping(self):
124+
# Given
125+
active_workers = {u'celery@host': {u'ok': u'pong'}}
126+
inspect_mock = mock.MagicMock()
127+
inspect_mock.ping.return_value = active_workers
128+
control_mock = mock.MagicMock()
129+
control_mock.inspect.return_value = inspect_mock
130+
mock_app = mock.MagicMock()
131+
mock_app.control = control_mock
132+
133+
# When
134+
active_workers_found = are_worker_active(mock_app)
135+
136+
# Then
137+
assert active_workers_found
138+
139+
def test__are_worker_active__should_return_false_if_worker_does_not_respond_to_ping(self):
140+
# Given
141+
active_workers = None
142+
inspect_mock = mock.MagicMock()
143+
inspect_mock.ping.return_value = active_workers
144+
control_mock = mock.MagicMock()
145+
control_mock.inspect.return_value = inspect_mock
146+
mock_app = mock.MagicMock()
147+
mock_app.control = control_mock
148+
149+
# When
150+
active_workers_found = are_worker_active(mock_app)
151+
152+
# Then
153+
assert not active_workers_found
154+
155+
def test__are_worker_active__should_return_false_if_worker_respond_ko_to_ping(self):
156+
# Given
157+
active_workers = {u'celery@host': {u'not_ok': u'pong'}}
158+
inspect_mock = mock.MagicMock()
159+
inspect_mock.ping.return_value = active_workers
160+
control_mock = mock.MagicMock()
161+
control_mock.inspect.return_value = inspect_mock
162+
mock_app = mock.MagicMock()
163+
mock_app.control = control_mock
164+
165+
# When
166+
active_workers_found = are_worker_active(mock_app)
167+
168+
# Then
169+
assert not active_workers_found
170+

0 commit comments

Comments
 (0)