Skip to content

Conversation

@sylvinus
Copy link
Member

@sylvinus sylvinus commented Oct 20, 2025

12 years later, everything old is new again: https://www.slideshare.net/slideshow/why-and-how-pricing-assistant-migrated-from-celery-to-rq-parispy-2/24526592

We are facing the same issues: inability to properly monitor and clean failing tasks, complex dependency introduced in the codebase.

Dramatiq seems to check a few boxes: Good redis support, reliable tasks, simple API, simple dashboard, understandable code. Let's try it!

Summary by CodeRabbit

Release Notes

  • New Features

    • Added Dramatiq Dashboard for task monitoring in the admin panel
  • Chores

    • Migrated task management system from Celery to Dramatiq
    • Updated container image versions and dependencies
    • Removed search reindex management command
    • Updated worker and scheduler deployment configuration
    • Configured scheduled tasks for system maintenance (selfcheck and message retry)

@coderabbitai
Copy link

coderabbitai bot commented Oct 20, 2025

Walkthrough

This PR migrates the backend message queue system from Celery to Dramatiq. Key changes include replacing task decorators and invocation methods (.delay().send()), updating result tracking (.id.message_id), implementing a new progress reporting system via set_task_progress, updating Docker and deployment configurations, and adding a Dramatiq dashboard to Django admin.

Changes

Cohort / File(s) Summary
Configuration & Infrastructure
Makefile, Procfile, compose.yaml, env.d/development/backend.defaults, src/backend/Dockerfile, docs/env.md
Updated deployment and development configurations: pip-audit ignore flag in Makefile, Procfile worker/scheduler commands using Django management instead of Celery, docker-compose service renames (celery-dev → worker-dev, celery-ui → scheduler-dev) with new commands and dependencies, Docker base image and Poetry version bumps, Dramatiq environment variables, and documentation updates reflecting broker URL changes (CELERY_BROKER_URL → DRAMATIQ_BROKER_URL).
Core Task Framework
src/backend/core/utils.py, src/backend/core/worker_setup.py, src/backend/core/management/commands/worker.py
Introduced new Dramatiq task utilities: register_task() wrapper, set_task_progress() for progress tracking, get_task_progress() for retrieval; added worker setup module for Django initialization; created custom worker command extending django_dramatiq's RunDramatiqCommand.
Task Definitions Migration
src/backend/core/mda/tasks.py, src/backend/core/services/importer/tasks.py, src/backend/core/services/search/tasks.py
Migrated from Celery decorators to @register_task decorator; removed self parameter from task signatures; replaced Celery state updates with explicit set_task_progress() calls; added cron scheduling for selfcheck and retry tasks; removed error-handling try/except blocks in favor of direct execution flow.
Task Invocation Updates
src/backend/core/api/viewsets/send.py, src/backend/core/services/importer/service.py, src/backend/core/signals.py
Changed task invocation from .delay() to .send(); updated result handling from task.id to task.message_id; preserved surrounding business logic and error handling.
Task Status & Progress Tracking
src/backend/core/api/viewsets/task.py, src/backend/core/services/importer/imap.py
Replaced Celery AsyncResult-based status checks with Dramatiq result backend queries; added new DRAMATIQ_STATES mapping; implemented progress-driven response with progress/message/timestamp fields; removed task_instance parameter from process_folder_messages signature.
Admin Interface
src/backend/core/admin.py, src/backend/core/templates/admin/index.html, src/backend/messages/urls.py
Integrated Dramatiq dashboard into Django admin: new CoreAdminSite with get_urls override exposing /dramatiq/ endpoint; staff-only dashboard view via WSGI synthetic environment; admin index template link to Task Management dashboard.
Settings & Configuration
src/backend/messages/settings.py, src/backend/pyproject.toml
Removed all Celery configuration (CELERY_BROKER_URL, CELERY_RESULT_BACKEND, django_celery_beat, etc.); added Dramatiq configuration blocks (DRAMATIQ_BROKER, DRAMATIQ_RESULT_BACKEND, DRAMATIQ_CRONTAB) across all environments; updated dependencies: removed celery[redis], django-celery-beat, django-celery-results; added django-dramatiq, dramatiq[redis], dramatiq-dashboard, dramatiq-crontab[sentry].
Removed/Deleted
src/backend/core/management/commands/search_reindex.py, src/backend/core/services/dns/tasks.py, src/backend/messages/celery_app.py
Removed OpenSearch reindex management command, DNS Celery task scaffold, and Celery app configuration module.
Module/Package Updates
src/backend/core/tasks.py, src/backend/core/services/search/__init__.py, src/backend/core/services/search/index.py, src/backend/messages/__init__.py, src/backend/core/management/commands/run_task.py
Commented out task imports in core/tasks.py; removed reindex_all/reindex_mailbox/reindex_thread exports and implementations; removed celery_app export; updated run_task.py terminology and task discovery logic to search multiple task modules in priority order.
Test Suite Updates
src/backend/core/tests/conftest.py, src/backend/core/tests/api/test_*.py, src/backend/core/tests/importer/test_*.py, src/backend/core/tests/search/test_*.py
Added dramatiq worker and worker_broker fixtures; created comprehensive TaskDetailViewTest with progress/status scenarios; updated all task mocks from .delay() to .send() and result references from .id to .message_id; refactored progress mocking to use set_task_progress; added integration test for task API; removed celery_app backend patching.

Sequence Diagram(s)

sequenceDiagram
    participant Client as Client/API
    participant Handler as View/Signal Handler
    participant Queue as Dramatiq Queue
    participant Worker as Dramatiq Worker
    participant Cache as Django Cache
    participant DB as Database

    alt Old Celery Flow
        Client->>Handler: Request/Event
        Handler->>Queue: task.delay() 
        Queue->>Worker: Get task
        Worker->>DB: Update via self.update_state()
        Client->>Handler: Poll task.id in Celery backend
        Handler-->>Client: Status
    end

    alt New Dramatiq Flow
        Client->>Handler: Request/Event
        Handler->>Queue: task.send()
        Queue->>Worker: Get task
        Worker->>Cache: set_task_progress(progress, metadata)
        Cache-->>Cache: Store {progress, message, timestamp}
        Client->>Handler: Poll task.message_id
        Handler->>Cache: get_task_progress(task_id)
        Cache-->>Handler: {progress, message, timestamp}
        Handler-->>Client: Status with progress data
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

This migration spans multiple architectural layers with high heterogeneity: configuration changes, decorator/invocation pattern updates across multiple task modules, new progress tracking infrastructure, admin integration, and comprehensive test updates. While many changes follow predictable patterns (.delay().send()), the task definition logic, progress handling, and state management require separate reasoning for each affected module. The scope is broad (30+ files), with dense logic changes in task handlers and API endpoints.

Possibly related PRs

  • ♻️(import) fix naming after refactor #365: Modifies the same src/backend/core/services/importer/imap.py module—this PR changes progress reporting mechanism while the retrieved PR further refactors the importer function signatures and promotion.
  • ✨(mta) add prototype integrated MTA out #285: Modifies task/worker codepaths (src/backend/core/mda/tasks.py, task scheduling)—this PR replaces Celery infrastructure while the retrieved PR adds complementary Celery-based MTA selfcheck and SMTP outbound functionality.
  • ✨(import) Enhance message import through multipart upload #372: Modifies message import codepaths (src/backend/core/api/viewsets/import_message.py, src/backend/core/services/importer/service.py, src/backend/core/services/importer/tasks.py)—both PRs affect the same import task signatures and invocation patterns.

Suggested reviewers

  • sdemagny
  • Nastaliss

Poem

🐰 From Celery's bloom to Dramatiq's beat,
Tasks now send forth in rhythmic fleet,
Progress whispers through the cache so bright,
Workers dance through the queue-less night!
No more delays—just messages true,
A hop, skip, and queue-jump right through! 🎪

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The pull request title "✨(worker) switch from Celery to Dramatiq" directly and clearly summarizes the primary change across the entire changeset. The raw summary shows extensive modifications throughout the codebase focused on migrating from Celery to Dramatiq, including updates to task definitions (core/mda/tasks.py, core/services/importer/tasks.py, core/services/search/tasks.py), configuration files (messages/settings.py), dependencies (pyproject.toml), and management commands. The title accurately reflects this main objective stated in the PR description and uses concise, specific language that a teammate scanning the history would immediately understand.
Docstring Coverage ✅ Passed Docstring coverage is 97.53% which is sufficient. The required threshold is 80.00%.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch dramatiq

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 Pylint (4.0.1)
src/backend/core/admin.py
src/backend/core/api/viewsets/task.py
src/backend/core/api/viewsets/send.py
  • 23 others

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 13

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (9)
src/backend/core/tests/search/test_search.py (2)

136-150: Fix incorrect assertion keys to match task return structure.

The task function reindex_all_task returns a dictionary with a "success" key, not "status". Line 145 will fail.

Apply this diff to fix the assertion:

-    assert result["status"] == "success"
+    assert result["success"] is True

152-164: Fix incorrect assertion keys to match task return structure.

The task function reindex_mailbox_task returns "success" and "mailbox_id" keys, not "status" and "mailbox". Lines 162-163 will fail.

Apply this diff to fix the assertions:

-    assert result["status"] == "success"
-    assert result["mailbox"] == str(test_mailbox.id)
+    assert result["success"] is True
+    assert result["mailbox_id"] == str(test_mailbox.id)
src/backend/core/services/importer/tasks.py (1)

184-225: MBOX streaming loads the entire file into memory (risk of OOM).

Collecting every message then reversing defeats the “without loading everything into memory” goal and will blow up on large mbox files. Use offsets with a spooled temp file to reverse without buffering message bodies.

-def stream_mbox_messages(file) -> Generator[bytes, None, None]:
+def stream_mbox_messages(file) -> Generator[bytes, None, None]:
@@
-    current_message = []
-    in_message = False
-    messages = []
-
-    # Read line by line to avoid loading entire file into memory at once
-    # We still need to collect messages for reversing due to mbox format
-    for line in file:
-        # Check for mbox message separator
-        if line.startswith(b"From "):
-            if in_message and current_message:
-                # End of previous message - store it
-                messages.append(b"".join(current_message))
-                current_message = []
-            in_message = True
-            # Skip the mbox From line
-            continue
-
-        if in_message:
-            current_message.append(line)
-
-    # Add the last message if there is one
-    if current_message:
-        messages.append(b"".join(current_message))
-
-    # Yield messages in reverse order to treat replies correctly
-    # (mbox format stores newest messages first)
-    for message in reversed(messages):
-        yield message
+    # Spool to disk, track offsets of message boundaries, then slice in reverse.
+    import io
+    import tempfile
+
+    with tempfile.SpooledTemporaryFile(max_size=32 * 1024 * 1024) as tmp:
+        # Copy stream to a seekable tmp file in chunks
+        for chunk in iter(lambda: file.read(1024 * 1024), b""):
+            tmp.write(chunk)
+        tmp.flush()
+        tmp.seek(0)
+
+        offsets: list[tuple[int, int]] = []  # (start, end)
+        start = None
+        pos = 0
+        while True:
+            line = tmp.readline()
+            if not line:
+                # EOF
+                if start is not None:
+                    offsets.append((start, pos))
+                break
+            if line.startswith(b"From "):
+                if start is not None:
+                    offsets.append((start, pos))
+                start = pos + len(line)  # skip the mbox "From " separator
+            pos = tmp.tell()
+
+        for start, end in reversed(offsets):
+            tmp.seek(start)
+            remaining = end - start
+            buf = io.BytesIO()
+            while remaining > 0:
+                chunk = tmp.read(min(1024 * 1024, remaining))
+                if not chunk:
+                    break
+                buf.write(chunk)
+                remaining -= len(chunk)
+            yield buf.getvalue()

If S3 storage returns a non-seekable stream, this approach still works since we spool locally first. Based on learnings.

src/backend/core/tests/importer/test_imap_import.py (3)

174-235: Tests still assume Celery’s update_state; switch to set_task_progress or assert only results.

The task no longer calls update_state; these assertions will never run. Replace with progress assertions via set_task_progress or drop progress checks.

-@patch("imaplib.IMAP4_SSL")
-def test_imap_import_task_success(
-    mock_imap4_ssl, mailbox, mock_imap_connection, sample_email
-):
+@patch("imaplib.IMAP4_SSL")
+@patch("core.services.importer.tasks.set_task_progress")
+def test_imap_import_task_success(
+    mock_set_task_progress, mock_imap4_ssl, mailbox, mock_imap_connection, sample_email
+):
@@
-    # Create a mock task instance
-    mock_task = MagicMock()
-    mock_task.update_state = MagicMock()
-
-    with patch.object(
-        import_imap_messages_task, "update_state", mock_task.update_state
-    ):
-        # Run the task
-        task = import_imap_messages_task(
+    # Run the task directly (actor is callable)
+    task = import_imap_messages_task(
             imap_server="imap.example.com",
@@
-        # Verify progress updates were called correctly
-        assert mock_task.update_state.call_count == 4  # 3 PROGRESS + 1 SUCCESS
-
-        # Verify progress updates
-        for i in range(1, 4):
-            mock_task.update_state.assert_any_call(
-                state="PROGRESS",
-                meta={
-                    "result": {
-                        "message_status": f"Processing message {i} of 3",
-                        "total_messages": 3,
-                        "success_count": i,  # Current message was successful
-                        "failure_count": 0,
-                        "type": "imap",
-                        "current_message": i,
-                    },
-                    "error": None,
-                },
-            )
-
-        # Verify success update
-        mock_task.update_state.assert_any_call(
-            state="SUCCESS",
-            meta=task,
-        )
+    # Optional: verify at least initial progress was reported
+    mock_set_task_progress.assert_any_call(0, {"message": "Connecting to IMAP server"})

259-303: Remove update_state expectations in login failure test.

No update_state is called; assert the returned failure payload instead.

-    # Create a mock task instance
-    mock_task = MagicMock()
-    mock_task.update_state = MagicMock()
-
-    # Mock IMAP connection to raise an error on login
-    with (
-        patch.object(import_imap_messages_task, "update_state", mock_task.update_state),
-        patch("core.services.importer.imap.imaplib.IMAP4_SSL") as mock_imap,
-    ):
+    # Mock IMAP connection to raise an error on login
+    with patch("core.services.importer.imap.imaplib.IMAP4_SSL") as mock_imap:
@@
-        # Verify only failure update was called
-        assert mock_task.update_state.call_count == 1
-        mock_task.update_state.assert_called_once_with(
-            state="FAILURE",
-            meta={
-                "result": task_result["result"],
-                "error": task_result["error"],
-            },
-        )
+        # No progress calls required here; just ensure failure payload is correct

308-371: Fix mocked progress and assertions; current test shadows the patched mock and uses Celery-style args.

You overwrite mock_set_task_progress with a new MagicMock(), so no calls are captured; also the signature should be set_task_progress(progress: int, metadata: dict).

-@patch("imaplib.IMAP4_SSL")
-@patch("core.services.importer.tasks.set_task_progress")
-def test_imap_import_task_message_fetch_failure(
-    mock_set_task_progress, mock_imap4_ssl, mailbox
-):
+@patch("imaplib.IMAP4_SSL")
+@patch("core.services.importer.tasks.set_task_progress")
+def test_imap_import_task_message_fetch_failure(
+    mock_set_task_progress, mock_imap4_ssl, mailbox
+):
@@
-    mock_set_task_progress = MagicMock()
+    # Use the patched mock_set_task_progress injected by the decorator
@@
-    # Verify progress updates were called correctly
-    assert mock_set_task_progress.call_count == 3  # 3 PROGRESS
+    # Verify progress updates were emitted (count depends on implementation)
+    assert mock_set_task_progress.call_count >= 1
@@
-    for i in range(1, 4):
-        mock_set_task_progress.assert_any_call(
-            state="PROGRESS",
-            meta={
-                "result": {
-                    "message_status": f"Processing message {i} of 3",
-                    "total_messages": 3,
-                    "success_count": 0,
-                    "failure_count": i,  # Current message failed
-                    "type": "imap",
-                    "current_message": i,
-                },
-                "error": None,
-            },
-        )
+    # Example: assert one of the calls contained the expected metadata message
+    messages = [
+        kwargs.get("metadata", {}).get("message")
+        if kwargs else (args[1].get("message") if len(args) >= 2 else None)
+        for args, kwargs in (call.args, call.kwargs) for call in mock_set_task_progress.call_args_list
+    ]
+    assert any("Processing message" in (m or "") for m in messages)
src/backend/core/api/viewsets/task.py (1)

37-45: Schema omits PROGRESS fields actually returned.

Add progress, message, and timestamp to OpenAPI schema to avoid drift.

-            fields={
-                "status": drf_serializers.ChoiceField(choices=sorted(DRAMATIQ_STATES)),
-                "result": drf_serializers.JSONField(allow_null=True),
-                "error": drf_serializers.CharField(allow_null=True),
-            },
+            fields={
+                "status": drf_serializers.ChoiceField(choices=sorted(DRAMATIQ_STATES)),
+                "result": drf_serializers.JSONField(allow_null=True),
+                "error": drf_serializers.CharField(allow_null=True),
+                # Present when status == "PROGRESS"
+                "progress": drf_serializers.IntegerField(required=False),
+                "message": drf_serializers.CharField(required=False, allow_blank=True),
+                "timestamp": drf_serializers.FloatField(required=False),
+            },
src/backend/core/mda/tasks.py (1)

113-122: Set final progress on early error paths

When the message doesn’t exist or is a draft, progress never reaches 100%, leaving the UI in limbo. Set progress to 100 with an error message before returning.

         except models.Message.DoesNotExist:
             error_msg = f"Message with ID '{message_id}' does not exist"
-            return {"success": False, "error": error_msg}
+            set_task_progress(100, {"message": error_msg})
+            return {"success": False, "error": error_msg}
 
         if message.is_draft:
             error_msg = f"Message '{message_id}' is still a draft and cannot be sent"
-            return {"success": False, "error": error_msg}
+            set_task_progress(100, {"message": error_msg})
+            return {"success": False, "error": error_msg}
src/backend/core/services/search/tasks.py (1)

103-125: Handle nonexistent mailbox gracefully

Mirror your thread/message handlers: catch Mailbox.DoesNotExist and return a structured error.

-    threads = models.Mailbox.objects.get(id=mailbox_id).threads_viewer
+    try:
+        threads = models.Mailbox.objects.get(id=mailbox_id).threads_viewer
+    except models.Mailbox.DoesNotExist:
+        logger.error("Mailbox %s does not exist", mailbox_id)
+        return {"mailbox_id": str(mailbox_id), "success": False, "error": "mailbox_not_found"}
🧹 Nitpick comments (11)
src/backend/core/worker_setup.py (1)

6-11: Consider adding error handling and logging for worker initialization.

The module-level execution of install() and django.setup() lacks error handling and observability. If initialization fails, workers may start with incomplete Django configuration, leading to cryptic task failures.

Apply this diff to add error handling and logging:

+import logging
 import django

 from configurations.importer import install

+logger = logging.getLogger(__name__)
+
+try:
+    logger.info("Initializing Django configuration for Dramatiq worker...")
-install(check_options=True)
-django.setup()
+    install(check_options=True)
+    django.setup()
+    logger.info("Django configuration initialized successfully")
+except Exception as e:
+    logger.exception("Failed to initialize Django configuration for worker: %s", e)
+    raise

As per coding guidelines.

src/backend/core/utils.py (2)

39-57: Clamp progress and use timezone-aware timestamps.

Avoid invalid values and prefer timezone-aware timestamps for consistency across services.

-from typing import Any, Dict, Optional
+from typing import Any, Dict, Optional
+from django.utils import timezone
@@
-def set_task_progress(progress: int, metadata: Optional[Dict[str, Any]] = None) -> None:
+def set_task_progress(progress: int, metadata: Optional[Dict[str, Any]] = None) -> None:
@@
-    task_id = current_message.message_id
+    task_id = current_message.message_id
+    # Defensive clamp to [0, 100]
+    try:
+        progress = max(0, min(100, int(progress)))
+    except (TypeError, ValueError):
+        progress = 0
     progress_data = {
         "progress": progress,
-        "timestamp": time.time(),
+        "timestamp": timezone.now().timestamp(),
         "metadata": metadata or {},
     }

60-69: LGTM.

Simple cache getter; consider adding a small, module-level constant for the cache key prefix to avoid typos across modules.

src/backend/core/services/importer/tasks.py (2)

97-115: Guard against zero-division and over-update.

If total_messages is 0, the loop won’t run, but being explicit avoids future refactors breaking this. Also, avoid computing i/total_messages when total_messages is small.

-                if i % 100 == 0 or i == total_messages:
-                    progress_percentage = min(20 + (i / total_messages) * 70, 90)
+                if total_messages > 0 and (i % 100 == 0 or i == total_messages):
+                    progress_percentage = min(20 + (i / total_messages) * 70, 90)

366-379: Consider emitting a final 100% progress update on success.

MBOX task sets 100% at completion; mirror this for EML to keep UX consistent.

-        if success:
-            return {
+        if success:
+            set_task_progress(100, {"message": "EML processing completed successfully"})
+            return {
                 "status": "SUCCESS",
                 "result": result,
                 "error": None,
             }
src/backend/core/tests/importer/test_imap_import.py (1)

373-454: Drop update_state patch in duplicate recipients test; keep result assertions only.

The actor doesn’t call update_state.

-    # Create a mock task instance
-    mock_task = MagicMock()
-    mock_task.update_state = MagicMock()
-
-    with patch.object(
-        import_imap_messages_task, "update_state", mock_task.update_state
-    ):
-        # Run the task
-        task = import_imap_messages_task(
+    # Run the task
+    task = import_imap_messages_task(
             imap_server="imap.example.com",
@@
-        # Critical: Verify that no validation errors were logged
-        # This ensures the deduplication logic works correctly
+        # Critical: Verify that no validation errors were logged
+        # This ensures the deduplication logic works correctly
         mock_logger.error.assert_not_called()
src/backend/core/api/viewsets/task.py (1)

65-65: Prefer the project’s IsAuthenticated to avoid DB hits.

Swap DRF’s permission for core.api.permissions.IsAuthenticated which short-circuits on tokens.

-from rest_framework import permissions
+from rest_framework import permissions
+from core.api.permissions import IsAuthenticated as CoreIsAuthenticated
@@
-    permission_classes = [permissions.IsAuthenticated]
+    permission_classes = [CoreIsAuthenticated]
src/backend/core/mda/tasks.py (1)

64-70: Return archive outcome in task result

Expose whether archiving actually happened to callers.

-    result = {
-        "message_id": str(message_id),
-        "success": True,
-    }
+    result = {
+        "message_id": str(message_id),
+        "success": True,
+        "archived": bool(must_archive),
+    }
src/backend/core/tests/api/test_task.py (1)

132-138: Create a valid Message (or use a harmless actor) for integration

send_message_task.send("test-message-id") will likely raise Message.DoesNotExist when the worker processes it. Either:

  • Build a minimal sendable Message via factories, then pass its id; or
  • Replace with a no-op/cheap actor for this integration check.

Example (adjust to your factories/models):

-    result = send_message_task.send("test-message-id")
+    # e.g., using a MessageFactory or minimal model setup:
+    # msg = factories.MessageFactory.create(is_draft=False, is_sender=True, ...)
+    # result = send_message_task.send(str(msg.id))
src/backend/core/services/search/tasks.py (2)

31-51: Stream large querysets to reduce memory footprint

For big datasets, use iterator() and compute total via .count() to avoid caching the entire queryset.

-    threads = models.Thread.objects.all()
-    total = threads.count()
+    total = models.Thread.objects.count()
+    threads = models.Thread.objects.all().iterator(chunk_size=1000)
 
-    for i, thread in enumerate(threads):
+    for i, thread in enumerate(threads, start=1):
         try:
             if index_thread(thread):
                 success_count += 1
             else:
                 failure_count += 1
 ...
-        if i % 100 == 0:
+        if i % 100 == 0:
             logger.debug("Progress for all threads: %s of %s", i, total)

121-124: Optional: expose progress via set_task_progress

If you want UI progress parity with MDA tasks, call set_task_progress at start/periodically/finish.

-from core.utils import register_task
+from core.utils import register_task, set_task_progress
@@
-    # Ensure index exists first
+    # Ensure index exists first
     create_index_if_not_exists()
+    set_task_progress(0, {"message": "Reindex mailbox started", "mailbox_id": str(mailbox_id)})
@@
-        if i % 50 == 0:
-            logger.debug(
-                "Updating progress for mailbox %s: %s of %s", mailbox_id, i, total
-            )
+        if i % 50 == 0:
+            pct = min(int(i / max(total, 1) * 100), 99)
+            set_task_progress(pct, {"message": f"Mailbox {mailbox_id} {i}/{total}"})
@@
-    return {
+    set_task_progress(100, {"message": f"Mailbox {mailbox_id} reindex complete"})
+    return {

Also applies to: 49-51

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bf77b07 and ce0791e.

⛔ Files ignored due to path filters (1)
  • src/backend/poetry.lock is excluded by !**/*.lock
📒 Files selected for processing (40)
  • Makefile (1 hunks)
  • Procfile (1 hunks)
  • compose.yaml (2 hunks)
  • docs/env.md (1 hunks)
  • env.d/development/backend.defaults (1 hunks)
  • src/backend/Dockerfile (2 hunks)
  • src/backend/core/admin.py (2 hunks)
  • src/backend/core/api/viewsets/import_message.py (2 hunks)
  • src/backend/core/api/viewsets/send.py (2 hunks)
  • src/backend/core/api/viewsets/task.py (4 hunks)
  • src/backend/core/management/commands/run_task.py (5 hunks)
  • src/backend/core/management/commands/search_reindex.py (0 hunks)
  • src/backend/core/management/commands/worker.py (1 hunks)
  • src/backend/core/mda/outbound.py (1 hunks)
  • src/backend/core/mda/tasks.py (6 hunks)
  • src/backend/core/services/dns/tasks.py (0 hunks)
  • src/backend/core/services/importer/imap.py (2 hunks)
  • src/backend/core/services/importer/service.py (2 hunks)
  • src/backend/core/services/importer/tasks.py (10 hunks)
  • src/backend/core/services/search/__init__.py (0 hunks)
  • src/backend/core/services/search/index.py (0 hunks)
  • src/backend/core/services/search/tasks.py (8 hunks)
  • src/backend/core/signals.py (3 hunks)
  • src/backend/core/tasks.py (1 hunks)
  • src/backend/core/templates/admin/index.html (1 hunks)
  • src/backend/core/tests/api/test_messages_import.py (15 hunks)
  • src/backend/core/tests/api/test_send_message_signature.py (11 hunks)
  • src/backend/core/tests/api/test_task.py (1 hunks)
  • src/backend/core/tests/conftest.py (1 hunks)
  • src/backend/core/tests/importer/test_file_import.py (2 hunks)
  • src/backend/core/tests/importer/test_imap_import.py (4 hunks)
  • src/backend/core/tests/importer/test_import_service.py (8 hunks)
  • src/backend/core/tests/search/test_search.py (3 hunks)
  • src/backend/core/utils.py (2 hunks)
  • src/backend/core/worker_setup.py (1 hunks)
  • src/backend/messages/__init__.py (0 hunks)
  • src/backend/messages/celery_app.py (0 hunks)
  • src/backend/messages/settings.py (4 hunks)
  • src/backend/messages/urls.py (1 hunks)
  • src/backend/pyproject.toml (2 hunks)
💤 Files with no reviewable changes (6)
  • src/backend/messages/init.py
  • src/backend/core/services/search/index.py
  • src/backend/core/services/search/init.py
  • src/backend/core/management/commands/search_reindex.py
  • src/backend/core/services/dns/tasks.py
  • src/backend/messages/celery_app.py
🧰 Additional context used
📓 Path-based instructions (6)
src/backend/**/*.py

📄 CodeRabbit inference engine (.cursor/rules/django-python.mdc)

src/backend/**/*.py: Follow Django/PEP 8 style with a 100-character line limit
Use descriptive, snake_case names for variables and functions
Use Django ORM for database access; avoid raw SQL unless necessary for performance
Use Django’s built-in user model and authentication framework
Prefer try-except blocks to handle exceptions in business logic and views
Log expected and unexpected actions with appropriate log levels
Capture and report exceptions to Sentry; use capture_exception() for custom errors
Do not log sensitive information (tokens, passwords, financial/health data, PII)

Files:

  • src/backend/core/services/importer/service.py
  • src/backend/core/tasks.py
  • src/backend/core/mda/outbound.py
  • src/backend/core/management/commands/worker.py
  • src/backend/core/tests/importer/test_import_service.py
  • src/backend/core/tests/api/test_send_message_signature.py
  • src/backend/messages/urls.py
  • src/backend/core/api/viewsets/import_message.py
  • src/backend/core/services/importer/tasks.py
  • src/backend/core/utils.py
  • src/backend/core/admin.py
  • src/backend/core/mda/tasks.py
  • src/backend/core/tests/importer/test_file_import.py
  • src/backend/core/signals.py
  • src/backend/core/tests/api/test_messages_import.py
  • src/backend/core/api/viewsets/send.py
  • src/backend/core/tests/api/test_task.py
  • src/backend/core/services/importer/imap.py
  • src/backend/core/tests/importer/test_imap_import.py
  • src/backend/core/tests/search/test_search.py
  • src/backend/core/management/commands/run_task.py
  • src/backend/core/tests/conftest.py
  • src/backend/core/api/viewsets/task.py
  • src/backend/core/worker_setup.py
  • src/backend/messages/settings.py
  • src/backend/core/services/search/tasks.py
src/backend/**/{views.py,tasks.py}

📄 CodeRabbit inference engine (.cursor/rules/django-python.mdc)

Use asynchronous views and Celery tasks for I/O-bound or long-running operations

Files:

  • src/backend/core/tasks.py
  • src/backend/core/services/importer/tasks.py
  • src/backend/core/mda/tasks.py
  • src/backend/core/services/search/tasks.py
src/backend/**/{tests.py,tests/**/*.py}

📄 CodeRabbit inference engine (.cursor/rules/django-python.mdc)

src/backend/**/{tests.py,tests/**/*.py}: Use Django’s testing tools (pytest-django) to ensure code quality and reliability
Unit tests should focus on a single use case, keep assertions minimal, and cover all possible cases

Files:

  • src/backend/core/tests/importer/test_import_service.py
  • src/backend/core/tests/api/test_send_message_signature.py
  • src/backend/core/tests/importer/test_file_import.py
  • src/backend/core/tests/api/test_messages_import.py
  • src/backend/core/tests/api/test_task.py
  • src/backend/core/tests/importer/test_imap_import.py
  • src/backend/core/tests/search/test_search.py
  • src/backend/core/tests/conftest.py
src/backend/**/urls.py

📄 CodeRabbit inference engine (.cursor/rules/django-python.mdc)

Define clear, RESTful URL patterns using Django’s URL dispatcher

Files:

  • src/backend/messages/urls.py
src/backend/**/{settings.py,middleware.py}

📄 CodeRabbit inference engine (.cursor/rules/django-python.mdc)

Use middleware judiciously for cross-cutting concerns (authentication, logging, caching)

Files:

  • src/backend/messages/settings.py
src/backend/**/settings.py

📄 CodeRabbit inference engine (.cursor/rules/django-python.mdc)

src/backend/**/settings.py: Leverage Django’s caching framework (e.g., Redis/Memcached) where appropriate
Use Django’s cache framework with a backend like Redis or Memcached to reduce DB load
Optimize static file handling using Django’s staticfiles pipeline (e.g., WhiteNoise)

Files:

  • src/backend/messages/settings.py
🧬 Code graph analysis (14)
src/backend/core/services/importer/service.py (1)
src/backend/core/services/importer/tasks.py (3)
  • process_mbox_file_task (30-170)
  • process_eml_file_task (367-459)
  • import_imap_messages_task (229-363)
src/backend/core/services/importer/tasks.py (3)
src/backend/core/mda/inbound.py (1)
  • deliver_inbound_message (332-736)
src/backend/core/mda/rfc5322/parser.py (1)
  • parse_email_message (369-480)
src/backend/core/utils.py (2)
  • register_task (14-23)
  • set_task_progress (39-57)
src/backend/core/utils.py (1)
src/backend/core/api/viewsets/task.py (1)
  • get (67-124)
src/backend/core/admin.py (1)
src/backend/core/api/viewsets/task.py (1)
  • get (67-124)
src/backend/core/mda/tasks.py (4)
src/backend/core/mda/outbound.py (1)
  • send_message (229-386)
src/backend/core/mda/selfcheck.py (1)
  • run_selfcheck (208-330)
src/backend/core/utils.py (2)
  • register_task (14-23)
  • set_task_progress (39-57)
src/backend/core/api/viewsets/task.py (1)
  • get (67-124)
src/backend/core/signals.py (1)
src/backend/core/services/search/tasks.py (2)
  • index_message_task (136-172)
  • reindex_thread_task (61-90)
src/backend/core/tests/api/test_messages_import.py (2)
src/backend/core/tests/api/test_import_file_upload.py (2)
  • api_client (25-29)
  • user (19-21)
src/backend/core/tests/api/test_messages_import_labels_en.py (4)
  • api_client (20-22)
  • user (26-28)
  • mailbox (32-36)
  • mbox_file (53-76)
src/backend/core/api/viewsets/send.py (1)
src/backend/core/mda/tasks.py (1)
  • send_message_task (21-70)
src/backend/core/tests/api/test_task.py (4)
src/backend/core/mda/tasks.py (1)
  • send_message_task (21-70)
src/backend/core/utils.py (1)
  • get_task_progress (60-69)
src/backend/core/factories.py (1)
  • UserFactory (20-31)
src/backend/core/api/viewsets/task.py (1)
  • get (67-124)
src/backend/core/services/importer/imap.py (1)
src/backend/core/utils.py (1)
  • set_task_progress (39-57)
src/backend/core/tests/importer/test_imap_import.py (1)
src/backend/core/services/importer/tasks.py (1)
  • import_imap_messages_task (229-363)
src/backend/core/tests/search/test_search.py (1)
src/backend/core/services/search/tasks.py (2)
  • reindex_all_task (22-57)
  • reindex_mailbox_task (94-132)
src/backend/core/api/viewsets/task.py (2)
src/backend/core/utils.py (1)
  • get_task_progress (60-69)
src/backend/core/api/permissions.py (1)
  • IsAuthenticated (18-25)
src/backend/core/services/search/tasks.py (1)
src/backend/core/utils.py (1)
  • register_task (14-23)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: docker-publish-socks-proxy / docker-build-push
  • GitHub Check: build-front
🔇 Additional comments (49)
src/backend/Dockerfile (4)

2-2: Verify Python 3.13.9 compatibility with transitive dependencies.

The Python base image has been bumped to 3.13.9-slim-trixie. While learnings confirm Dramatiq 1.18.0 and Django 5.2 support this version range, you should verify that all transitive dependencies (including any binary wheels and system libraries) build and run correctly with Python 3.13.9.

Please confirm that:

  • The full dependency tree (via poetry lock) has been resolved and tested with Python 3.13.9.
  • Any C-extension dependencies or binary wheels are compatible with Python 3.13.9.
  • CI/test pipeline passes with this image.

25-25: Verify Poetry 2.2.1 compatibility and review changelog for breaking changes.

Poetry has been bumped from 2.1.4 to 2.2.1 (minor version update). You should verify that this version introduces no breaking changes to dependency resolution or build behavior.

Please confirm:

  • Poetry 2.2.1 resolves the dependency tree consistently with the current poetry.lock.
  • No breaking changes in 2.2.1 that could affect the build (e.g., changes to lockfile format, venv behavior, or installation flags used in line 41/46).
  • The poetry install --compile command on lines 41 and 46 continues to work as expected.

You can check the Poetry changelog at https://python-poetry.org/history/ or via poetry --version after upgrade to confirm the version and review release notes.


5-5: MIN_UPDATE_DATE cache-busting looks good.

The cache-busting timestamp has been appropriately updated to reflect the PR date (2025-10-20). This ensures apt-get update runs on subsequent builds to pull the latest package metadata.


1-119: Overall Dockerfile structure remains sound; Dramatiq integration handled externally.

The Dockerfile structure and stages are unchanged, which is appropriate for a Celery → Dramatiq migration. Dramatiq configuration (broker URL, worker initialization) is expected to be handled via environment variables and runtime configuration outside the Dockerfile (e.g., in docker-compose, Kubernetes manifests, or startup scripts). No Dramatiq-specific Dockerfile additions are needed.

Verify that:

  • Worker startup commands and Dramatiq broker/scheduler configuration are properly set in the deployment configuration (docker-compose, Kubernetes, systemd, etc.).
  • Environment variables (e.g., DRAMATIQ_BROKER_URL, DRAMATIQ_RESULT_BACKEND_URL) are passed to the container at runtime.
src/backend/core/api/viewsets/import_message.py (1)

46-46: LGTM! Documentation correctly updated.

The removal of "Celery" from the OpenAPI descriptions makes the API documentation framework-agnostic, which is appropriate for the Dramatiq migration.

Also applies to: 99-99

src/backend/core/mda/outbound.py (1)

232-232: LGTM! Docstring appropriately generalized.

The docstring update removes the Celery-specific reference, making it framework-agnostic.

env.d/development/backend.defaults (1)

30-32: LGTM! Dramatiq configuration looks correct.

The Redis broker URLs are properly configured for both the worker and scheduler components.

src/backend/core/services/importer/service.py (3)

72-73: LGTM! Correctly migrated to Dramatiq API.

The task invocation properly uses .send() instead of .delay(), and the message ID is correctly accessed via .message_id.


84-85: LGTM! EML import correctly migrated.

The EML import follows the same correct pattern as the MBOX import.


132-140: LGTM! IMAP import correctly migrated.

The IMAP import properly uses the Dramatiq API with keyword arguments passed to .send().

compose.yaml (2)

162-181: LGTM! Worker service correctly configured.

The worker-dev service is properly set up with the Django management command, Redis dependency, and appropriate volumes.


182-200: LGTM! Scheduler service correctly configured.

The scheduler-dev service properly uses the crontab management command and includes the static volume mount, likely for the Dramatiq dashboard.

src/backend/core/services/importer/imap.py (1)

6-6: LGTM! Correctly migrated to standard logging.

The switch from Celery task logger to Python's standard logging.getLogger(__name__) and importing set_task_progress is appropriate for the Dramatiq migration.

Also applies to: 16-18

Procfile (1)

2-3: Both management commands are available—no issues found.

The worker command is implemented locally in src/backend/core/management/commands/worker.py. The crontab command is automatically provided by the installed dramatiq_crontab package, which is already listed in INSTALLED_APPS. Per the library documentation, the python manage.py crontab syntax in the Procfile is correct.

Makefile (1)

327-330: This is not a false positive—it's a real vulnerability in pip that requires action.

GHSA-4xh5-x5gv-qwph is a real vulnerability in pip's fallback tar extraction that doesn't check symbolic links point to the extraction directory, affecting all versions up to pip 25.2. A malicious sdist can include links that escape the target directory and overwrite arbitrary files on the invoking host during pip install, enabling arbitrary file overwrite outside the build/extraction directory.

The Makefile's --ignore-vuln flag appears to be a known temporary workaround: pip-audit detects this vulnerability in pip 25.2 itself, as documented in the referenced GitHub issue #13607 ("Current pip 25.2 and CVE-2025-8869 break CI pipelines").

Recommended actions:

  • Upgrade pip to version 25.3 or later where this vulnerability is fixed, or upgrade Python to a version implementing PEP 706
  • If the ignore flag must remain temporarily, document this in the project's security policy explaining it's a known short-term workaround pending pip upgrade
  • Remove the --ignore-vuln flag once pip is upgraded
src/backend/core/api/viewsets/send.py (2)

123-123: LGTM! Correctly migrated to Dramatiq task invocation.

The change from .delay() to .send() correctly reflects Dramatiq's API for enqueuing tasks.


133-133: LGTM! Correctly migrated to Dramatiq message ID.

The change from task.id to task.message_id correctly reflects Dramatiq's Message object API.

src/backend/core/tests/api/test_send_message_signature.py (2)

121-121: LGTM! Test mocks correctly migrated to Dramatiq API.

All mock return value assignments have been correctly updated from .delay.return_value to .send.return_value, aligning with Dramatiq's task invocation API.

Also applies to: 161-161, 199-199, 244-244, 295-295, 336-336, 369-369, 404-404


353-355: LGTM! Test assertions correctly migrated to Dramatiq API.

All assertion calls have been correctly updated from mock_task.delay.assert_called_once_with() to mock_task.send.assert_called_once_with(), ensuring tests verify the correct Dramatiq invocation method.

Also applies to: 388-390, 422-424

src/backend/core/tests/importer/test_file_import.py (2)

110-110: LGTM! Test mock correctly migrated to Dramatiq API.

The patch target has been correctly updated from process_eml_file_task.delay to process_eml_file_task.send, aligning with Dramatiq's task invocation API.


186-186: LGTM! Docstring updated to be framework-agnostic.

The docstring has been appropriately updated to remove the Celery-specific reference, making it more generic and aligned with the Dramatiq migration.

src/backend/core/templates/admin/index.html (1)

11-11: No issues found—the URL is properly registered.

The 'admin:dramatiq_dashboard' URL is correctly registered in the admin site's get_urls() method at line 761 of src/backend/core/admin.py. The view is protected with @staff_member_required, and the URL is accessible via the admin namespace as referenced in the template.

docs/env.md (1)

63-63: LGTM!

Documentation correctly updated to reflect the Dramatiq broker configuration.

src/backend/core/signals.py (3)

48-66: LGTM!

The migration from .delay() to .send() correctly follows Dramatiq's task invocation pattern. Error handling is preserved.


68-86: LGTM!

Task invocation correctly updated to use Dramatiq's .send() method.


88-105: LGTM!

Thread indexing task invocation properly migrated to Dramatiq's .send() pattern.

src/backend/core/tests/conftest.py (1)

5-28: LGTM!

The Dramatiq worker fixtures are well-structured with proper lifecycle management. The broker is flushed to ensure test isolation, and the worker is properly started and stopped.

src/backend/core/management/commands/worker.py (1)

1-10: LGTM!

The custom worker command properly extends RunDramatiqCommand and overrides task module discovery to use core.worker_setup for proper Django initialization.

src/backend/core/tests/importer/test_import_service.py (7)

136-151: LGTM!

Test correctly updated to mock Dramatiq's .send() method instead of Celery's .delay().


233-251: LGTM!

Task invocation mock properly migrated to use .send() for Dramatiq.


336-354: LGTM!

MBOX task mock correctly updated to use Dramatiq's .send() pattern.


357-377: LGTM!

Mock consistently uses .send() for Dramatiq task invocation.


466-487: LGTM!

IMAP import task mock properly updated for Dramatiq.


497-521: LGTM!

All task mocks in this test file consistently use Dramatiq's .send() method.


543-566: LGTM!

Error handling test correctly mocks .send() to verify task error scenarios.

src/backend/core/tests/api/test_messages_import.py (5)

140-154: LGTM!

Test properly updated to use the Dramatiq worker fixture and synchronously wait for task completion via worker.join(). This ensures the assertions verify the actual task results.


186-202: LGTM!

Mock correctly patches Dramatiq's .send() method for async testing.


217-237: LGTM!

IMAP task mock properly updated to use .send().


330-362: LGTM!

Duplicate import test correctly mocks .send() for both import attempts.


389-424: LGTM!

All task mocks consistently use Dramatiq's .send() pattern throughout the test suite.

src/backend/core/management/commands/run_task.py (3)

1-30: LGTM!

Documentation and help text properly updated to reflect Dramatiq terminology and usage patterns.


47-91: LGTM!

Task execution correctly updated to use direct invocation for synchronous execution, replacing Celery's apply() pattern.


92-131: LGTM!

Task discovery logic properly updated to search through core task modules (core.mda.tasks, core.services.search.tasks, core.services.importer.tasks) with a fallback to full module path imports.

src/backend/core/utils.py (1)

14-23: Task registration looks good.

Using actor(store_results=True) is correct for persisting results. Ensure Results middleware is configured in settings so these results are actually stored.

src/backend/core/services/importer/tasks.py (1)

26-31: Good switch to Dramatiq actors and logger.

Decoration via @register_task and using dramatiq.get_logger(name) are appropriate.

src/backend/core/tests/importer/test_imap_import.py (1)

163-171: Form view test correctly patches .send.

This aligns with Dramatiq’s API.

src/backend/core/api/viewsets/task.py (2)

21-23: State list is fine, but “PROGRESS” is custom.

That’s OK for client compatibility. Keep docs in sync with the custom state.


66-123: Result fetch and progress fallback flow reads cleanly.

Once the results backend is verified, this path is solid. Good separation between SUCCESS/FAILURE/PROGRESS/PENDING.

src/backend/core/admin.py (1)

772-773: The custom admin_site is properly wired and will be used.

Verification confirms the CoreAdminSite instance is created at the specified lines and correctly imported in messages/urls.py, where it's wired as path(settings.ADMIN_URL, admin_site.urls). No conflicting django.contrib.admin.site references exist. The custom dashboard will be reachable as intended.

Comment on lines +704 to +752
# Dramatiq Dashboard Integration
@staff_member_required
def dramatiq_dashboard_view(request):
"""Serve the Dramatiq dashboard for staff users only."""
# Get the broker from django-dramatiq
broker = dramatiq.get_broker()

# Create the dashboard app
dashboard_app = dramatiq_dashboard.DashboardApp(broker=broker, prefix="")

# Create a WSGI environment
environ = {
"REQUEST_METHOD": request.method,
"PATH_INFO": request.path_info,
"QUERY_STRING": request.META.get("QUERY_STRING", ""),
"CONTENT_TYPE": request.META.get("CONTENT_TYPE", ""),
"CONTENT_LENGTH": request.META.get("CONTENT_LENGTH", ""),
"HTTP_HOST": request.META.get("HTTP_HOST", ""),
"SERVER_NAME": request.META.get("SERVER_NAME", ""),
"SERVER_PORT": request.META.get("SERVER_PORT", ""),
"wsgi.url_scheme": request.scheme,
"wsgi.input": request,
"wsgi.errors": request,
"wsgi.version": (1, 0),
"wsgi.multithread": True,
"wsgi.multiprocess": False,
"wsgi.run_once": False,
}

# Add HTTP headers
for key, value in request.META.items():
if key.startswith("HTTP_"):
environ[key] = value

# Call the dashboard app
def start_response(status, response_headers): # pylint: disable=unused-argument
# This will be called by the WSGI app
pass

# Get the response from the dashboard
response_body = dashboard_app(environ, start_response)

# Create Django response
response = HttpResponse(b"".join(response_body))
response.status_code = 200
response["Content-Type"] = "text/html; charset=utf-8"

return response

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

WSGI bridge ignores status/headers and sets an invalid wsgi.input; prefix likely wrong.

Capture status/headers from start_response, build environ from request.META with a real byte stream, and set DashboardApp prefix to the mounted path for correct links.

 @staff_member_required
 def dramatiq_dashboard_view(request):
@@
-    dashboard_app = dramatiq_dashboard.DashboardApp(broker=broker, prefix="")
+    # Mount under the admin route so asset/links resolve
+    mount_path = reverse("admin:dramatiq_dashboard").rstrip("/")
+    dashboard_app = dramatiq_dashboard.DashboardApp(broker=broker, prefix=mount_path)
@@
-    environ = {
-        "REQUEST_METHOD": request.method,
-        "PATH_INFO": request.path_info,
-        "QUERY_STRING": request.META.get("QUERY_STRING", ""),
-        "CONTENT_TYPE": request.META.get("CONTENT_TYPE", ""),
-        "CONTENT_LENGTH": request.META.get("CONTENT_LENGTH", ""),
-        "HTTP_HOST": request.META.get("HTTP_HOST", ""),
-        "SERVER_NAME": request.META.get("SERVER_NAME", ""),
-        "SERVER_PORT": request.META.get("SERVER_PORT", ""),
-        "wsgi.url_scheme": request.scheme,
-        "wsgi.input": request,
-        "wsgi.errors": request,
-        "wsgi.version": (1, 0),
-        "wsgi.multithread": True,
-        "wsgi.multiprocess": False,
-        "wsgi.run_once": False,
-    }
+    # Start from Django's META and fix required keys
+    import io
+    environ = dict(request.META)
+    environ.setdefault("REQUEST_METHOD", request.method)
+    environ["PATH_INFO"] = request.path_info
+    environ.setdefault("QUERY_STRING", "")
+    environ["SCRIPT_NAME"] = mount_path
+    environ["wsgi.url_scheme"] = request.scheme
+    environ["wsgi.input"] = io.BytesIO(request.body or b"")
+    environ["wsgi.errors"] = io.BytesIO()
+    environ["wsgi.version"] = (1, 0)
+    environ["wsgi.multithread"] = True
+    environ["wsgi.multiprocess"] = False
+    environ["wsgi.run_once"] = False
@@
-    # Add HTTP headers
-    for key, value in request.META.items():
-        if key.startswith("HTTP_"):
-            environ[key] = value
-
-    # Call the dashboard app
-    def start_response(status, response_headers):  # pylint: disable=unused-argument
-        # This will be called by the WSGI app
-        pass
+    # Call the dashboard app
+    status_holder = {"status": "200 OK", "headers": []}
+    def start_response(status, response_headers, exc_info=None):
+        status_holder["status"] = status
+        status_holder["headers"] = response_headers
+        return lambda x: None
@@
-    response_body = dashboard_app(environ, start_response)
-
-    # Create Django response
-    response = HttpResponse(b"".join(response_body))
-    response.status_code = 200
-    response["Content-Type"] = "text/html; charset=utf-8"
+    response_iter = dashboard_app(environ, start_response)
+    body = b"".join(response_iter)
+    status_code = int(status_holder["status"].split()[0])
+    response = HttpResponse(body, status=status_code)
+    for (k, v) in status_holder["headers"]:
+        # Let Django manage Content-Length
+        if k.lower() != "content-length":
+            response[k] = v
@@
     return response

Comment on lines +70 to +79
broker = dramatiq.get_broker()
result_backend = broker.get_results_backend()

# By default unknown tasks will be in PENDING. There is no reliable
# way to check if a task exists or not with Celery.
# https://github.com/celery/celery/issues/3596#issuecomment-262102185
if result_backend is not None:
try:
# Retrieve a Message for this task id from the backend, then get result
# See Dramatiq results API: message.get_result(...)
message = result_backend.get_message(task_id)
result = message.get_result(backend=result_backend, block=False)
return Response(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Verify results-backend access pattern.

broker.get_results_backend() and backend.get_message(...) must match your configured middleware/version. If not available, fetch the Results middleware and read its backend instead.


🌐 Web query:

In Dramatiq 1.18, what is the supported way to retrieve the results backend at runtime from a Broker, and does RedisBackend expose get_message(message_id)?

💡 Result:

  • Supported way: call Broker.get_results_backend() to fetch the Results middleware backend at runtime (raises RuntimeError if none). [1]

  • RedisBackend: it does not expose a get_message(message_id) API. The RedisBackend implements get_result(message) (and lower-level _get/_store) and uses a message object to build the storage key, so you must provide a Message (or use message.get_result / the backend.get_result APIs) rather than a standalone get_message(message_id). [1][2]

Sources: [1] Dramatiq Broker API (get_results_backend). [2] Dramatiq RedisBackend / results backend (get_result, build_message_key, _get/_store).


Fix incorrect RedisBackend API usage—get_message(task_id) doesn't exist.

RedisBackend does not expose a get_message(message_id) API. The code at lines 70-79 calls result_backend.get_message(task_id), which will fail at runtime. You must provide a Message object to use backend.get_result(message) or message.get_result() instead of trying to fetch messages by task_id alone.

Retrieve the Message from Dramatiq's actor system or message store, then pass it to the backend's get_result() method.

🤖 Prompt for AI Agents
In src/backend/core/api/viewsets/task.py around lines 70 to 79, the code calls
result_backend.get_message(task_id) which is not part of RedisBackend's API and
will fail; replace that call by using the backend's result retrieval API
directly (e.g. call result_backend.get_result(task_id) if the backend supports
fetching by id) or, if the backend requires a Message object, obtain/create the
dramatiq Message instance representing this task (from the actor/message store
or the original enqueue flow) and pass that Message into
result_backend.get_result(message) (or call
message.get_result(backend=result_backend)). Update imports and error handling
accordingly.

Comment on lines +47 to +63
if must_archive:
try:
thread = message.thread
models.Message.objects.filter(thread=thread).update(
is_archived=True, archived_at=timezone.now()
)
thread.update_stats()
set_task_progress(90, {"message": "Thread archived"})
except Exception as e:
# Not critical, just log the error
logger.exception(
"Error in send_message_task when archiving thread %s after sending message %s: %s",
thread.id,
message_id,
e,
)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Make archiving error logging safe

If message.thread is None or missing .id, the logger.exception(... thread.id ...) will itself error. Use a safe fallback.

-        try:
-            thread = message.thread
+        try:
+            thread = message.thread
             models.Message.objects.filter(thread=thread).update(
                 is_archived=True, archived_at=timezone.now()
             )
             thread.update_stats()
             set_task_progress(90, {"message": "Thread archived"})
         except Exception as e:
             # Not critical, just log the error
-            logger.exception(
-                "Error in send_message_task when archiving thread %s after sending message %s: %s",
-                thread.id,
-                message_id,
-                e,
-            )
+            thread_id = getattr(thread, "id", "unknown")
+            logger.exception(
+                "Error archiving thread %s after sending message %s: %s",
+                thread_id,
+                message_id,
+                e,
+            )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if must_archive:
try:
thread = message.thread
models.Message.objects.filter(thread=thread).update(
is_archived=True, archived_at=timezone.now()
)
thread.update_stats()
set_task_progress(90, {"message": "Thread archived"})
except Exception as e:
# Not critical, just log the error
logger.exception(
"Error in send_message_task when archiving thread %s after sending message %s: %s",
thread.id,
message_id,
e,
)
if must_archive:
try:
thread = message.thread
models.Message.objects.filter(thread=thread).update(
is_archived=True, archived_at=timezone.now()
)
thread.update_stats()
set_task_progress(90, {"message": "Thread archived"})
except Exception as e:
# Not critical, just log the error
thread_id = getattr(thread, "id", "unknown")
logger.exception(
"Error archiving thread %s after sending message %s: %s",
thread_id,
message_id,
e,
)
🤖 Prompt for AI Agents
In src/backend/core/mda/tasks.py around lines 47 to 63, the exception logging
accesses thread.id directly which can itself raise if thread is None or thread
was never assigned; change the except block to compute a safe thread_id before
logging (e.g., thread_obj = locals().get("thread") or getattr(message, "thread",
None); thread_id = getattr(thread_obj, "id", "<no-thread>")) and then pass that
thread_id (and message_id) to logger.exception instead of thread.id so the
logging call cannot fail.

Comment on lines +217 to 224
result = {
"success": True,
"message_id": str(message_id),
"recipients_processed": success_count,
"processed_messages": processed_count,
"success_count": success_count,
"error_count": error_count,
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Result field name is misleading

success_count counts messages, not recipients. Rename recipients_processed to avoid confusion.

-        result = {
+        result = {
             "success": True,
             "message_id": str(message_id),
-            "recipients_processed": success_count,
+            "messages_retried": success_count,
             "processed_messages": processed_count,
             "success_count": success_count,
             "error_count": error_count,
         }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
result = {
"success": True,
"message_id": str(message_id),
"recipients_processed": success_count,
"processed_messages": processed_count,
"success_count": success_count,
"error_count": error_count,
}
result = {
"success": True,
"message_id": str(message_id),
"messages_retried": success_count,
"processed_messages": processed_count,
"success_count": success_count,
"error_count": error_count,
}
🤖 Prompt for AI Agents
In src/backend/core/mda/tasks.py around lines 217 to 224, the field name
recipients_processed is misleading because success_count counts messages (not
recipients); rename recipients_processed to recipients_attempted (or
recipients_total if that better matches semantics) in the result dict and update
any callers/tests/docs that reference the old key so they read the new key and
maintain backwards compatibility if required (e.g., map old key to new or bump
API contract).

Comment on lines +446 to 456
set_task_progress(
None,
{
"message_status": message_status,
"total_messages": total_messages,
"success_count": success_count,
"failure_count": failure_count,
"type": "imap",
"current_message": current_message,
},
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Verify that None is valid for the progress parameter.

The call to set_task_progress(None, {...}) passes None as the progress percentage. According to the function signature in src/backend/core/utils.py (lines 38-56), the first parameter is progress: int with an expected range of 0-100. Passing None may cause issues unless the implementation explicitly handles it.

Run the following script to check how set_task_progress handles None:


🏁 Script executed:

#!/bin/bash
# Description: Check the implementation of set_task_progress to verify None handling

# Extract the full function implementation
ast-grep --pattern $'def set_task_progress($$$):
  $$$'

Length of output: 0





---

🏁 Script executed:

```shell
#!/bin/bash
# Search for set_task_progress function definition
rg -n "def set_task_progress" --type=py -B 2 -A 15

Length of output: 1098


🏁 Script executed:

#!/bin/bash
# Also search for calls to set_task_progress to understand usage patterns
rg -n "set_task_progress\(" --type=py -B 2 -A 2 | head -50

Length of output: 3168


🏁 Script executed:

#!/bin/bash
# Check the actual file mentioned in the review: src/backend/core/utils.py lines 38-56
fd "utils.py" --type=f | xargs -I {} sh -c 'echo "File: {}"; sed -n "30,65p" "{}"'

Length of output: 3249


Pass a valid progress percentage (0-100) instead of None at line 446.

The function signature at src/backend/core/utils.py:39 explicitly declares progress: int with no Optional type hint, and the docstring documents the expected range as 0-100. The implementation stores this value directly without None handling. Passing None violates the contract and is inconsistent with all other call sites in the codebase (which pass 0, 5, 10, 15, 20, or calculated percentages). Determine the appropriate progress value for this context—if progress cannot be determined, consider skipping the call or using a placeholder value like 0.

🤖 Prompt for AI Agents
In src/backend/core/services/importer/imap.py around lines 446 to 456, the call
to set_task_progress currently passes None for the progress parameter which
violates the function contract (expects an int 0-100). Replace the None with a
valid integer progress value — preferably compute a percentage like
int((current_message / max(1, total_messages)) * 100) capped between 0 and 100,
or if a meaningful percentage cannot be determined use 0 as a safe placeholder;
alternatively, omit the call entirely when progress is indeterminate. Ensure the
value passed is an int in the 0–100 range before calling set_task_progress.

Comment on lines +581 to 587
DRAMATIQ_AUTODISCOVER_MODULES = [
"tasks"
] # services.search.tasks", "services.importer.tasks"]

DISABLE_DRAMATIQ_SCHEDULE = values.BooleanValue(
default=False, environ_name="DISABLE_DRAMATIQ_SCHEDULE", environ_prefix=None
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Autodiscover nested task modules

Tasks live in core/mda/tasks.py and core/services/search/tasks.py. With only "tasks", django‑dramatiq won’t import these modules and cron tasks won’t be registered.

-    DRAMATIQ_AUTODISCOVER_MODULES = [
-        "tasks"
-    ]  # services.search.tasks", "services.importer.tasks"]
+    DRAMATIQ_AUTODISCOVER_MODULES = [
+        "tasks",
+        "mda.tasks",
+        "services.search.tasks",
+    ]
🤖 Prompt for AI Agents
In src/backend/messages/settings.py around lines 581 to 587,
DRAMATIQ_AUTODISCOVER_MODULES currently only contains "tasks" which prevents
django-dramatiq from importing nested modules like core.mda.tasks and
core.services.search.tasks; update the list to include the full dotted module
paths for those task modules (e.g. "core.mda.tasks",
"core.services.search.tasks") alongside "tasks" so dramatiq will import and
register the cron tasks.

Comment on lines +946 to +960
DRAMATIQ_BROKER = {
"BROKER": "dramatiq.brokers.stub.StubBroker",
"OPTIONS": {},
"MIDDLEWARE": [
"dramatiq.middleware.AgeLimit",
"dramatiq.middleware.TimeLimit",
"dramatiq.middleware.Callbacks",
"dramatiq.middleware.Retries",
"dramatiq.middleware.CurrentMessage",
"dramatiq.middleware.Result",
"django_dramatiq.middleware.DbConnectionsMiddleware",
"django_dramatiq.middleware.AdminMiddleware",
],
}

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Fix middleware path in DevelopmentMinimal

"dramatiq.middleware.Result" is invalid. Use "dramatiq.results.Results".

         "MIDDLEWARE": [
             "dramatiq.middleware.AgeLimit",
             "dramatiq.middleware.TimeLimit",
             "dramatiq.middleware.Callbacks",
             "dramatiq.middleware.Retries",
             "dramatiq.middleware.CurrentMessage",
-            "dramatiq.middleware.Result",
+            "dramatiq.results.Results",
             "django_dramatiq.middleware.DbConnectionsMiddleware",
             "django_dramatiq.middleware.AdminMiddleware",
         ],
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
DRAMATIQ_BROKER = {
"BROKER": "dramatiq.brokers.stub.StubBroker",
"OPTIONS": {},
"MIDDLEWARE": [
"dramatiq.middleware.AgeLimit",
"dramatiq.middleware.TimeLimit",
"dramatiq.middleware.Callbacks",
"dramatiq.middleware.Retries",
"dramatiq.middleware.CurrentMessage",
"dramatiq.middleware.Result",
"django_dramatiq.middleware.DbConnectionsMiddleware",
"django_dramatiq.middleware.AdminMiddleware",
],
}
DRAMATIQ_BROKER = {
"BROKER": "dramatiq.brokers.stub.StubBroker",
"OPTIONS": {},
"MIDDLEWARE": [
"dramatiq.middleware.AgeLimit",
"dramatiq.middleware.TimeLimit",
"dramatiq.middleware.Callbacks",
"dramatiq.middleware.Retries",
"dramatiq.middleware.CurrentMessage",
"dramatiq.results.Results",
"django_dramatiq.middleware.DbConnectionsMiddleware",
"django_dramatiq.middleware.AdminMiddleware",
],
}
🤖 Prompt for AI Agents
In src/backend/messages/settings.py around lines 946 to 960, the DRAMATIQ_BROKER
MIDDLEWARE list contains an incorrect middleware path
"dramatiq.middleware.Result"; replace it with the correct class path
"dramatiq.results.Results" so the Results middleware is properly loaded (update
the string in the MIDDLEWARE array accordingly).

Comment on lines +980 to +993
DRAMATIQ_BROKER = {
"BROKER": "dramatiq.brokers.stub.StubBroker",
"OPTIONS": {},
"MIDDLEWARE": [
"dramatiq.middleware.AgeLimit",
"dramatiq.middleware.TimeLimit",
"dramatiq.middleware.Callbacks",
"dramatiq.middleware.Retries",
"dramatiq.middleware.CurrentMessage",
# "dramatiq.results.middleware.Results",
"django_dramatiq.middleware.DbConnectionsMiddleware",
"django_dramatiq.middleware.AdminMiddleware",
],
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Include Results middleware in Test env too

For parity with Base and to exercise SUCCESS/FAILURE in tests, enable Results middleware.

         "MIDDLEWARE": [
             "dramatiq.middleware.AgeLimit",
             "dramatiq.middleware.TimeLimit",
             "dramatiq.middleware.Callbacks",
             "dramatiq.middleware.Retries",
             "dramatiq.middleware.CurrentMessage",
-            # "dramatiq.results.middleware.Results",
+            "dramatiq.results.Results",
             "django_dramatiq.middleware.DbConnectionsMiddleware",
             "django_dramatiq.middleware.AdminMiddleware",
         ],

🏁 Script executed:

#!/bin/bash
# Find all Django settings files to understand the configuration structure
fd -t f -e py 'settings' src/backend/messages/ | head -20

Length of output: 32


🏁 Script executed:

#!/bin/bash
# Search for DRAMATIQ_BROKER configurations across the codebase
rg -n "DRAMATIQ_BROKER" src/backend/ -A 15

Length of output: 3897


🏁 Script executed:

#!/bin/bash
# Search for any Results middleware references to verify the correct import path
rg -n "dramatiq\.results" src/backend/ -B 2 -A 2

Length of output: 1611


🏁 Script executed:

#!/bin/bash
# Check if there are any other references to this middleware in tests or configs
rg -n "Results" src/backend/messages/ | grep -i dramatiq

Length of output: 179


</web_search>


🌐 Web query:

Dramatiq Results middleware import path correct module

💡 Result:

Import it from dramatiq.results: use

from dramatiq.results import Results

(e.g. broker.add_middleware(Results(backend=RedisBackend()))). [1][2]


Correct the justification and address environment inconsistency

The suggested import path dramatiq.results.Results is correct per Dramatiq documentation. However, the review's claim of "parity with Base" is incorrect—the Base environment has Results middleware commented out, not enabled. Additionally, the DEV environment at line 946 uses dramatiq.middleware.Result (a different path), creating an inconsistency across environments that must be resolved.

The diff should enable Results middleware in the test environment, but the reasoning and the environmental configuration mismatch need clarification:

         "MIDDLEWARE": [
             "dramatiq.middleware.AgeLimit",
             "dramatiq.middleware.TimeLimit",
             "dramatiq.middleware.Callbacks",
             "dramatiq.middleware.Retries",
             "dramatiq.middleware.CurrentMessage",
-            # "dramatiq.results.middleware.Results",
+            "dramatiq.results.Results",
             "django_dramatiq.middleware.DbConnectionsMiddleware",
             "django_dramatiq.middleware.AdminMiddleware",
         ],

Clarify why Results middleware differs between DEV (dramatiq.middleware.Result) and TEST (dramatiq.results.Results), and confirm whether BASE should also be updated.

🤖 Prompt for AI Agents
In src/backend/messages/settings.py around lines 980 to 993, the Test
environment currently comments/uses Results middleware inconsistently and the
review justification is incorrect: Base has Results commented out, and Dev uses
an incorrect path dramatiq.middleware.Result. Update the Test config to enable
the correct middleware path "dramatiq.results.Results", fix Dev (around line
946) to the same correct path (replace dramatiq.middleware.Result with
dramatiq.results.Results) so both environments are consistent, and either update
Base to the same enabled entry or explicitly document in a brief comment that
Base intentionally leaves Results commented out; also correct the review
justification text to state that Base has Results commented out rather than
enabled.

Comment on lines +15 to +18
from core.admin import admin_site

urlpatterns = [
path(settings.ADMIN_URL, admin.site.urls),
path(settings.ADMIN_URL, admin_site.urls),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Verify that all model admins are registered with the custom admin site.

The switch from admin.site.urls to admin_site.urls replaces Django's default admin with a custom admin site instance. Ensure that all existing model admin classes are registered with admin_site rather than the default admin.site, otherwise they won't be accessible in the admin interface.

Run the following script to verify admin registrations:


🏁 Script executed:

#!/bin/bash
# Description: Find all admin.site.register calls that should use admin_site instead

echo "=== Checking for admin.site.register calls that should use admin_site ==="
rg -n 'admin\.site\.register' --type=py

echo ""
echo "=== Checking admin_site registrations in core/admin.py ==="
rg -n 'admin_site\.register' --type=py src/backend/core/admin.py

Length of output: 133


🏁 Script executed:

#!/bin/bash

echo "=== Checking if core/admin.py exists ==="
fd -t f "admin\.py" src/backend/core/

echo ""
echo "=== Contents of core/admin.py ==="
find src/backend/core -name "admin.py" -exec cat {} \;

echo ""
echo "=== Searching for admin_site definition ==="
rg -n "admin_site\s*=" --type=py -A 3

echo ""
echo "=== Checking all Django ModelAdmin class definitions ==="
rg -n "class.*\(ModelAdmin\|admin\.ModelAdmin\)" --type=py

echo ""
echo "=== Contents of messages/urls.py ==="
cat src/backend/messages/urls.py

Length of output: 24910


Critical issue: Model admins registered with default admin.site instead of custom admin_site.

The @admin.register() decorators throughout core/admin.py (15+ model admin classes) register models with Django's default admin.site. However, messages/urls.py routes to the custom admin_site instance. This means no models will be accessible in the admin interface.

All @admin.register() calls must specify the custom site. Either:

  • Add site=admin_site parameter: @admin.register(models.User, site=admin_site)
  • Or use explicit registration: admin_site.register(models.User, UserAdmin)

src/backend/core/admin.py: Lines containing @admin.register() decorators (affecting User, MailDomain, Mailbox, Channel, MailboxAccess, Thread, Attachment, Message, Contact, MessageRecipient, Label, Blob, MailDomainAccess, DKIMKey, MessageTemplate admins)

🤖 Prompt for AI Agents
In src/backend/messages/urls.py around lines 15 to 18 and affecting
src/backend/core/admin.py decorator lines, the project uses a custom admin_site
but the model admins in core/admin.py are currently registered with Django's
default admin.site via @admin.register(), so those models won't appear under the
custom admin_site; update each @admin.register(...) decorator to include
site=admin_site (e.g. @admin.register(models.YourModel, site=admin_site)) or
remove the decorator and explicitly call admin_site.register(models.YourModel,
YourModelAdmin) for each listed admin (User, MailDomain, Mailbox, Channel,
MailboxAccess, Thread, Attachment, Message, Contact, MessageRecipient, Label,
Blob, MailDomainAccess, DKIMKey, MessageTemplate) and ensure core/admin.py
imports admin_site from core.admin so registrations target the custom admin
instance.

"django-celery-results==2.6.0",
"django-dramatiq==0.14.0",
"dramatiq[redis]==1.18.0",
"dramatiq-dashboard@git+https://github.com/dbowring/dramatiq_dashboard@128ee38e2b5efc36c4ae964a934cffc41baaf896",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Consider supply chain risks with Git-based dependency.

Installing dramatiq-dashboard directly from a Git commit hash introduces supply chain and maintainability concerns:

  • Not auditable via standard package security tools (pip-audit)
  • Difficult to track for security updates or CVEs
  • Commit hash is not a versioned release
  • Repository ownership (dbowring/dramatiq_dashboard) should be verified

Consider these alternatives:

  1. If an official PyPI package exists, use it with a pinned version
  2. Fork the repository to your organization's control and install from there
  3. Vendor the code directly into your repository

To verify if a PyPI package exists:

Is dramatiq-dashboard available on PyPI?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants