diff --git a/README.md b/README.md index 9312e77..cd3c60a 100644 --- a/README.md +++ b/README.md @@ -7,17 +7,19 @@ This is a baseclass for celery tasks that ensures only one instance of the task **Table of Contents** - [Celery-Singleton](#celery-singleton) - - [Prerequisites](#prerequisites) - - [Quick start](#quick-start) - - [How does it work?](#how-does-it-work) - - [Handling deadlocks](#handling-deadlocks) - - [Backends](#backends) - - [Task configuration](#task-configuration) - - [unique\_on](#uniqueon) - - [raise\_on\_duplicate](#raiseonduplicate) - - [App Configuration](#app-configuration) - - [Testing](#testing) - - [Contribute](#contribute) + - [Prerequisites](#prerequisites) + - [Quick start](#quick-start) + - [How does it work?](#how-does-it-work) + - [Handling deadlocks](#handling-deadlocks) + - [Backends](#backends) + - [Task configuration](#task-configuration) + - [unique\_on](#unique_on) + - [raise\_on\_duplicate](#raise_on_duplicate) + - [lock\_expiry](#lock_expiry) + - [challenge\_lock\_on\_startup](#challenge_lock_on_startup) + - [App Configuration](#app-configuration) + - [Testing](#testing) + - [Contribute](#contribute) @@ -182,6 +184,42 @@ assert task1 != task2 # These are two separate task instances This option can be applied globally in the [app config](#app-configuration) with `singleton_lock_expiry`. Task option supersedes the app config. +### challenge_lock_on_startup + +Some tasks may get stuck after an unexpected shutdown of a worker. +If this situation should be avoided, the lock can be challenged on task startup. +Setting `challenge_lock_on_startup` to `True` causes every submission of this task to perform the following checks before trying to acquire a lock: + +1. check if a lock matching this task is already there +2. check if at least one worker responds to ping +3. inspect workers to find the owner of the lock (an existing task whose id matches the one stored in the lock) + 1. 
inspect active tasks (tasks currently being processed by a worker): check that the task id is not present + 2. inspect reserved tasks (tasks claimed by a worker, but not yet running): check that the task id is not present + 3. inspect scheduled tasks (tasks received by a worker, but scheduled at some point in the future): check that the task id is not present + +If any of the previous checks fails, the lock remains. + +> **Use with caution**: this configuration may lead to unexpected behaviour. Prefer the classic [deadlock handling](#handling-deadlocks) if possible. +> Intermittent failures to communicate with workers, or the submission of a task before any worker has received the task owning the lock, may lead to multiple instances of the task running simultaneously. +> Extensive use of the [`celery worker inspection`](https://docs.celeryproject.org/en/stable/userguide/workers.html#inspecting-workers) API may also lead to performance issues when working with a large number of workers and/or a large number of tasks. 
+ +```python +from celery_singleton import Singleton +from .local_path import app + + +@app.task( + base=Singleton, + challenge_lock_on_startup=True, + celery_app=app, +) +def do_something(username): + time.sleep(5) + +task1 = do_something.delay('bob') +# Worker cold shuttdown causes lock to remain +task2 = do_something.delay('bob') # lock should be removed +``` ## App Configuration diff --git a/celery_singleton/inspect_celery.py b/celery_singleton/inspect_celery.py new file mode 100644 index 0000000..7e99ac8 --- /dev/null +++ b/celery_singleton/inspect_celery.py @@ -0,0 +1,51 @@ +from celery import Celery + +class CeleryTask: + def __init__(self, id, name, args, kwargs): + self.id = id + self.name = name + self.args = args + self.kwargs = kwargs + +def get_task_by_id(app, id): + inspector = app.control.inspect() + try: + active_task_found = _get_task(id, inspector.active()) + if active_task_found: + return active_task_found + reserved_task_found = _get_task(id, inspector.reserved()) + if reserved_task_found: + return reserved_task_found + scheduled_task_found = _get_scheduled_task(id, inspector.scheduled()) + if scheduled_task_found: + return scheduled_task_found + except Exception: + pass + return None + +def _get_task(id, active_task_dict): + for worker, task_list in active_task_dict.items(): + for task in task_list: + if task['id'] == id: + return CeleryTask(task['id'], task["name"], task["args"], task["kwargs"]) + return None + +def _get_scheduled_task(id, scheduled_task_dict): + for worker, task_scheduled_list in scheduled_task_dict.items(): + for scheduled in task_scheduled_list: + task = scheduled['request'] + if task['id'] == id: + return CeleryTask(task["id"], task["name"], task["args"], task["kwargs"]) + return None + +def are_worker_active(app): + worke_pong = app.control.inspect().ping() + if not worke_pong: + return False + try: + for worker, response in worke_pong.items(): + if response['ok']: + return True + except Exception: + pass + return False \ 
No newline at end of file diff --git a/celery_singleton/singleton.py b/celery_singleton/singleton.py index 49ee0a6..6e94c96 100644 --- a/celery_singleton/singleton.py +++ b/celery_singleton/singleton.py @@ -6,6 +6,7 @@ from .config import Config from .exceptions import DuplicateTaskError from . import util +from . import inspect_celery def clear_locks(app): @@ -21,6 +22,8 @@ class Singleton(BaseTask): unique_on = None raise_on_duplicate = None lock_expiry = None + challenge_lock_on_startup = False + celery_app = None @property def _raise_on_duplicate(self): @@ -103,6 +106,9 @@ def apply_async( **options ) + if self.challenge_lock_on_startup: + self.challenge_lock(lock, args, kwargs) + task = self.lock_and_run(**run_args) if task: return task @@ -126,6 +132,17 @@ def lock_and_run(self, lock, *args, task_id=None, **kwargs): # Clear the lock if apply_async fails self.unlock(lock) raise + + def challenge_lock(self, lock, args, kwargs): + current_lock_id = self.singleton_backend.get(lock) + if not current_lock_id: + return + if not inspect_celery.are_worker_active(self.celery_app): + return + task_found = inspect_celery.get_task_by_id(self.celery_app, current_lock_id) + if task_found: + return + self.unlock(lock) def release_lock(self, task_args=None, task_kwargs=None): lock = self.generate_lock(self.name, task_args, task_kwargs) diff --git a/tests/test_inspect_celery.py b/tests/test_inspect_celery.py new file mode 100644 index 0000000..958fa5e --- /dev/null +++ b/tests/test_inspect_celery.py @@ -0,0 +1,170 @@ +import pytest +import sys +from unittest import mock +from celery_singleton.inspect_celery import get_task_by_id, are_worker_active, CeleryTask + + +class TestInspectCelery: + def test__get_task_by_id__should_return_matching_task__when_task_active(self): + # Given + task_id = '262a1cf9-2c4f-4680-8261-7498fb39756c' + active_tasks = {'celery@worker_host': [{'id': task_id, 'name': 'simple_task', 'args': [1, 2, 3], 'kwargs': {}, 'type': 'simple_task', 'hostname': 
'celery@worker_host', 'time_start': 1588508284.207397, 'acknowledged': True, 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'worker_pid': 45895}]} + inspect_mock = mock.MagicMock() + inspect_mock.active.return_value = active_tasks + control_mock = mock.MagicMock() + control_mock.inspect.return_value = inspect_mock + mock_app = mock.MagicMock() + mock_app.control = control_mock + + # When + active_task = get_task_by_id(mock_app, task_id) + + # Then + assert active_task is not None + assert active_task.id == task_id + assert active_task.name == 'simple_task' + assert active_task.args == [1, 2, 3] + assert active_task.kwargs == {} + + def test__get_task_by_id__should_return_matching_task__when_task_scheduled(self): + # Given + task_id = '262a1cf9-2c4f-4680-8261-7498fb39756c' + active_tasks = {'celery@worker_host': []} + scheduled_tasks = {'celery@worker_host': [{'eta': '2020-05-12T17:31:32.886704+00:00', 'priority': 6, 'request': {'id': task_id, 'name': 'simple_task', 'args': [1, 2, 3], 'kwargs': {}, 'type': 'simple_task', 'hostname': 'celery@worker_host', 'time_start': None, 'acknowledged': False, 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'worker_pid': None}}]} + inspect_mock = mock.MagicMock() + inspect_mock.active.return_value = active_tasks + inspect_mock.scheduled.return_value = scheduled_tasks + control_mock = mock.MagicMock() + control_mock.inspect.return_value = inspect_mock + mock_app = mock.MagicMock() + mock_app.control = control_mock + + # When + active_task = get_task_by_id(mock_app, task_id) + + # Then + assert active_task is not None + assert active_task.id == task_id + assert active_task.name == 'simple_task' + assert active_task.args == [1, 2, 3] + assert active_task.kwargs == {} + + def test__get_task_by_id__should_return_matching_task__when_task_reserved(self): + # Given + task_id = '262a1cf9-2c4f-4680-8261-7498fb39756c' + active_tasks = 
{'celery@worker_host': []} + scheduled_tasks = {'celery@worker_host': []} + reserved_tasks = {'celery@worker_host': [{'id': task_id, 'name': 'simple_task', 'args': [1, 2, 3], 'kwargs': {}, 'type': 'simple_task', 'hostname': 'celery@worker_host', 'time_start': 1588508284.207397, 'acknowledged': True, 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'worker_pid': 45895}]} + inspect_mock = mock.MagicMock() + inspect_mock.active.return_value = active_tasks + inspect_mock.scheduled.return_value = scheduled_tasks + inspect_mock.reserved.return_value = reserved_tasks + control_mock = mock.MagicMock() + control_mock.inspect.return_value = inspect_mock + mock_app = mock.MagicMock() + mock_app.control = control_mock + + # When + active_task = get_task_by_id(mock_app, task_id) + + # Then + assert active_task is not None + assert active_task.id == task_id + assert active_task.name == 'simple_task' + assert active_task.args == [1, 2, 3] + assert active_task.kwargs == {} + + def test__get_task_by_id__should_return_none_when_no_matching_task_id(self): + # Given + active_tasks = {'celery@worker_host': [{'id': '262a1cf9-2c4f-4680-8261-7498fb39756c', 'name': 'simple_task', 'args': [1, 2, 3], 'kwargs': {}, 'type': 'simple_task', 'hostname': 'celery@worker_host', 'time_start': 1588508284.207397, 'acknowledged': True, 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'worker_pid': 45895}]} + inspect_mock = mock.MagicMock() + inspect_mock.active.return_value = active_tasks + control_mock = mock.MagicMock() + control_mock.inspect.return_value = inspect_mock + mock_app = mock.MagicMock() + mock_app.control = control_mock + + # When + active_task = get_task_by_id(mock_app, "bad_task_name") + + # Then + assert active_task is None + + def test__get_task_by_id__should_return_none_when_no_active_tasks(self): + # Given + inspect_mock = mock.MagicMock() + inspect_mock.active.return_value = None + 
control_mock = mock.MagicMock() + control_mock.inspect.return_value = inspect_mock + mock_app = mock.MagicMock() + mock_app.control = control_mock + + # When + active_task = get_task_by_id(mock_app, "any_name") + + # Then + assert active_task is None + + def test__get_task_by_id__should_return_none_when_worker_answer_cannot_be_parsed(self): + # Given + inspect_mock = mock.MagicMock() + inspect_mock.active.return_value = {'worker': ['bad_task_definition']} + control_mock = mock.MagicMock() + control_mock.inspect.return_value = inspect_mock + mock_app = mock.MagicMock() + mock_app.control = control_mock + + # When + active_task = get_task_by_id(mock_app, "bad_task_name") + + # Then + assert active_task is None + + def test__are_worker_active__should_return_true_if_worker_responds_to_ping(self): + # Given + active_workers = {u'celery@host': {u'ok': u'pong'}} + inspect_mock = mock.MagicMock() + inspect_mock.ping.return_value = active_workers + control_mock = mock.MagicMock() + control_mock.inspect.return_value = inspect_mock + mock_app = mock.MagicMock() + mock_app.control = control_mock + + # When + active_workers_found = are_worker_active(mock_app) + + # Then + assert active_workers_found + + def test__are_worker_active__should_return_false_if_worker_does_not_respond_to_ping(self): + # Given + active_workers = None + inspect_mock = mock.MagicMock() + inspect_mock.ping.return_value = active_workers + control_mock = mock.MagicMock() + control_mock.inspect.return_value = inspect_mock + mock_app = mock.MagicMock() + mock_app.control = control_mock + + # When + active_workers_found = are_worker_active(mock_app) + + # Then + assert not active_workers_found + + def test__are_worker_active__should_return_false_if_worker_respond_ko_to_ping(self): + # Given + active_workers = {u'celery@host': {u'not_ok': u'pong'}} + inspect_mock = mock.MagicMock() + inspect_mock.ping.return_value = active_workers + control_mock = mock.MagicMock() + control_mock.inspect.return_value = 
inspect_mock + mock_app = mock.MagicMock() + mock_app.control = control_mock + + # When + active_workers_found = are_worker_active(mock_app) + + # Then + assert not active_workers_found + diff --git a/tests/test_singleton.py b/tests/test_singleton.py index fddd9a4..72c4cbb 100644 --- a/tests/test_singleton.py +++ b/tests/test_singleton.py @@ -10,6 +10,7 @@ from celery_singleton.backends.redis import RedisBackend from celery_singleton.backends import get_backend from celery_singleton.config import Config +from celery_singleton import inspect_celery @pytest.fixture(scope="session") @@ -313,3 +314,86 @@ def simple_task(*args): mock_lock.assert_called_once_with( simple_task.singleton_backend, lock, result.task_id, expiry=60 ) + +class TestChallengeLockOnStartup: + @mock.patch.object(inspect_celery, "get_task_by_id", return_value=None) + @mock.patch.object(inspect_celery, "are_worker_active", return_value=True) + def test__lock_invalidation__remove_lock_if_no_task_running(self, mock_celery_inspect, mock_celery_ping, scoped_app): + with scoped_app as app: + # Given + task_args=[1, 2, 3] + @app.task(name="simple_task", base=Singleton, challenge_lock_on_startup=True, celery_app=app) + def simple_task(*args): + return args + lock = simple_task.generate_lock(simple_task.name, task_args=task_args) + simple_task.unlock = mock.MagicMock() + simple_task.release_lock = mock.MagicMock() # avoid call to unlock on task completion + simple_task.singleton_backend.get = mock.MagicMock() + simple_task.singleton_backend.get.return_value = '243254' + + # When + simple_task.delay(1, 2, 3) + + # Then + simple_task.unlock.assert_called_once_with(lock) + + @mock.patch.object(inspect_celery, "get_task_by_id", return_value=inspect_celery.CeleryTask('1234', 'simple_task', args=[1, 2, 3], kwargs=None)) + @mock.patch.object(inspect_celery, "are_worker_active", return_value=True) + def test__lock_invalidation__do_not_remove_lock_if_task_running(self, mock_celery_inspect, mock_celery_ping, 
scoped_app): + with scoped_app as app: + # Given + task_name = "simple_task" + task_args=[1, 2, 3] + @app.task(name="simple_task", base=Singleton, challenge_lock_on_startup=True, celery_app=app) + def simple_task(*args): + return args + lock = simple_task.generate_lock(simple_task.name, task_args, None) + simple_task.unlock = mock.MagicMock() + simple_task.release_lock = mock.MagicMock() # avoid call to unlock on task completion + simple_task.singleton_backend.get = mock.MagicMock() + simple_task.singleton_backend.get.return_value = '1234' # Lock exists matching task ID + + # When + simple_task.delay(1, 2, 3) + + # Then + simple_task.unlock.assert_not_called() + + @mock.patch.object(inspect_celery, "get_task_by_id", return_value=None) + @mock.patch.object(inspect_celery, "are_worker_active", return_value=False) + def test__lock_invalidation__do_not_remove_lock_if_no_worker_running(self, mock_celery_inspect, mock_celery_ping, scoped_app): + with scoped_app as app: + # Given + @app.task(name="simple_task", base=Singleton, challenge_lock_on_startup=True, celery_app=app) + def simple_task(*args): + return args + simple_task.unlock = mock.MagicMock() + simple_task.release_lock = mock.MagicMock() # avoid call to unlock on task completion + simple_task.singleton_backend.get = mock.MagicMock() + simple_task.singleton_backend.get.return_value = '1234' + + # When + simple_task.delay(1, 2, 3) + + # Then + simple_task.unlock.assert_not_called() + + @mock.patch.object(inspect_celery, "get_task_by_id", return_value=None) + @mock.patch.object(inspect_celery, "are_worker_active", return_value=True) + def test__lock_invalidation__do_not_list_active_tasks_if_no_lock_found(self, mock_celery_inspect, mock_celery_ping, scoped_app): + with scoped_app as app: + # Given + @app.task(name="simple_task", base=Singleton, challenge_lock_on_startup=True, celery_app=app) + def simple_task(*args): + return args + simple_task.unlock = mock.MagicMock() + simple_task.release_lock = mock.MagicMock() 
# avoid call to unlock on task completion + simple_task.singleton_backend.get = mock.MagicMock() + simple_task.singleton_backend.get.return_value = None + + # When + simple_task.delay(1, 2, 3) + + # Then + inspect_celery.get_task_by_id.assert_not_called() + simple_task.unlock.assert_not_called()