From 32913de27da84ac5f43f481047e4fc88ff2ad09b Mon Sep 17 00:00:00 2001 From: Radoslav Georgiev Date: Wed, 26 Feb 2025 11:42:18 +0200 Subject: [PATCH 1/2] Add blog examples --- .../celery_non_concurrent/tasks.py | 48 +++++++++++++++++++ styleguide_example/blog_examples/tasks.py | 2 + 2 files changed, 50 insertions(+) create mode 100644 styleguide_example/blog_examples/celery_non_concurrent/tasks.py create mode 100644 styleguide_example/blog_examples/tasks.py diff --git a/styleguide_example/blog_examples/celery_non_concurrent/tasks.py b/styleguide_example/blog_examples/celery_non_concurrent/tasks.py new file mode 100644 index 00000000..d69b6b5d --- /dev/null +++ b/styleguide_example/blog_examples/celery_non_concurrent/tasks.py @@ -0,0 +1,48 @@ +import logging +from functools import wraps + +from celery import shared_task + +from styleguide_example.tasks import celery_app + +inspect = celery_app.control.inspect + +logger = logging.getLogger(__name__) + + +def non_concurrent_task(_func=None, *args, **kwargs): + def wrapper(func): + @wraps(func) + def inner(_bound_self, *_func_args, **_func_kwargs): + running_task_count = 0 + + queues = inspect().active() + + if queues is None: + queues = {} + + for running_tasks in queues.values(): + for task in running_tasks: + if task["name"] == _bound_self.name: + running_task_count += 1 + + if running_task_count > 1: + logger.warning(f"[non_concurrent_task] Task {_bound_self.name} is already running") + return + + return func(*_func_args, **_func_kwargs) + + return shared_task(bind=True, *args, **kwargs)(inner) + + if _func is None: + return wrapper + + return wrapper(_func) + + +@non_concurrent_task +def test_non_concurrent_task(): + logger.info("A non-concurrent task is running") + import time + time.sleep(10) + logger.info("A non-concurrent task finished") diff --git a/styleguide_example/blog_examples/tasks.py b/styleguide_example/blog_examples/tasks.py new file mode 100644 index 00000000..1611e6fb --- /dev/null +++ b/styleguide_example/blog_examples/tasks.py @@ -0,0 +1,2 @@ +# noqa +from styleguide_example.blog_examples.celery_non_concurrent.tasks import test_non_concurrent_task From 371f2122cd4fb173563d8110b6fefa8368d03e22 Mon Sep 17 00:00:00 2001 From: Radoslav Georgiev Date: Tue, 18 Mar 2025 13:28:06 +0200 Subject: [PATCH 2/2] Add ruff ignore --- styleguide_example/blog_examples/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/styleguide_example/blog_examples/tasks.py b/styleguide_example/blog_examples/tasks.py index 1611e6fb..db65c0fb 100644 --- a/styleguide_example/blog_examples/tasks.py +++ b/styleguide_example/blog_examples/tasks.py @@ -1,2 +1,2 @@ -# noqa +# ruff: noqa from styleguide_example.blog_examples.celery_non_concurrent.tasks import test_non_concurrent_task