Automatic lock release - fixes #21 #22

Open
wants to merge 1 commit into
base: master
60 changes: 49 additions & 11 deletions README.md
@@ -7,17 +7,19 @@ This is a baseclass for celery tasks that ensures only one instance of the task
**Table of Contents**

- [Celery-Singleton](#celery-singleton)
- [Prerequisites](#prerequisites)
- [Quick start](#quick-start)
- [How does it work?](#how-does-it-work)
- [Handling deadlocks](#handling-deadlocks)
- [Backends](#backends)
- [Task configuration](#task-configuration)
- [unique\_on](#uniqueon)
- [raise\_on\_duplicate](#raiseonduplicate)
- [App Configuration](#app-configuration)
- [Testing](#testing)
- [Contribute](#contribute)
- [Prerequisites](#prerequisites)
- [Quick start](#quick-start)
- [How does it work?](#how-does-it-work)
- [Handling deadlocks](#handling-deadlocks)
- [Backends](#backends)
- [Task configuration](#task-configuration)
- [unique\_on](#unique_on)
- [raise\_on\_duplicate](#raise_on_duplicate)
- [lock\_expiry](#lock_expiry)
- [challenge\_lock\_on\_startup](#challenge_lock_on_startup)
- [App Configuration](#app-configuration)
- [Testing](#testing)
- [Contribute](#contribute)

<!-- markdown-toc end -->

@@ -182,6 +184,42 @@ assert task1 != task2 # These are two separate task instances

This option can be applied globally in the [app config](#app-configuration) with `singleton_lock_expiry`. Task option supersedes the app config.

### challenge_lock_on_startup

Some tasks may remain stuck after an unexpected shutdown of a worker, because their lock is never released.
To avoid this situation, the lock can be challenged when a task is submitted.
Setting `challenge_lock_on_startup` to `True` causes every submission of this task to run the following checks before trying to acquire a lock:

1. check whether a lock matching this task already exists
2. check that at least one worker responds to ping
3. inspect the workers to find the owner of the lock (an existing task whose id matches the one stored in the lock)
   1. inspect active tasks (tasks currently being processed by a worker): check that the task id is not present
   2. inspect reserved tasks (tasks claimed by a worker, but not yet running): check that the task id is not present
   3. inspect scheduled tasks (tasks received by a worker, but scheduled at some point in the future): check that the task id is not present

The lock is released only if all of these checks pass. If any of them fails, the lock remains.

> **Use with caution**: this option may lead to unexpected behaviour. Prefer the classic [deadlock handling](#handling-deadlocks) where possible.
> An intermittent failure to communicate with workers, or the submission of a task before any worker has received the task owning the lock, may result in multiple instances of the task running simultaneously.
> Extensive use of the [Celery worker inspection](https://docs.celeryproject.org/en/stable/userguide/workers.html#inspecting-workers) API may also cause performance issues when working with a large number of workers and/or a large number of tasks.

```python
import time

from celery_singleton import Singleton
from .local_path import app


@app.task(
    base=Singleton,
    challenge_lock_on_startup=True,
    celery_app=app,
)
def do_something(username):
    time.sleep(5)


task1 = do_something.delay('bob')
# A cold shutdown of the worker leaves the lock in place
task2 = do_something.delay('bob')  # the stale lock should be removed
```

## App Configuration

51 changes: 51 additions & 0 deletions celery_singleton/inspect_celery.py
@@ -0,0 +1,51 @@
from celery import Celery

class CeleryTask:
    def __init__(self, id, name, args, kwargs):
        self.id = id
        self.name = name
        self.args = args
        self.kwargs = kwargs

def get_task_by_id(app, id):
    inspector = app.control.inspect()
    try:
        active_task_found = _get_task(id, inspector.active())
        if active_task_found:
            return active_task_found
        reserved_task_found = _get_task(id, inspector.reserved())
        if reserved_task_found:
            return reserved_task_found
        scheduled_task_found = _get_scheduled_task(id, inspector.scheduled())
        if scheduled_task_found:
            return scheduled_task_found
    except Exception:
        pass
    return None

def _get_task(id, active_task_dict):
    for worker, task_list in active_task_dict.items():
        for task in task_list:
            if task['id'] == id:
                return CeleryTask(task['id'], task["name"], task["args"], task["kwargs"])
    return None

def _get_scheduled_task(id, scheduled_task_dict):
    for worker, task_scheduled_list in scheduled_task_dict.items():
        for scheduled in task_scheduled_list:
            task = scheduled['request']
            if task['id'] == id:
                return CeleryTask(task["id"], task["name"], task["args"], task["kwargs"])
    return None

def are_worker_active(app):
    worke_pong = app.control.inspect().ping()
    if not worke_pong:
        return False
    try:
        for worker, response in worke_pong.items():
            if response['ok']:
                return True
    except Exception:
        pass
    return False
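
Celery's inspection calls return `None` when no worker replies, which the module above absorbs through its blanket `except Exception`. As a possible alternative, the sketch below handles the empty reply explicitly, so that a missing `active()` reply does not skip the `reserved()` and `scheduled()` checks. `find_task_id` and `task_is_known` are hypothetical names that are not part of this PR, and the app is a `MagicMock` stand-in rather than a real Celery app.

```python
from unittest import mock


def find_task_id(task_id, replies):
    """Return True if task_id appears in a worker reply; tolerate a None reply."""
    for task_list in (replies or {}).values():
        for task in task_list:
            if not isinstance(task, dict):
                continue  # ignore entries that cannot be parsed
            # Scheduled entries wrap the task description in a 'request' key
            request = task.get("request", task)
            if request.get("id") == task_id:
                return True
    return False


def task_is_known(app, task_id):
    """Check active, reserved and scheduled tasks in turn."""
    inspector = app.control.inspect()
    return any(
        find_task_id(task_id, fetch())
        for fetch in (inspector.active, inspector.reserved, inspector.scheduled)
    )


# Stand-in app: no reply for active(), one reserved and one scheduled task
inspector = mock.MagicMock()
inspector.active.return_value = None
inspector.reserved.return_value = {"celery@host": [{"id": "abc"}]}
inspector.scheduled.return_value = {"celery@host": [{"request": {"id": "xyz"}}]}
app = mock.MagicMock()
app.control.inspect.return_value = inspector

found_reserved = task_is_known(app, "abc")
found_scheduled = task_is_known(app, "xyz")
found_missing = task_is_known(app, "nope")
```

In this variant only malformed entries are skipped, while any other error surfaces to the caller instead of being read as "task not found".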
17 changes: 17 additions & 0 deletions celery_singleton/singleton.py
@@ -6,6 +6,7 @@
from .config import Config
from .exceptions import DuplicateTaskError
from . import util
from . import inspect_celery


def clear_locks(app):
@@ -21,6 +22,8 @@ class Singleton(BaseTask):
    unique_on = None
    raise_on_duplicate = None
    lock_expiry = None
    challenge_lock_on_startup = False
    celery_app = None

    @property
    def _raise_on_duplicate(self):
@@ -103,6 +106,9 @@ def apply_async(
            **options
        )

        if self.challenge_lock_on_startup:
            self.challenge_lock(lock, args, kwargs)

        task = self.lock_and_run(**run_args)
        if task:
            return task
@@ -126,6 +132,17 @@ def lock_and_run(self, lock, *args, task_id=None, **kwargs):
                # Clear the lock if apply_async fails
                self.unlock(lock)
                raise

    def challenge_lock(self, lock, args, kwargs):
        current_lock_id = self.singleton_backend.get(lock)
        if not current_lock_id:
            return
        if not inspect_celery.are_worker_active(self.celery_app):
            return
        task_found = inspect_celery.get_task_by_id(self.celery_app, current_lock_id)
        if task_found:
            return
        self.unlock(lock)

    def release_lock(self, task_args=None, task_kwargs=None):
        lock = self.generate_lock(self.name, task_args, task_kwargs)
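
The race mentioned in the README caution can be reproduced without a broker. The sketch below is a simplified simulation, not the PR's code: `InMemoryBackend` is a hypothetical stand-in for the singleton backend, and the standalone `challenge_lock` takes the inspection results as plain arguments instead of querying workers.

```python
class InMemoryBackend:
    """Hypothetical stand-in for the singleton lock backend."""

    def __init__(self):
        self.locks = {}

    def get(self, lock):
        return self.locks.get(lock)

    def lock(self, lock, task_id):
        # Acquire only if nobody holds the lock yet
        if lock in self.locks:
            return False
        self.locks[lock] = task_id
        return True

    def unlock(self, lock):
        self.locks.pop(lock, None)


def challenge_lock(backend, lock, workers_respond, visible_task_ids):
    """Same sequence of checks as the method above, with inspection results passed in."""
    owner_id = backend.get(lock)
    if not owner_id:
        return  # no lock to challenge
    if not workers_respond:
        return  # cannot verify the owner, keep the lock
    if owner_id in visible_task_ids:
        return  # the owner is still known to a worker
    backend.unlock(lock)


backend = InMemoryBackend()

# task-1 takes the lock but is still queued in the broker,
# so no worker reports it as active, reserved or scheduled.
first_acquired = backend.lock("do_something:bob", "task-1")

# A second submission challenges the lock before any worker has seen task-1.
challenge_lock(backend, "do_something:bob", workers_respond=True, visible_task_ids=set())
second_acquired = backend.lock("do_something:bob", "task-2")
```

Both submissions obtain the lock here, which is the situation where two instances of the same task can run simultaneously.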
170 changes: 170 additions & 0 deletions tests/test_inspect_celery.py
@@ -0,0 +1,170 @@
import pytest
import sys
from unittest import mock
from celery_singleton.inspect_celery import get_task_by_id, are_worker_active, CeleryTask


class TestInspectCelery:
    def test__get_task_by_id__should_return_matching_task__when_task_active(self):
        # Given
        task_id = '262a1cf9-2c4f-4680-8261-7498fb39756c'
        active_tasks = {'celery@worker_host': [{'id': task_id, 'name': 'simple_task', 'args': [1, 2, 3], 'kwargs': {}, 'type': 'simple_task', 'hostname': 'celery@worker_host', 'time_start': 1588508284.207397, 'acknowledged': True, 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'worker_pid': 45895}]}
        inspect_mock = mock.MagicMock()
        inspect_mock.active.return_value = active_tasks
        control_mock = mock.MagicMock()
        control_mock.inspect.return_value = inspect_mock
        mock_app = mock.MagicMock()
        mock_app.control = control_mock

        # When
        active_task = get_task_by_id(mock_app, task_id)

        # Then
        assert active_task is not None
        assert active_task.id == task_id
        assert active_task.name == 'simple_task'
        assert active_task.args == [1, 2, 3]
        assert active_task.kwargs == {}

    def test__get_task_by_id__should_return_matching_task__when_task_scheduled(self):
        # Given
        task_id = '262a1cf9-2c4f-4680-8261-7498fb39756c'
        active_tasks = {'celery@worker_host': []}
        scheduled_tasks = {'celery@worker_host': [{'eta': '2020-05-12T17:31:32.886704+00:00', 'priority': 6, 'request': {'id': task_id, 'name': 'simple_task', 'args': [1, 2, 3], 'kwargs': {}, 'type': 'simple_task', 'hostname': 'celery@worker_host', 'time_start': None, 'acknowledged': False, 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'worker_pid': None}}]}
        inspect_mock = mock.MagicMock()
        inspect_mock.active.return_value = active_tasks
        inspect_mock.scheduled.return_value = scheduled_tasks
        control_mock = mock.MagicMock()
        control_mock.inspect.return_value = inspect_mock
        mock_app = mock.MagicMock()
        mock_app.control = control_mock

        # When
        active_task = get_task_by_id(mock_app, task_id)

        # Then
        assert active_task is not None
        assert active_task.id == task_id
        assert active_task.name == 'simple_task'
        assert active_task.args == [1, 2, 3]
        assert active_task.kwargs == {}

    def test__get_task_by_id__should_return_matching_task__when_task_reserved(self):
        # Given
        task_id = '262a1cf9-2c4f-4680-8261-7498fb39756c'
        active_tasks = {'celery@worker_host': []}
        scheduled_tasks = {'celery@worker_host': []}
        reserved_tasks = {'celery@worker_host': [{'id': task_id, 'name': 'simple_task', 'args': [1, 2, 3], 'kwargs': {}, 'type': 'simple_task', 'hostname': 'celery@worker_host', 'time_start': 1588508284.207397, 'acknowledged': True, 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'worker_pid': 45895}]}
        inspect_mock = mock.MagicMock()
        inspect_mock.active.return_value = active_tasks
        inspect_mock.scheduled.return_value = scheduled_tasks
        inspect_mock.reserved.return_value = reserved_tasks
        control_mock = mock.MagicMock()
        control_mock.inspect.return_value = inspect_mock
        mock_app = mock.MagicMock()
        mock_app.control = control_mock

        # When
        active_task = get_task_by_id(mock_app, task_id)

        # Then
        assert active_task is not None
        assert active_task.id == task_id
        assert active_task.name == 'simple_task'
        assert active_task.args == [1, 2, 3]
        assert active_task.kwargs == {}

    def test__get_task_by_id__should_return_none_when_no_matching_task_id(self):
        # Given
        active_tasks = {'celery@worker_host': [{'id': '262a1cf9-2c4f-4680-8261-7498fb39756c', 'name': 'simple_task', 'args': [1, 2, 3], 'kwargs': {}, 'type': 'simple_task', 'hostname': 'celery@worker_host', 'time_start': 1588508284.207397, 'acknowledged': True, 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'worker_pid': 45895}]}
        inspect_mock = mock.MagicMock()
        inspect_mock.active.return_value = active_tasks
        control_mock = mock.MagicMock()
        control_mock.inspect.return_value = inspect_mock
        mock_app = mock.MagicMock()
        mock_app.control = control_mock

        # When
        active_task = get_task_by_id(mock_app, "bad_task_name")

        # Then
        assert active_task is None

    def test__get_task_by_id__should_return_none_when_no_active_tasks(self):
        # Given
        inspect_mock = mock.MagicMock()
        inspect_mock.active.return_value = None
        control_mock = mock.MagicMock()
        control_mock.inspect.return_value = inspect_mock
        mock_app = mock.MagicMock()
        mock_app.control = control_mock

        # When
        active_task = get_task_by_id(mock_app, "any_name")

        # Then
        assert active_task is None

    def test__get_task_by_id__should_return_none_when_worker_answer_cannot_be_parsed(self):
        # Given
        inspect_mock = mock.MagicMock()
        inspect_mock.active.return_value = {'worker': ['bad_task_definition']}
        control_mock = mock.MagicMock()
        control_mock.inspect.return_value = inspect_mock
        mock_app = mock.MagicMock()
        mock_app.control = control_mock

        # When
        active_task = get_task_by_id(mock_app, "bad_task_name")

        # Then
        assert active_task is None

    def test__are_worker_active__should_return_true_if_worker_responds_to_ping(self):
        # Given
        active_workers = {u'celery@host': {u'ok': u'pong'}}
        inspect_mock = mock.MagicMock()
        inspect_mock.ping.return_value = active_workers
        control_mock = mock.MagicMock()
        control_mock.inspect.return_value = inspect_mock
        mock_app = mock.MagicMock()
        mock_app.control = control_mock

        # When
        active_workers_found = are_worker_active(mock_app)

        # Then
        assert active_workers_found

    def test__are_worker_active__should_return_false_if_worker_does_not_respond_to_ping(self):
        # Given
        active_workers = None
        inspect_mock = mock.MagicMock()
        inspect_mock.ping.return_value = active_workers
        control_mock = mock.MagicMock()
        control_mock.inspect.return_value = inspect_mock
        mock_app = mock.MagicMock()
        mock_app.control = control_mock

        # When
        active_workers_found = are_worker_active(mock_app)

        # Then
        assert not active_workers_found

    def test__are_worker_active__should_return_false_if_worker_respond_ko_to_ping(self):
        # Given
        active_workers = {u'celery@host': {u'not_ok': u'pong'}}
        inspect_mock = mock.MagicMock()
        inspect_mock.ping.return_value = active_workers
        control_mock = mock.MagicMock()
        control_mock.inspect.return_value = inspect_mock
        mock_app = mock.MagicMock()
        mock_app.control = control_mock

        # When
        active_workers_found = are_worker_active(mock_app)

        # Then
        assert not active_workers_found
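
Each test above builds the same chain of mocks by hand. One way to trim that repetition, offered only as a sketch, is a small factory; `make_mock_app` is a hypothetical helper that does not exist in this PR.

```python
from unittest import mock


def make_mock_app(active=None, reserved=None, scheduled=None, ping=None):
    """Build a stand-in Celery app whose inspect() returns canned replies."""
    inspector = mock.MagicMock()
    inspector.active.return_value = active
    inspector.reserved.return_value = reserved
    inspector.scheduled.return_value = scheduled
    inspector.ping.return_value = ping
    app = mock.MagicMock()
    app.control.inspect.return_value = inspector
    return app


# One worker answers the ping and reports an empty active list
app = make_mock_app(
    active={"celery@host": []},
    ping={"celery@host": {"ok": "pong"}},
)
ping_reply = app.control.inspect().ping()
active_reply = app.control.inspect().active()
reserved_reply = app.control.inspect().reserved()
```

Replies that are not supplied default to `None`, which mirrors a call that no worker answered, so a test that expects an empty but valid reply should pass `{}` or an empty task list explicitly.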