diff --git a/README.md b/README.md
index c02c304..c0453ba 100644
--- a/README.md
+++ b/README.md
@@ -245,3 +245,65 @@ from core.helpers.cache import Cache, CacheTag
 await Cache.remove_by_prefix(prefix="get_user_list")
 await Cache.remove_by_tag(tag=CacheTag.GET_USER_LIST)
 ```
+
+## Celery Integration
+
+This project uses Celery for background task processing to improve performance and scalability, particularly for operations that are IO-bound or computationally intensive. The integration supports both synchronous and asynchronous tasks, with a focus on keeping database session management consistent between FastAPI and Celery.
+
+### Configuration
+
+The `CeleryConfigurator` class centralizes the Celery setup, including the configuration of task routes, worker settings, and database session management. It ensures that Celery integrates seamlessly with the existing FastAPI application structure, especially with regard to how database sessions are handled.
+
+#### Dynamic Queue Creation
+
+Each application module in the project can have its tasks routed to a dedicated queue that is created automatically and named after the module. For example, tasks defined in `app/user/application/celery/tasks.py` are routed to a queue named `user-queue`. This setup allows fine-grained control over task processing and resource allocation, ensuring that tasks do not interfere with one another and can be scaled independently.
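+
+As an illustration, the queues that `CeleryConfigurator` builds for a `user` module are roughly equivalent to the following sketch (you do not write this yourself; the names simply follow the convention above):
+
+```python
+from kombu import Queue
+
+# One queue per discovered module: "<module>-queue", with a routing key
+# that matches any routing key of the form "<module>.*".
+task_queues = [Queue("user-queue", routing_key="user.#")]
+```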
+
+### Examples of Celery Tasks
+
+#### Synchronous Task Example
+
+Here's how a synchronous task is defined and used:
+
+```python
+# app/user/application/celery/tasks.py
+from core.celery import celery_app
+
+@celery_app.task(name="send_welcome_email")
+def send_welcome_email(user_id):
+    """
+    Sends a welcome email to the user specified by user_id.
+    """
+    print(f"Sending welcome email to user {user_id}")
+    # Email sending logic here
+    return f"Welcome email sent to user {user_id}"
+
+# To dispatch this task from your application code, use:
+send_welcome_email.delay(user_id=123)
+```
+
+#### Asynchronous Task Example
+
+Asynchronous tasks are beneficial for operations that involve waiting on I/O, such as database operations or requests to external services. Here's how to define an asynchronous task using the `async_task` decorator:
+
+```python
+# app/user/application/celery/tasks.py
+import asyncio
+
+from core.celery import async_task
+
+@async_task(name="log_user_activity")
+async def log_user_activity(user_id, activity):
+    """
+    Asynchronously logs user activity.
+    """
+    await asyncio.sleep(2)  # Simulate an async I/O operation
+    print(f"Activity logged for user {user_id}: {activity}")
+    return f"Activity logged for user {user_id}: {activity}"
+```
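+
+Dispatching works the same way as for a synchronous task, since `async_task` ultimately registers a regular Celery task. For example (with an assumed user ID and activity string):
+
+```python
+log_user_activity.delay(user_id=123, activity="logged in")
+```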
+
+#### Running Celery Workers
+
+To start the Celery workers, use the following command, which picks up the settings defined in `CeleryConfigurator`:
+
+```bash
+celery -A core.celery.celery_app worker --loglevel=info -P prefork -E
+```
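+
+To dedicate a worker to a single module's queue, pass the queue name explicitly. For example, for the hypothetical `user` module above:
+
+```bash
+celery -A core.celery.celery_app worker --loglevel=info -Q user-queue
+```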
+ """ + base_path = os.path.join(os.getcwd(), "app") + modules = [ + name + for name in os.listdir(base_path) + if os.path.isdir(os.path.join(base_path, name, "application", "celery")) + and os.path.exists( + os.path.join(base_path, name, "application", "celery", "tasks.py") + ) + ] + + self.app.conf.task_queues = [ + Queue(f"{module}-queue", routing_key=f"{module}.#") for module in modules + ] + + def _setup_task_routes(self): + """ + Configures task routing rules to ensure tasks are directed to the appropriate queues. + + Detailed Routing Setup: + - Dynamic routing based on the task's module name allows for a scalable approach + to handle various tasks. + - The queue name is inferred dynamically from the module name of the task, promoting + consistency and maintainability. + + Example: + - A task located in 'app.catalog_item.tasks' is routed to 'catalog-items-queue'. + + Developers are encouraged to maintain a consistent naming convention in task module + names to ensure the routing logic functions correctly. + + Note: + - This method sets up the routing using a lambda function that parses the module name to + determine the queue. This method assumes that the module name directly corresponds to + the queue name. + """ + + self.app.conf.task_routes = { + "app.*.application.celery.*": { + "queue": lambda args: f"{args[0].split('.')[1]}-queue" + } + } + + def configure_celery(self): + """ + Applies the configurations for task routes and updates default settings for task execution. + + This method centralizes the configuration of task behavior such as concurrency, task expiration, + and memory limits to optimize worker performance and resource management. + """ + + self._setup_task_routes() + self._setup_queues() + self.app.conf.update( + task_track_started=True, + worker_concurrency=10, + worker_prefetch_multiplier=1, + worker_max_tasks_per_child=100, + worker_max_memory_per_child=300000, + result_expires=3600, + broker_transport_options={"visibility_timeout": 3600}, + broker_connection_retry=True, + ) + self.app.autodiscover_tasks(["app"]) + + +celery_conf = CeleryConfigurator() +celery_app = celery_conf.app