Skip to content

Commit ea605a2

Browse files
committed
add option for automatic remove of a lock
1 parent c2cf1cd commit ea605a2

File tree

5 files changed

+334
-12
lines changed

5 files changed

+334
-12
lines changed

README.md

+39-11
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,19 @@ This is a baseclass for celery tasks that ensures only one instance of the task
77
**Table of Contents**
88

99
- [Celery-Singleton](#celery-singleton)
10-
- [Prerequisites](#prerequisites)
11-
- [Quick start](#quick-start)
12-
- [How does it work?](#how-does-it-work)
13-
- [Handling deadlocks](#handling-deadlocks)
14-
- [Backends](#backends)
15-
- [Task configuration](#task-configuration)
16-
- [unique\_on](#uniqueon)
17-
- [raise\_on\_duplicate](#raiseonduplicate)
18-
- [App Configuration](#app-configuration)
19-
- [Testing](#testing)
20-
- [Contribute](#contribute)
10+
- [Prerequisites](#prerequisites)
11+
- [Quick start](#quick-start)
12+
- [How does it work?](#how-does-it-work)
13+
- [Handling deadlocks](#handling-deadlocks)
14+
- [Backends](#backends)
15+
- [Task configuration](#task-configuration)
16+
- [unique\_on](#uniqueon)
17+
- [raise\_on\_duplicate](#raiseonduplicate)
18+
- [lock\_expiry](#lockexpiry)
19+
- [challenge\_lock\_on\_startup](#callengelockonstartup)
20+
- [App Configuration](#app-configuration)
21+
- [Testing](#testing)
22+
- [Contribute](#contribute)
2123

2224
<!-- markdown-toc end -->
2325

@@ -182,6 +184,32 @@ assert task1 != task2 # These are two separate task instances
182184

183185
This option can be applied globally in the [app config](#app-configuration) with `singleton_lock_expiry`. Task option supersedes the app config.
184186

187+
### challenge_lock_on_startup
188+
189+
Some tasks may remain locked (stuck) after an unexpected shutdown of a worker.
190+
If this situation should be avoided, the lock can be challenged on task startup.
191+
192+
> **Use with caution**: this configuration may lead to unexpected behaviour. Prefer the classic [deadlock handling](#handling-deadlocks) if possible.
193+
> This may lead to unexpected behaviour if Celery is in a transitional state at task startup.
194+
> Example: tasks scheduled while no worker is online will all start once a worker comes up.
195+
196+
```python
197+
from celery_singleton import Singleton
198+
from .local_path import app
199+
200+
201+
@app.task(
202+
base=Singleton,
203+
challenge_lock_on_startup=True,
204+
celery_app=app,
205+
)
206+
def do_something(username):
207+
time.sleep(5)
208+
209+
task1 = do_something.delay('bob')
210+
# Worker cold shutdown causes the lock to remain
211+
task2 = do_something.delay('bob') # lock should be removed
212+
```
185213

186214
## App Configuration
187215

celery_singleton/inspect_celery.py

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
from celery import Celery
2+
3+
class CeleryTask:
    """Minimal record describing one task reported by a Celery worker.

    The four constructor arguments are stored verbatim as the attributes
    ``id``, ``name``, ``args`` and ``kwargs``.
    """

    def __init__(self, id, name, args, kwargs):
        # Plain attribute storage: no validation and no copying is performed.
        self.id, self.name = id, name
        self.args, self.kwargs = args, kwargs
9+
10+
def get_active_task_by_id(app, id):
    """Look up a task by id among the workers' active and scheduled tasks.

    The inspector obtained from ``app.control.inspect()`` is asked first for
    active tasks and then for scheduled tasks. The two sources are queried
    and parsed independently, so an empty (``None``) or unparsable "active"
    reply no longer prevents the scheduled tasks from being searched.

    Args:
        app: Object exposing ``control.inspect()``, whose result provides
            ``active()`` and ``scheduled()``.
        id: Task id to search for.

    Returns:
        The matching ``CeleryTask``, or ``None`` when nothing matches or when
        the worker replies could not be obtained or parsed.
    """
    inspector = app.control.inspect()

    # Best effort by design: any failure while querying or parsing one source
    # is treated as "not found in that source" instead of being raised.
    # NOTE(review): callers cannot distinguish "not found" from "inspection
    # failed"; both yield None.
    try:
        found = _get_active_task(id, inspector.active() or {})
    except Exception:
        found = None
    if found:
        return found

    try:
        found = _get_scheduled_task(id, inspector.scheduled() or {})
    except Exception:
        found = None
    return found or None
22+
23+
def _get_active_task(id, active_task_dict):
    """Return the active task whose ``'id'`` equals ``id``, or ``None``.

    Args:
        id: Task id to search for.
        active_task_dict: Mapping of worker name to a list of task
            description dicts; each dict is read through its ``'id'``,
            ``'name'``, ``'args'`` and ``'kwargs'`` keys. ``None`` or an
            empty mapping means there is nothing to search.

    Returns:
        A ``CeleryTask`` built from the first matching entry, else ``None``.

    Raises:
        KeyError, TypeError: If an entry does not have the expected shape.
    """
    # An absent reply is not an error: there is simply nothing to match.
    if not active_task_dict:
        return None
    # Worker names are irrelevant here, so only the task lists are walked.
    for task_list in active_task_dict.values():
        for task in task_list:
            if task['id'] == id:
                return CeleryTask(task['id'], task["name"], task["args"], task["kwargs"])
    return None
29+
30+
def _get_scheduled_task(id, scheduled_task_dict):
    """Return the scheduled task whose request ``'id'`` equals ``id``, or ``None``.

    Args:
        id: Task id to search for.
        scheduled_task_dict: Mapping of worker name to a list of schedule
            entries; each entry carries the task description under its
            ``'request'`` key, which is read through ``'id'``, ``'name'``,
            ``'args'`` and ``'kwargs'``. ``None`` or an empty mapping means
            there is nothing to search.

    Returns:
        A ``CeleryTask`` built from the first matching request, else ``None``.

    Raises:
        KeyError, TypeError: If an entry does not have the expected shape.
    """
    # An absent reply is not an error: there is simply nothing to match.
    if not scheduled_task_dict:
        return None
    # Worker names are irrelevant here, so only the schedule lists are walked.
    for task_scheduled_list in scheduled_task_dict.values():
        for scheduled in task_scheduled_list:
            task = scheduled['request']
            if task['id'] == id:
                return CeleryTask(task["id"], task["name"], task["args"], task["kwargs"])
    return None
37+
38+
def are_worker_active(app):
    """Tell whether at least one worker answered the ping with a truthy ``'ok'``.

    Args:
        app: Object exposing ``control.inspect()``, whose result provides
            ``ping()``. The ping reply is expected to map each worker name
            to a reply mapping.

    Returns:
        ``True`` as soon as one worker reply has a truthy ``'ok'`` entry;
        ``False`` when the ping reply is empty/``None``, cannot be iterated,
        or no worker reply qualifies.
    """
    pong_by_worker = app.control.inspect().ping()
    if not pong_by_worker:
        return False

    try:
        replies = list(pong_by_worker.values())
    except Exception:
        # The reply is not mapping-like: treat it as "no worker answered".
        return False

    # Each reply is checked on its own so that one malformed answer cannot
    # mask a healthy worker appearing later in the reply.
    for reply in replies:
        try:
            if reply['ok']:
                return True
        except Exception:
            continue
    return False

celery_singleton/singleton.py

+18-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from .config import Config
77
from .exceptions import DuplicateTaskError
88
from . import util
9+
from . import inspect_celery
910

1011

1112
def clear_locks(app):
@@ -21,6 +22,8 @@ class Singleton(BaseTask):
2122
unique_on = None
2223
raise_on_duplicate = None
2324
lock_expiry = None
25+
challenge_lock_on_startup = False
26+
celery_app = None
2427

2528
@property
2629
def _raise_on_duplicate(self):
@@ -74,7 +77,7 @@ def generate_lock(self, task_name, task_args=None, task_kwargs=None):
7477
unique_kwargs,
7578
key_prefix=self.singleton_config.key_prefix,
7679
)
77-
80+
7881
def apply_async(
7982
self,
8083
args=None,
@@ -103,6 +106,9 @@ def apply_async(
103106
**options
104107
)
105108

109+
if self.challenge_lock_on_startup:
110+
self.challenge_lock(lock, args, kwargs)
111+
106112
task = self.lock_and_run(**run_args)
107113
if task:
108114
return task
@@ -126,6 +132,17 @@ def lock_and_run(self, lock, *args, task_id=None, **kwargs):
126132
# Clear the lock if apply_async fails
127133
self.unlock(lock)
128134
raise
135+
136+
def challenge_lock(self, lock, args, kwargs):
    """Release ``lock`` when the task id it holds is unknown to every worker.

    The id stored under ``lock`` is read from the singleton backend. The
    lock is removed only if at least one worker answers a ping and none of
    them reports that id among its active or scheduled tasks. In every
    other case the lock is left untouched.

    Args:
        lock: Lock key to challenge.
        args: Positional arguments of the task being submitted (unused).
        kwargs: Keyword arguments of the task being submitted (unused).
    """
    current_lock_id = self.singleton_backend.get(lock)
    if not current_lock_id:
        return

    # ``celery_app`` defaults to None; without a fallback, enabling the
    # option without also passing ``celery_app`` raised AttributeError.
    # NOTE(review): assumes the base task class exposes the bound Celery
    # application as ``self.app`` -- confirm against BaseTask.
    app = self.celery_app
    if app is None:
        app = getattr(self, "app", None)
    if app is None:
        # Nothing to inspect with: keep the lock rather than guess.
        return

    # With no reachable worker the task state cannot be judged; keep the lock.
    if not inspect_celery.are_worker_active(app):
        return
    if inspect_celery.get_active_task_by_id(app, current_lock_id):
        return
    # NOTE(review): the read above and this unlock are not atomic; a lock
    # re-acquired by another producer in between would be removed as well.
    self.unlock(lock)
129146

130147
def release_lock(self, task_args=None, task_kwargs=None):
131148
lock = self.generate_lock(self.name, task_args, task_kwargs)

tests/test_inspect_celery.py

+145
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
import pytest
2+
import sys
3+
from unittest import mock
4+
from celery_singleton.inspect_celery import get_active_task_by_id, are_worker_active, CeleryTask
5+
6+
7+
class TestInspectCelery:
    """Unit tests for ``get_active_task_by_id`` and ``are_worker_active``.

    Every test drives the function under test with a mocked app whose
    ``control.inspect()`` object returns canned worker replies.
    """

    @staticmethod
    def _app_replying(**replies):
        """Build a mock app whose inspector returns the given canned replies.

        Each keyword names an inspector method (``active``, ``scheduled`` or
        ``ping``) and supplies its return value; methods that are not named
        keep ``MagicMock``'s default behaviour.
        """
        inspector = mock.MagicMock()
        for method_name, reply in replies.items():
            getattr(inspector, method_name).return_value = reply
        controller = mock.MagicMock()
        controller.inspect.return_value = inspector
        app = mock.MagicMock()
        app.control = controller
        return app

    def test__get_active_task_by_id__should_return_matching_task__when_task_active(self):
        # Given
        wanted_id = '262a1cf9-2c4f-4680-8261-7498fb39756c'
        running = {
            'id': wanted_id,
            'name': 'simple_task',
            'args': [1, 2, 3],
            'kwargs': {},
            'type': 'simple_task',
            'hostname': 'celery@worker_host',
            'time_start': 1588508284.207397,
            'acknowledged': True,
            'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None},
            'worker_pid': 45895,
        }
        app = self._app_replying(active={'celery@worker_host': [running]})

        # When
        found = get_active_task_by_id(app, wanted_id)

        # Then
        assert found is not None
        assert found.id == wanted_id
        assert found.name == 'simple_task'
        assert found.args == [1, 2, 3]
        assert found.kwargs == {}

    def test__get_active_task_by_id__should_return_matching_task__when_task_scheduled(self):
        # Given
        wanted_id = '262a1cf9-2c4f-4680-8261-7498fb39756c'
        request = {
            'id': wanted_id,
            'name': 'simple_task',
            'args': [1, 2, 3],
            'kwargs': {},
            'type': 'simple_task',
            'hostname': 'celery@worker_host',
            'time_start': None,
            'acknowledged': False,
            'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None},
            'worker_pid': None,
        }
        entry = {'eta': '2020-05-12T17:31:32.886704+00:00', 'priority': 6, 'request': request}
        app = self._app_replying(
            active={'celery@worker_host': []},
            scheduled={'celery@worker_host': [entry]},
        )

        # When
        found = get_active_task_by_id(app, wanted_id)

        # Then
        assert found is not None
        assert found.id == wanted_id
        assert found.name == 'simple_task'
        assert found.args == [1, 2, 3]
        assert found.kwargs == {}

    def test__get_active_task_by_id__should_return_none_when_no_matching_task_id(self):
        # Given
        running = {
            'id': '262a1cf9-2c4f-4680-8261-7498fb39756c',
            'name': 'simple_task',
            'args': [1, 2, 3],
            'kwargs': {},
            'type': 'simple_task',
            'hostname': 'celery@worker_host',
            'time_start': 1588508284.207397,
            'acknowledged': True,
            'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None},
            'worker_pid': 45895,
        }
        app = self._app_replying(active={'celery@worker_host': [running]})

        # When
        found = get_active_task_by_id(app, "bad_task_name")

        # Then
        assert found is None

    def test__get_active_task_by_id__should_return_none_when_no_active_tasks(self):
        # Given
        app = self._app_replying(active=None)

        # When
        found = get_active_task_by_id(app, "any_name")

        # Then
        assert found is None

    def test__get_active_task_by_id__should_return_none_when_worker_answer_cannot_be_parsed(self):
        # Given
        app = self._app_replying(active={'worker': ['bad_task_definition']})

        # When
        found = get_active_task_by_id(app, "bad_task_name")

        # Then
        assert found is None

    def test__are_worker_active__should_return_true_if_worker_responds_to_ping(self):
        # Given
        app = self._app_replying(ping={u'celery@host': {u'ok': u'pong'}})

        # When
        workers_found = are_worker_active(app)

        # Then
        assert workers_found

    def test__are_worker_active__should_return_false_if_worker_does_not_respond_to_ping(self):
        # Given
        app = self._app_replying(ping=None)

        # When
        workers_found = are_worker_active(app)

        # Then
        assert not workers_found

    def test__are_worker_active__should_return_false_if_worker_respond_ko_to_ping(self):
        # Given
        app = self._app_replying(ping={u'celery@host': {u'not_ok': u'pong'}})

        # When
        workers_found = are_worker_active(app)

        # Then
        assert not workers_found
145+

0 commit comments

Comments
 (0)