Comprehensive Celery Integration Enhancement in FastAPI Boilerplate #39

Open · wants to merge 2 commits into base: master
62 changes: 62 additions & 0 deletions README.md
@@ -245,3 +245,65 @@ from core.helpers.cache import Cache, CacheTag
await Cache.remove_by_prefix(prefix="get_user_list")
await Cache.remove_by_tag(tag=CacheTag.GET_USER_LIST)
```

## Celery Integration

This project uses Celery for background task processing to improve performance and scalability, particularly for operations that are I/O-bound or computationally intensive. The integration supports both synchronous and asynchronous tasks, with a focus on keeping database session management consistent between FastAPI and Celery.

### Configuration

The `CeleryConfigurator` class centralizes the Celery setup, including configuration of task routes, worker settings, and database session management. This class ensures that Celery is seamlessly integrated with the existing FastAPI application structure, especially concerning how database sessions are handled.
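As a quick sanity check, the configured application can be imported and its dynamically created queues inspected; a minimal sketch, assuming at least one module under `app/` follows the expected layout:

```python
# Minimal sketch: list the queues that CeleryConfigurator created at import time.
from core.celery import celery_app

for queue in celery_app.conf.task_queues:
    print(queue.name, queue.routing_key)  # e.g. "user-queue user.#"
```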

#### Dynamic Queue Creation

Each application within the project can have its tasks routed to a specific queue that is automatically created and named based on the application's module name. This setup allows for fine-grained control over task processing and resource allocation, ensuring that tasks do not interfere with one another and can be scaled independently.
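For example, the `app/user/application/celery/tasks.py` module added in this PR gets a `user-queue` bound with the routing key `user.#`. A task can also be pointed at that queue explicitly; a sketch using Celery's standard `apply_async` options:

```python
# Sketch: dispatch to the module's auto-created queue. The queue name follows
# the '<module>-queue' convention; the routing key matches the 'user.#' binding.
from app.user.application.celery.tasks import send_welcome_email

send_welcome_email.apply_async(
    kwargs={"user_id": 123},
    queue="user-queue",
    routing_key="user.send_welcome_email",
)
```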

### Examples of Celery Tasks

#### Synchronous Task Example

Here's how a synchronous task is defined and used:

```python
# app/user/application/celery/tasks.py
from core.celery import celery_app

@celery_app.task(name="send_welcome_email")
def send_welcome_email(user_id):
"""
Sends a welcome email to the user specified by user_id.
"""
print(f"Sending welcome email to user {user_id}")
# Email sending logic here
return f"Welcome email sent to user {user_id}"

# To dispatch this task from your application code, use:
send_welcome_email.delay(user_id=123)

```
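Since a result backend is configured via `CELERY_BACKEND_URL`, the `AsyncResult` returned by `.delay()` can also be used to wait for the return value; a minimal sketch:

```python
# Sketch: block until the worker finishes and fetch the task's return value.
# Avoid calling .get() from inside another task, as it can deadlock workers.
result = send_welcome_email.delay(user_id=123)
print(result.id)               # task UUID assigned on dispatch
print(result.get(timeout=10))  # "Welcome email sent to user 123"
```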

#### Asynchronous Task Example

Asynchronous tasks are beneficial for operations that involve I/O waiting times, such as database operations or requests to external services. Here's how to define an asynchronous task using the `async_task` decorator:

```python
# app/user/application/celery/tasks.py
import asyncio

from core.celery import async_task

@async_task(name="log_user_activity")
async def log_user_activity(user_id, activity):
"""
Asynchronously logs user activity.
"""
await asyncio.sleep(2) # Simulate async I/O operation
print(f"Activity logged for user {user_id}: {activity}")
return f"Activity logged for user {user_id}: {activity}"
```
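Despite the `async def`, the decorated function is dispatched like any other Celery task, because `async_task` wraps the coroutine inside a synchronous Celery task:

```python
# Sketch: async tasks are queued with the same .delay()/.apply_async() API.
log_user_activity.delay(user_id=123, activity="logged in")
```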

#### Running Celery Workers

To start the Celery workers, use the following command; the workers pick up the settings defined in `CeleryConfigurator`:

```bash
celery -A core.celery.celery_app worker --loglevel=info -P prefork -E
```
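Because each module has its own queue, a worker can also be dedicated to a single module with Celery's standard `-Q` option; for example, a worker that only consumes the `user` module's queue:

```bash
celery -A core.celery.celery_app worker -Q user-queue --loglevel=info -P prefork -E
```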
File renamed without changes.
36 changes: 36 additions & 0 deletions app/user/application/celery/tasks.py
@@ -0,0 +1,36 @@
import time

from core.celery import celery_app


@celery_app.task(name="send_welcome_email")
def send_welcome_email(user_id):
"""
Task to send a welcome email to a new user.

Parameters:
user_id (int): The ID of the user to whom the welcome email is sent.
"""
# Logic to send an email would go here
print(f"Sending welcome email to user {user_id}")
    # Simulate email sending delay
    time.sleep(5)
return f"Welcome email sent to user {user_id}"


@celery_app.task(name="log_user_activity")
def log_user_activity(user_id, activity):
"""
Task to log user activity.

Parameters:
user_id (int): The ID of the user.
activity (str): Description of the activity.
"""
# Logic to log the activity would go here
print(f"Logging activity for user {user_id}: {activity}")
    # Simulate logging delay
    time.sleep(2)
return f"Activity logged for user {user_id}: {activity}"
12 changes: 0 additions & 12 deletions celery_task/__init__.py

This file was deleted.

8 changes: 8 additions & 0 deletions core/celery/__init__.py
@@ -0,0 +1,8 @@
from .async_task import async_task
from .celery_worker import celery_app, celery_conf

__all__ = [
"celery_app",
"celery_conf",
"async_task",
]
43 changes: 43 additions & 0 deletions core/celery/async_task.py
@@ -0,0 +1,43 @@
import asyncio
from functools import wraps

from .celery_worker import celery_app


def async_task(*args, **opts):
"""
A decorator to transform synchronous task functions into asynchronous Celery tasks.

This decorator wraps the function within an asynchronous loop, enabling it to be
scheduled and run by Celery as an asynchronous task.

Args:
*args: Variable length argument list for task decorator options.
**opts: Arbitrary keyword arguments for task decorator options.

Returns:
function: A Celery task that wraps the original function.
"""

def decorator(f):
@wraps(f)
def wrapper(*args, **kwargs):
try:
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

            if loop.is_running():
                # Rare in a prefork worker: schedule on the already-running loop
                # and return the Future rather than blocking on it.
                future = asyncio.ensure_future(f(*args, **kwargs))
                return future
            else:
                # Typical worker path: drive the coroutine to completion and
                # return its result as the task's return value.
                return loop.run_until_complete(f(*args, **kwargs))

celery_task = celery_app.task(wrapper, *args, **opts)
return celery_task

return decorator
168 changes: 168 additions & 0 deletions core/celery/celery_worker.py
@@ -0,0 +1,168 @@
# core/celery/celery_worker.py
import os

from celery import Celery, signals
from kombu import Queue

import app # noqa
from core.config import config
from core.db.session import reset_session_context, set_session_context


class CeleryConfigurator:
"""
Configures and manages the Celery application settings, task routing, and logging.

This class encapsulates the Celery configuration including the setup of task routes,
logging, and signal connections for task lifecycle events.

Attributes:
app (Celery): An instance of the Celery class, configured with backend and broker URLs.
"""

def __init__(self):
"""
Initializes the Celery application with specific settings for backend, broker, and signal connections.
"""
self.app = Celery(
"worker",
backend=config.CELERY_BACKEND_URL,
broker=config.CELERY_BROKER_URL,
)
self._connect_signals()
self.configure_celery()

def _connect_signals(self):
"""
        Connects custom signal handlers to Celery signals for per-task database session management.
"""
signals.task_prerun.connect(self.start_session)
signals.task_postrun.connect(self.close_session)

    def start_session(self, task=None, *args, **kwargs):
"""
Starts a database session at the beginning of a task's execution, managing the session context.

This method initializes the session context that will be used throughout the task's lifecycle.
It does not directly manipulate the session here but sets up the context needed for managing
the session during the task execution.

Args:
task (Celery Task): The current task instance.
*args: Variable positional arguments.
**kwargs: Variable keyword arguments.
"""
session_id = f"session_{task.request.id}"
token = set_session_context(session_id)
task.request.session_token = token

    def close_session(self, task=None, *args, **kwargs):
"""
Closes the database session after the task's execution is complete.

This method resets the session context set at the start of the task, ensuring that any session
specific to the task is properly concluded. The session itself will be automatically closed by
the async_scoped_session.

Args:
task (Celery Task): The current task instance.
*args: Variable positional arguments.
**kwargs: Variable keyword arguments.
"""
token = task.request.session_token
reset_session_context(token)

def _setup_queues(self):
"""
Automatically configures Celery queues based on the application modules.

This method dynamically sets up queues for each module in the 'app' directory that contains a
'celery' subdirectory with a 'tasks.py' file. It constructs the queue name from the module name,
ensuring each task module has its corresponding queue.

The queues are configured with a naming convention of '<module-name>-queue' and use a routing key
that matches tasks from the respective module to the appropriate queue based on the module's name.

How it works:
- It navigates through the 'app' directory to find subdirectories that match the expected
structure for Celery tasks (i.e., 'app/<module>/application/celery/tasks.py').
- For each valid module, it creates a queue named '<module-name>-queue'.
- The routing key for each queue is set to '<module-name>.#', which means it will match any
routing keys that start with the module name, allowing for flexible task routing within the module.

Example:
- If there is a module 'users' with the path 'app/users/application/celery/tasks.py',
this method will setup a queue named 'users-queue' with a routing key 'users.#'.

Note:
- This method assumes that each part of the system that needs task processing capabilities will
conform to this directory and naming structure. This convention simplifies the setup and
scaling of task processing across different parts of the application.
"""
base_path = os.path.join(os.getcwd(), "app")
modules = [
name
for name in os.listdir(base_path)
if os.path.isdir(os.path.join(base_path, name, "application", "celery"))
and os.path.exists(
os.path.join(base_path, name, "application", "celery", "tasks.py")
)
]

self.app.conf.task_queues = [
Queue(f"{module}-queue", routing_key=f"{module}.#") for module in modules
]

def _setup_task_routes(self):
"""
Configures task routing rules to ensure tasks are directed to the appropriate queues.

Detailed Routing Setup:
- Dynamic routing based on the task's module name allows for a scalable approach
to handle various tasks.
- The queue name is inferred dynamically from the module name of the task, promoting
consistency and maintainability.

        Example:
            - A task named 'app.user.application.celery.tasks.send_welcome_email'
              is routed to 'user-queue'.

        Developers are encouraged to maintain a consistent naming convention in task module
        names to ensure the routing logic functions correctly.

        Note:
            - This method sets up the routing with a router function that parses the
              task's dotted name to determine the queue, assuming the module name
              directly corresponds to the queue name. Tasks registered under a short
              explicit name (e.g. 'send_welcome_email') do not match the 'app.' prefix
              and fall through to the default queue.
        """

        def route_task(name, args, kwargs, options, task=None, **kw):
            # 'app.<module>.application.celery.tasks.<task>' -> '<module>-queue'
            parts = name.split(".")
            if len(parts) > 1 and parts[0] == "app":
                return {"queue": f"{parts[1]}-queue"}
            return None  # fall through to the default queue

        self.app.conf.task_routes = (route_task,)

def configure_celery(self):
"""
Applies the configurations for task routes and updates default settings for task execution.

This method centralizes the configuration of task behavior such as concurrency, task expiration,
and memory limits to optimize worker performance and resource management.
"""

self._setup_task_routes()
self._setup_queues()
self.app.conf.update(
task_track_started=True,
worker_concurrency=10,
worker_prefetch_multiplier=1,
worker_max_tasks_per_child=100,
worker_max_memory_per_child=300000,
result_expires=3600,
broker_transport_options={"visibility_timeout": 3600},
broker_connection_retry=True,
)
self.app.autodiscover_tasks(["app"])


celery_conf = CeleryConfigurator()
celery_app = celery_conf.app