diff --git a/env.d/development/backend.defaults b/env.d/development/backend.defaults
index a7240c0c..1ddc819d 100644
--- a/env.d/development/backend.defaults
+++ b/env.d/development/backend.defaults
@@ -74,8 +74,8 @@ MDA_API_SECRET=my-shared-secret-mda
 SALT_KEY=ThisIsAnExampleSaltForDevPurposeOnly
 
 # AI
-AI_BASE_URL=
-AI_API_KEY=
-AI_MODEL=
+AI_BASE_URL=https://albert.api.etalab.gouv.fr/v1
+AI_API_KEY=
+AI_MODEL=albert-large
 AI_FEATURE_SUMMARY_ENABLED=False
\ No newline at end of file
diff --git a/src/backend/core/api/viewsets/thread.py b/src/backend/core/api/viewsets/thread.py
index e635769d..666325c1 100644
--- a/src/backend/core/api/viewsets/thread.py
+++ b/src/backend/core/api/viewsets/thread.py
@@ -1,5 +1,7 @@
 """API ViewSet for Thread model."""
 
+import logging
+
 from django.conf import settings
 from django.db.models import Count, Exists, OuterRef, Q
@@ -18,6 +21,8 @@
 
 from .. import permissions, serializers
 
+logger = logging.getLogger(__name__)
+
 
 class ThreadViewSet(
     viewsets.GenericViewSet,
@@ -350,16 +355,125 @@ def stats(self, request):
                 location=OpenApiParameter.QUERY,
                 description="Filter threads that are spam (1=true, 0=false).",
             ),
+            OpenApiParameter(
+                name="message_ids",
+                type=OpenApiTypes.STR,
+                location=OpenApiParameter.QUERY,
+                description="Comma-separated list of message IDs to filter threads by specific messages (used by AI search).",
+            ),
         ],
     )
     def list(self, request, *args, **kwargs):
         """List threads with optional search functionality."""
         search_query = request.query_params.get("search", "").strip()
+        message_ids = request.query_params.get("message_ids", "").strip()
+
+        logger.debug(
+            "Thread list: search_query=%r, message_ids=%r", search_query, message_ids
+        )
+
+        # Message IDs provided by the AI deep search without a search query:
+        # resolve them directly to their threads.
+        if message_ids and not search_query:
+            try:
+                # Parse message IDs from the comma-separated string
+                message_id_list = [
+                    mid.strip() for mid in message_ids.split(",") if mid.strip()
+                ]
+                logger.debug("Parsed %d message ids", len(message_id_list))
+
+                # The contextual search returns this sentinel UUID when it found
+                # no relevant emails: honour it by returning an empty page.
+                if (
+                    len(message_id_list) == 1
+                    and message_id_list[0] == "00000000-0000-0000-0000-000000000000"
+                ):
+                    page = self.paginate_queryset([])
+                    if page is not None:
+                        serializer = self.get_serializer(page, many=True)
+                        return self.get_paginated_response(serializer.data)
+                    return drf.response.Response([])
+
+                # Get threads that contain these messages
+                threads_with_messages = models.Thread.objects.filter(
+                    messages__id__in=message_id_list
+                ).distinct()
+
+                # Apply additional filters from query parameters (mailbox, etc.)
+                queryset = self.get_queryset().filter(id__in=threads_with_messages)
+
+                # Use the paginator to create a paginated response
+                page = self.paginate_queryset(queryset)
+                if page is not None:
+                    serializer = self.get_serializer(page, many=True)
+                    return self.get_paginated_response(serializer.data)
+
+                serializer = self.get_serializer(queryset, many=True)
+                return drf.response.Response(serializer.data)
+
+            except Exception:
+                logger.exception("Error processing message_ids")
+                # Fall back to regular search if message_ids processing fails
 
         # If search is provided and OpenSearch is available, use it
         if search_query and len(settings.OPENSEARCH_HOSTS[0]) > 0:
             # Get the mailbox_id for filtering
             mailbox_id = request.query_params.get("mailbox_id")
+
+            # Check if we also have specific message IDs from deep search (RAG)
+            if message_ids:
+                try:
+                    # Parse message IDs from the comma-separated string
+                    message_id_list = [
+                        mid.strip() for mid in message_ids.split(",") if mid.strip()
+                    ]
+
+                    # Sentinel UUID from contextual search: no relevant emails.
+                    # The filter below then simply matches no threads.
+                    if (
+                        len(message_id_list) == 1
+                        and message_id_list[0]
+                        == "00000000-0000-0000-0000-000000000000"
+                    ):
+                        logger.debug(
+                            "Contextual search returned the empty-results sentinel"
+                        )
+
+                    # Get threads that contain these messages
+                    threads_with_messages = models.Thread.objects.filter(
+                        messages__id__in=message_id_list
+                    ).distinct()
+
+                    # Apply additional filters from query parameters (mailbox, etc.)
+                    queryset = self.get_queryset().filter(id__in=threads_with_messages)
+
+                    # Use the paginator to create a paginated response
+                    page = self.paginate_queryset(queryset)
+                    if page is not None:
+                        serializer = self.get_serializer(page, many=True)
+                        return self.get_paginated_response(serializer.data)
+
+                    serializer = self.get_serializer(queryset, many=True)
+                    return drf.response.Response(serializer.data)
+
+                except Exception:
+                    logger.exception("Error processing message_ids")
+                    # Fall back to regular search if message_ids processing fails
 
             # Build filters from query parameters
             # TODO: refactor as thread filters are not the same as message filters (has_messages, has_active)
@@ -410,6 +524,7 @@ def list(self, request, *args, **kwargs):
             return drf.response.Response(serializer.data)
 
         # Fall back to regular DB query if no search query or OpenSearch not available
+        logger.debug("Thread list: falling back to regular DB query")
         return super().list(request, *args, **kwargs)
 
     @extend_schema(
diff --git a/src/backend/core/management/__init__.py b/src/backend/core/management/__init__.py
new file mode 100644
index 00000000..82b545de
--- /dev/null
+++ b/src/backend/core/management/__init__.py
@@ -0,0 +1 @@
+# Django management commands package
diff --git a/src/backend/core/management/commands/__init__.py b/src/backend/core/management/commands/__init__.py
index e69de29b..e980e64d 100644
--- a/src/backend/core/management/commands/__init__.py
+++ b/src/backend/core/management/commands/__init__.py
@@ -0,0 +1 @@
+# Django management commands
diff --git a/src/backend/core/management/commands/repair_attachments.py b/src/backend/core/management/commands/repair_attachments.py
new file mode 100644
index 00000000..27d09c56
--- /dev/null
+++ b/src/backend/core/management/commands/repair_attachments.py
@@ -0,0 +1,258 @@
+"""
+Django management command to repair broken Many-to-Many relationships between Messages and Attachments.
+
+This script fixes the issue where:
+- Attachments exist in the database
+- Messages have the has_attachments=True flag set
+- But the M2M intermediate table is empty, so message.attachments.all() returns nothing
+
+Usage:
+    python manage.py repair_attachments [--dry-run] [--mailbox-id UUID]
+"""
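+# Quick pre-check: count messages that claim attachments but have none linked
+# (example only; assumes the `attachments` M2M relation used by this command):
+#
+#   python manage.py shell -c "from core.models import Message; print(Message.objects.filter(has_attachments=True, attachments__isnull=True).count())"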
+
+import logging
+from typing import Any, Dict, Optional
+
+from django.core.management.base import BaseCommand, CommandError
+from django.db import transaction
+
+from core.models import Attachment, Mailbox, Message
+
+logger = logging.getLogger(__name__)
+
+
+class Command(BaseCommand):
+    help = 'Repair broken Many-to-Many relationships between Messages and Attachments'
+
+    def add_arguments(self, parser):
+        parser.add_argument(
+            '--dry-run',
+            action='store_true',
+            help='Show what would be done without making changes',
+        )
+        parser.add_argument(
+            '--mailbox-id',
+            type=str,
+            help='Only repair attachments for a specific mailbox (UUID)',
+        )
+        parser.add_argument(
+            '--verbose',
+            action='store_true',
+            help='Enable verbose logging',
+        )
+
+    def handle(self, *args, **options):
+        """Main command handler."""
+        # Set up logging
+        if options['verbose']:
+            logging.basicConfig(level=logging.INFO)
+
+        dry_run = options['dry_run']
+        mailbox_id = options['mailbox_id']
+
+        self.stdout.write("=" * 60)
+        self.stdout.write("🔧 ATTACHMENT REPAIR SCRIPT")
+        self.stdout.write("=" * 60)
+
+        if dry_run:
+            self.stdout.write(self.style.WARNING("DRY RUN MODE - No changes will be made"))
+
+        try:
+            # Step 1: Analyze the problem
+            self.stdout.write("\n📊 ANALYZING ATTACHMENT PROBLEM...")
+            analysis = self.analyze_attachment_problem(mailbox_id)
+            self.print_analysis(analysis)
+
+            if not analysis['attachments']:
+                self.stdout.write(self.style.ERROR("❌ No attachments found to repair"))
+                return
+
+            if not analysis['messages_with_flag']:
+                self.stdout.write(self.style.ERROR("❌ No messages with has_attachments=True found"))
+                return
+
+            # Step 2: Create repair plan
+            self.stdout.write("\n🔍 CREATING REPAIR PLAN...")
+            repair_plan = self.create_repair_plan(analysis)
+            self.print_repair_plan(repair_plan)
+
+            if not repair_plan['links_to_create']:
+                self.stdout.write(self.style.WARNING("⚠️ No attachment links need to be created"))
+                return
+
+            # Step 3: Execute repairs
+            if not dry_run:
+                self.stdout.write("\n🔧 EXECUTING REPAIRS...")
+                results = self.execute_repairs(repair_plan)
+                self.print_results(results)
+            else:
+                self.stdout.write(self.style.WARNING("\n⚠️ DRY RUN - Repairs not executed"))
+
+            self.stdout.write(self.style.SUCCESS("\n✅ Attachment repair completed!"))
+
+        except Exception as e:
+            self.stdout.write(self.style.ERROR(f"❌ Error during repair: {e}"))
+            raise CommandError(f"Repair failed: {e}") from e
+
+    def analyze_attachment_problem(self, mailbox_id: Optional[str] = None) -> Dict[str, Any]:
+        """Analyze the current state of attachments and messages."""
+        # Filter by mailbox if specified
+        attachment_filter = {}
+        message_filter = {}
+
+        if mailbox_id:
+            try:
+                mailbox = Mailbox.objects.get(id=mailbox_id)
+                attachment_filter['mailbox'] = mailbox
+                message_filter['thread__accesses__mailbox'] = mailbox
+                self.stdout.write(f"🎯 Filtering by mailbox: {mailbox}")
+            except Mailbox.DoesNotExist as exc:
+                raise CommandError(f"Mailbox {mailbox_id} not found") from exc
+
+        # Get all attachments
+        attachments = list(Attachment.objects.filter(**attachment_filter).select_related('mailbox', 'blob'))
+
+        # Get messages with has_attachments=True
+        messages_with_flag = list(Message.objects.filter(
+            has_attachments=True,
+            **message_filter
+        ).select_related('thread', 'sender').prefetch_related('attachments'))
+
+        # Count current M2M links
+        total_m2m_links = 0
+        orphaned_attachments = []
+
+        for attachment in attachments:
+            linked_messages_count = attachment.messages.count()
+            total_m2m_links += linked_messages_count
+
+            if linked_messages_count == 0:
+                orphaned_attachments.append(attachment)
+
+        return {
+            'attachments': attachments,
+            'messages_with_flag': messages_with_flag,
+            'orphaned_attachments': orphaned_attachments,
+            'total_m2m_links': total_m2m_links,
+            'mailbox_filter': mailbox_id
+        }
+
+    def print_analysis(self, analysis: Dict[str, Any]):
+        """Print the analysis results."""
+        self.stdout.write("📊 CURRENT STATE:")
+        self.stdout.write(f"  • Total attachments: {len(analysis['attachments'])}")
+        self.stdout.write(f"  • Messages with has_attachments=True: {len(analysis['messages_with_flag'])}")
+        self.stdout.write(f"  • Orphaned attachments (no M2M links): {len(analysis['orphaned_attachments'])}")
+        self.stdout.write(f"  • Total M2M links: {analysis['total_m2m_links']}")
+
+        if analysis['mailbox_filter']:
+            self.stdout.write(f"  • Filtered by mailbox: {analysis['mailbox_filter']}")
+
+        # Show sample orphaned attachments
+        if analysis['orphaned_attachments']:
+            self.stdout.write("\n🔍 ORPHANED ATTACHMENTS:")
+            for att in analysis['orphaned_attachments'][:5]:
+                self.stdout.write(f"  • {att.name} (ID: {att.id}, Mailbox: {att.mailbox})")
+            if len(analysis['orphaned_attachments']) > 5:
+                remaining = len(analysis['orphaned_attachments']) - 5
+                self.stdout.write(f"  • ... and {remaining} more")
+
+    def create_repair_plan(self, analysis: Dict[str, Any]) -> Dict[str, Any]:
+        """Create a plan for repairing the M2M relationships."""
+        links_to_create = []
+
+        # For each orphaned attachment, find candidate messages in the same mailbox
+        for attachment in analysis['orphaned_attachments']:
+            # Find messages in the same mailbox that have has_attachments=True
+            candidate_messages = [
+                msg for msg in analysis['messages_with_flag']
+                if any(access.mailbox == attachment.mailbox for access in msg.thread.accesses.all())
+            ]
+
+            if candidate_messages:
+                # Strategy: link to the most recent message with has_attachments=True.
+                # This is a reasonable heuristic since attachments are usually linked to recent messages.
+                most_recent_message = max(candidate_messages, key=lambda m: m.created_at)
+
+                links_to_create.append({
+                    'attachment': attachment,
+                    'message': most_recent_message,
+                    'reasoning': f"Most recent message with has_attachments=True in mailbox {attachment.mailbox}"
+                })
+
+        return {
+            'links_to_create': links_to_create,
+            'strategy': 'Link orphaned attachments to most recent message with has_attachments=True in same mailbox'
+        }
+
+    def print_repair_plan(self, repair_plan: Dict[str, Any]):
+        """Print the repair plan."""
+        self.stdout.write(f"🔧 REPAIR STRATEGY: {repair_plan['strategy']}")
+        self.stdout.write("📝 PLANNED ACTIONS:")
+        self.stdout.write(f"  • M2M links to create: {len(repair_plan['links_to_create'])}")
+
+        # Show sample links
+        for i, link in enumerate(repair_plan['links_to_create'][:3], 1):
+            att = link['attachment']
+            msg = link['message']
+            self.stdout.write(f"\n  {i}. Link attachment '{att.name}' to message:")
+            self.stdout.write(f"     • Message ID: {msg.id}")
+            self.stdout.write(f"     • Subject: '{msg.subject[:50]}...'")
+            self.stdout.write(f"     • Date: {msg.created_at}")
+            self.stdout.write(f"     • Reasoning: {link['reasoning']}")
+
+        if len(repair_plan['links_to_create']) > 3:
+            remaining = len(repair_plan['links_to_create']) - 3
+            self.stdout.write(f"  • ... and {remaining} more links")
+
+    def execute_repairs(self, repair_plan: Dict[str, Any]) -> Dict[str, Any]:
+        """Execute the repair plan."""
+        successful_links = 0
+        failed_links = 0
+        errors = []
+
+        with transaction.atomic():
+            for link in repair_plan['links_to_create']:
+                attachment = link['attachment']
+                message = link['message']
+                try:
+                    # Create the M2M link
+                    attachment.messages.add(message)
+
+                    self.stdout.write(f"✅ Linked '{attachment.name}' to message {message.id}")
+                    successful_links += 1
+
+                except Exception as e:
+                    error_msg = f"Failed to link {attachment.name}: {e}"
+                    self.stdout.write(self.style.ERROR(f"❌ {error_msg}"))
+                    errors.append(error_msg)
+                    failed_links += 1
+
+        return {
+            'successful_links': successful_links,
+            'failed_links': failed_links,
+            'errors': errors
+        }
+
+    def print_results(self, results: Dict[str, Any]):
+        """Print the execution results."""
+        self.stdout.write("\n📊 REPAIR RESULTS:")
+        self.stdout.write(f"  • Successful links created: {results['successful_links']}")
+        self.stdout.write(f"  • Failed links: {results['failed_links']}")
+
+        if results['errors']:
+            self.stdout.write("\n❌ ERRORS:")
+            for error in results['errors']:
+                self.stdout.write(f"  • {error}")
+
+        if results['successful_links'] > 0:
+            self.stdout.write(self.style.SUCCESS(f"\n🎉 Successfully created {results['successful_links']} attachment links!"))
+            self.stdout.write("💡 You should now be able to see attachments in your email search results.")
diff --git a/src/backend/core/urls.py b/src/backend/core/urls.py
index 3b8a7524..7f71a0a7 100644
--- a/src/backend/core/urls.py
+++ b/src/backend/core/urls.py
@@ -2,6 +2,7 @@
 from django.conf import settings
 from django.urls import include, path
+from django.http import HttpResponse
 
 from rest_framework.routers import DefaultRouter
 
@@ -30,6 +31,30 @@
 from core.api.viewsets.user import UserViewSet
 from core.authentication.urls import urlpatterns as oidc_urls
 
+
+def api_root(request):
+    """Simple API root view showing available endpoints."""
+    return HttpResponse(
+        """
+        <html>
+        <head><title>Messages API</title></head>
+        <body>
+        <h1>Messages API</h1>
+        <h2>Available Endpoints:</h2>
+        <ul>
+          <li><code>/api/&lt;version&gt;/</code> - main REST API router</li>
+        </ul>
+        <h2>Deep Search Endpoints:</h2>
+        <ul>
+          <li><code>/api/&lt;version&gt;/deep_search/</code> - AI-powered search</li>
+        </ul>
+        <p>Note: Most endpoints require authentication.</p>
+        </body>
+        </html>
+        """,
+        content_type="text/html",
+    )
+
 # - Main endpoints
 router = DefaultRouter()
 router.register("mta", MTAViewSet, basename="mta")
@@ -68,6 +95,8 @@
 )
 
 urlpatterns = [
+    # Root URL for API discovery
+    path("", api_root, name="api-root"),
     path(
         f"api/{settings.API_VERSION}/",
         include(
@@ -91,6 +120,8 @@
                 mailbox_management_nested_router.urls
             ),  # Includes /maildomains/{id}/mailboxes/,
                 # Includes /maildomains/{id}/users/
+            # Deep search endpoints for AI-powered search
+            path("deep_search/", include("deep_search.urls")),
             *oidc_urls,
         ]
     ),
diff --git a/src/backend/deep_search/README.md b/src/backend/deep_search/README.md
new file mode 100644
index 00000000..a000ce79
--- /dev/null
+++ b/src/backend/deep_search/README.md
@@ -0,0 +1,175 @@
+# Albert Chatbot
+
+This module provides a chatbot implementation using the Albert API from Etalab for processing emails, with the following capabilities:
+
+## Features
+
+1. **Mail Summarization**: Automatically summarize email content with key points and urgency assessment
+2. **Answer Generation**: Generate professional responses to emails with configurable tone
+3. **Mail Classification**: Classify emails into categories (urgent, normal, spam, etc.)
+ + + """, content_type="text/html") + # - Main endpoints router = DefaultRouter() router.register("mta", MTAViewSet, basename="mta") @@ -68,6 +95,8 @@ ) urlpatterns = [ + # Root URL for API discovery + path("", api_root, name="api-root"), path( f"api/{settings.API_VERSION}/", include( @@ -91,6 +120,8 @@ mailbox_management_nested_router.urls ), # Includes /maildomains/{id}/mailboxes/, # Includes /maildomains/{id}/users/ ), + # Deep search endpoints for AI-powered search + path("deep_search/", include("deep_search.urls")), *oidc_urls, ] ), diff --git a/src/backend/deep_search/README.md b/src/backend/deep_search/README.md new file mode 100644 index 00000000..a000ce79 --- /dev/null +++ b/src/backend/deep_search/README.md @@ -0,0 +1,175 @@ +# Albert Chatbot + +This module provides a chatbot implementation using the Albert API from Etalab for processing emails with the following capabilities: + +## Features + +1. **Mail Summarization**: Automatically summarize email content with key points and urgency assessment +2. **Answer Generation**: Generate professional responses to emails with configurable tone +3. **Mail Classification**: Classify emails into categories (urgent, normal, spam, etc.) + +## Configuration + +The chatbot uses the Albert API with the following configuration: + +```python +from deep_search import AlbertChatbot, AlbertConfig + +# Default configuration +config = AlbertConfig( + name="albert-etalab", + base_url="https://albert.api.etalab.gouv.fr/v1", + api_key="sk-eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...", + model="albert-large" +) + +chatbot = AlbertChatbot(config) +``` + +## Usage Examples + +### 1. Summarize an Email + +```python +from deep_search import get_chatbot + +chatbot = get_chatbot() + +result = chatbot.summarize_mail( + mail_content="Bonjour, je souhaiterais avoir des informations...", + sender="user@example.com", + subject="Demande d'informations" +) + +if result['success']: + summary = result['summary'] + print(f"Résumé: {summary['summary']}") + print(f"Points clés: {summary['key_points']}") + print(f"Action requise: {summary['action_required']}") + print(f"Urgence: {summary['urgency_level']}") +``` + +### 2. Generate an Email Response + +```python +result = chatbot.generate_mail_answer( + original_mail="Bonjour, pouvez-vous m'aider avec...", + context="L'utilisateur a besoin d'aide technique", + tone="professional" +) + +if result['success']: + response = result['response'] + print(f"Réponse: {response['response']}") + print(f"Sujet: {response['subject']}") +``` + +### 3. Classify an Email + +```python +result = chatbot.classify_mail( + mail_content="URGENT: Problème avec le service...", + sender="support@example.com", + subject="Problème urgent" +) + +if result['success']: + classification = result['classification'] + print(f"Catégorie: {classification['primary_category']}") + print(f"Confiance: {classification['confidence_score']}") + print(f"Justification: {classification['reasoning']}") +``` + +### 4. 
+
+## Usage Examples
+
+### 1. Summarize an Email
+
+```python
+from deep_search import get_chatbot
+
+chatbot = get_chatbot()
+
+result = chatbot.summarize_mail(
+    mail_content="Bonjour, je souhaiterais avoir des informations...",
+    sender="user@example.com",
+    subject="Demande d'informations"
+)
+
+if result['success']:
+    summary = result['summary']
+    print(f"Summary: {summary['summary']}")
+    print(f"Key points: {summary['key_points']}")
+    print(f"Action required: {summary['action_required']}")
+    print(f"Urgency: {summary['urgency_level']}")
+```
+
+### 2. Generate an Email Response
+
+```python
+result = chatbot.generate_mail_answer(
+    original_mail="Bonjour, pouvez-vous m'aider avec...",
+    context="The user needs technical help",
+    tone="professional"
+)
+
+if result['success']:
+    response = result['response']
+    print(f"Response: {response['response']}")
+    print(f"Subject: {response['subject']}")
+```
+
+### 3. Classify an Email
+
+```python
+result = chatbot.classify_mail(
+    mail_content="URGENT: Problème avec le service...",
+    sender="support@example.com",
+    subject="Problème urgent"
+)
+
+if result['success']:
+    classification = result['classification']
+    print(f"Category: {classification['primary_category']}")
+    print(f"Confidence: {classification['confidence_score']}")
+    print(f"Reasoning: {classification['reasoning']}")
+```
+
+### 4. Batch Processing
+
+```python
+mails = [
+    {
+        'content': 'Email 1 content...',
+        'sender': 'user1@example.com',
+        'subject': 'Subject 1'
+    },
+    {
+        'content': 'Email 2 content...',
+        'sender': 'user2@example.com',
+        'subject': 'Subject 2'
+    }
+]
+
+results = chatbot.process_mail_batch(mails, operation="summarize")
+
+for result in results:
+    if result['success']:
+        print(f"Mail {result['batch_index']}: {result['summary']['summary']}")
+```
+
+## Mail Classifications
+
+The chatbot can classify emails into the following categories:
+
+- `urgent`: Emails requiring immediate attention
+- `normal`: Standard emails
+- `low_priority`: Non-urgent emails
+- `spam`: Unwanted emails
+- `information`: Informational emails
+- `request`: Service requests
+- `complaint`: Complaints or issues
+- `support`: Technical support requests
+
+## Error Handling
+
+All functions return a consistent response format:
+
+```python
+{
+    'success': True,           # or False
+    'result_data': {...},      # Specific to each function
+    'error': 'Error message',  # Only present if success=False
+    # Additional metadata...
+}
+```
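+A minimal, defensive way to consume this format (a sketch; the payload key is
+`summary`, `response` or `classification` depending on the call, as in the
+examples above):
+
+```python
+result = chatbot.classify_mail(mail_content="...", sender="a@b.fr", subject="...")
+
+if result.get('success'):
+    classification = result['classification']
+else:
+    print(f"Albert call failed: {result.get('error', 'unknown error')}")
+```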
+
+## Logging
+
+The chatbot uses Django's logging system with appropriate log levels:
+
+- `INFO`: Normal operations and successful API calls
+- `ERROR`: API failures and processing errors
+- `DEBUG`: Detailed debugging information (development only)
+
+## API Rate Limiting
+
+The chatbot respects the Albert API rate limits. For high-volume processing, consider:
+
+1. Using the batch processing functionality
+2. Implementing retry logic with exponential backoff (see the sketch after this list)
+3. Monitoring API usage and quotas
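+A minimal backoff helper along these lines (a sketch, not part of the module;
+`AlbertAPIClient.make_request` and `AlbertAPIError` are defined in
+`deep_search.api_client`):
+
+```python
+import time
+
+from deep_search.api_client import AlbertAPIClient, AlbertAPIError
+
+
+def make_request_with_backoff(client: AlbertAPIClient, messages, max_attempts=4):
+    """Retry a chat completion, sleeping 1s, 2s, 4s, ... between attempts."""
+    for attempt in range(max_attempts):
+        try:
+            return client.make_request(messages)
+        except AlbertAPIError:
+            if attempt == max_attempts - 1:
+                raise
+            time.sleep(2 ** attempt)
+```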
+""" + +from django.apps import AppConfig + + +class DeepSearchConfig(AppConfig): + """Configuration for the deep_search Django app.""" + + default_auto_field = "django.db.models.BigAutoField" + name = "deep_search" + verbose_name = "Deep Search" + + def ready(self): + """Called when the app is ready.""" + pass diff --git a/src/backend/deep_search/chatbot.py b/src/backend/deep_search/chatbot.py new file mode 100644 index 00000000..7809a7fe --- /dev/null +++ b/src/backend/deep_search/chatbot.py @@ -0,0 +1,52 @@ +""" +Simplified chatbot implementation using Albert API for intelligent email search only. + +This module provides a minimal interface for email search operations through +the Albert API. All logic has been moved to views.py for simplicity. + +Author: ANCT +Date: 2025-06-19 +Refactored: 2025-06-26 +Updated: 2025-07-02 +Simplified: 2025-07-08 +""" + +import logging +from typing import Optional + +from .config import AlbertConfig +from .api_client import AlbertAPIClient +from .exceptions import ChatbotError + +logger = logging.getLogger(__name__) + + +class AlbertChatbot: + """ + This class only provides API client configuration. + All search logic is in views.py. + """ + + def __init__(self, config: Optional[AlbertConfig] = None): + """ + Initialize the Albert chatbot with API configuration. + + Args: + config: Configuration for Albert API. + """ + self.config = AlbertConfig() + self.api_client = AlbertAPIClient(self.config) + logger.info("Albert chatbot initialized for intelligent email search") + + +def get_chatbot(config: Optional[AlbertConfig] = None) -> AlbertChatbot: + """ + Factory function to get a configured AlbertChatbot instance. + + Args: + config: Optional configuration for Albert API. If None, uses default config. + + Returns: + Configured AlbertChatbot instance + """ + return AlbertChatbot(config=config) diff --git a/src/backend/deep_search/config.py b/src/backend/deep_search/config.py new file mode 100644 index 00000000..8cbdbaf3 --- /dev/null +++ b/src/backend/deep_search/config.py @@ -0,0 +1,26 @@ +""" +Configuration settings for the Albert chatbot. + +This module contains configuration classes and settings for the chatbot system. +""" + +import os +from dataclasses import dataclass +from enum import Enum + +@dataclass +class AlbertConfig: + """Configuration for Albert API.""" + name: str = "albert-etalab" + base_url: str = os.getenv("AI_BASE_URL", "https://albert.api.etalab.gouv.fr/v1") + api_key: str = os.getenv("AI_API_KEY", "") + model: str = os.getenv("AI_MODEL", "albert-large") + temperature: float = 0.3 + max_tokens: int = 4000 # Increased for better response capacity + timeout: int = 120 + max_iterations: int = 5 + # RAG-specific settings + max_emails_per_batch: int = 50 # Legacy limit (not currently used - all emails are processed) + max_email_content_length: int = 15000 # Limit email content size (increased to preserve attachments) + batch_upload_delay: float = 2.0 # Delay between batch uploads + min_relevance_score_percentage: float = 0.95 # Minimum score as percentage of highest score (0.0-1.0) diff --git a/src/backend/deep_search/contextual_search.py b/src/backend/deep_search/contextual_search.py new file mode 100644 index 00000000..97ab3e69 --- /dev/null +++ b/src/backend/deep_search/contextual_search.py @@ -0,0 +1,983 @@ +""" +Contextual Search Service for Email. + +This module provides a contextual search approach that sends all retrieved emails +directly to the Albert API for intelligent search, bypassing the RAG system. +This approach: +1. 
Retrieves ALL emails from the user's mailboxes +2. Sends all emails as context to the Albert API +3. Uses the Albert API's intelligence to find relevant emails +4. Returns formatted results for mailbox display + +This is useful when you want to leverage Albert's full capabilities without +the overhead of local embeddings and vector storage. +""" + +import json +import time +import logging +import re +import html +from typing import Dict, List, Any +from django.contrib.auth import get_user_model + +from core import models + +try: + from .email_utils import email_utils +except ImportError as e: + print(f"Warning: Could not import email_utils: {e}") + email_utils = None + +logger = logging.getLogger(__name__) +User = get_user_model() + + +class ContextualSearchService: + """ + Contextual search service for email. + + Provides Albert API-based contextual search functionality using shared email utilities. + """ + + def __init__(self): + """Initialize the contextual search service.""" + self.logger = logger + self.email_utils = email_utils + if email_utils is None: + self.logger.warning("email_utils not available, using fallback methods") + + def _get_user_accessible_mailboxes(self, user_id: str) -> List[models.Mailbox]: + """ + Fallback method to get all mailboxes that a user has access to. + + Args: + user_id: UUID of the user + + Returns: + List of Mailbox objects the user can access + """ + try: + self.logger.info(f"Getting accessible mailboxes for user_id: {user_id}") + + user = User.objects.get(id=user_id) + self.logger.debug(f"Found user: {user.email if hasattr(user, 'email') else user}") + + mailboxes = models.Mailbox.objects.filter( + accesses__user=user + ).select_related('domain', 'contact').order_by("-created_at") + + mailbox_count = mailboxes.count() + self.logger.info(f"Found {mailbox_count} accessible mailboxes for user {user}") + + if mailbox_count > 0: + mailbox_list = list(mailboxes) + self.logger.debug(f"Mailbox IDs: {[str(mb.id) for mb in mailbox_list[:5]]}") + return mailbox_list + else: + self.logger.warning(f"No accessible mailboxes found for user {user}") + return [] + + except User.DoesNotExist: + self.logger.error(f"User with ID {user_id} not found") + return [] + except Exception as e: + self.logger.error(f"Error retrieving mailboxes for user {user_id}: {e}", exc_info=True) + return [] + + def _get_recent_messages(self, mailboxes: List[models.Mailbox], limit: int = None) -> List[models.Message]: + """ + Fallback method to get messages from the given mailboxes. 
+ + Args: + mailboxes: List of Mailbox objects + limit: Maximum number of messages to retrieve (default: None for all messages) + + Returns: + List of Message objects + """ + try: + limit_text = f"limit: {limit}" if limit else "all messages" + self.logger.info(f"Getting messages from {len(mailboxes)} mailboxes, {limit_text}") + + if not mailboxes: + self.logger.warning("No mailboxes provided to get_recent_messages") + return [] + + mailbox_ids = [mb.id for mb in mailboxes] + self.logger.debug(f"Searching messages in mailbox IDs: {mailbox_ids[:5]}") + + messages_query = models.Message.objects.filter( + thread__accesses__mailbox__id__in=mailbox_ids, + is_draft=False, + is_trashed=False + ).select_related( + 'sender', 'thread' + ).order_by('-created_at') + + # Apply limit only if specified + if limit is not None: + messages_query = messages_query[:limit] + + message_list = list(messages_query) + message_count = len(message_list) + + self.logger.info(f"Retrieved {message_count} messages from {len(mailboxes)} mailboxes") + + if message_count > 0: + first_msg = message_list[0] + last_msg = message_list[-1] + self.logger.debug(f"Date range: {last_msg.created_at} to {first_msg.created_at}") + self.logger.debug(f"Sample message IDs: {[str(msg.id) for msg in message_list[:3]]}") + else: + self.logger.warning("No messages found in the provided mailboxes") + + return message_list + + except Exception as e: + self.logger.error(f"Error retrieving recent messages: {e}", exc_info=True) + return [] + + def _get_parsed_message_content(self, message: models.Message) -> str: + """ + Fallback method to get parsed text content from a message for chatbot processing. + + Args: + message: Message object + + Returns: + String containing the parsed text content + """ + try: + self.logger.debug(f"Parsing message {message.id} - {message.subject}") + + parsed_data = message.get_parsed_data() + self.logger.debug(f"Parsed data keys: {list(parsed_data.keys())}") + + text_content = "" + text_body = parsed_data.get('textBody', []) + html_body = parsed_data.get('htmlBody', []) + self.logger.debug(f"Text body parts: {len(text_body)}, HTML body parts: {len(html_body)}") + + if text_body: + for text_part in text_body: + if isinstance(text_part, dict) and 'content' in text_part: + text_content += text_part['content'] + "\n" + elif isinstance(text_part, str): + text_content += text_part + "\n" + + if not text_content and html_body: + self.logger.debug(f"No text body, extracting from HTML") + for html_part in html_body: + if isinstance(html_part, dict) and 'content' in html_part: + html_content = html_part['content'] + clean_text = re.sub(r'<[^>]+>', '', html_content) + clean_text = html.unescape(clean_text) + text_content += clean_text + "\n" + + text_content = text_content.strip() + + self.logger.debug(f"Extracted {len(text_content)} characters of text content") + return text_content + + except Exception as e: + self.logger.error(f"Error parsing message content for {message}: {e}", exc_info=True) + return "" + + def _get_parsed_message_details(self, message: models.Message) -> Dict[str, Any]: + """ + Fallback method to get parsed content details from a message using the model's built-in parsing methods. 
+ + Args: + message: Message object + + Returns: + Dictionary with parsed message content and metadata + """ + try: + self.logger.debug(f"Parsing message details {message.id} - {message.subject}") + + parsed_data = message.get_parsed_data() + self.logger.debug(f"Parsed data keys: {list(parsed_data.keys())}") + + text_body = parsed_data.get('textBody', []) + html_body = parsed_data.get('htmlBody', []) + self.logger.debug(f"Text body parts: {len(text_body)}, HTML body parts: {len(html_body)}") + + attachments = [] + for attachment in message.attachments.all(): + if attachment.name and attachment.name.strip(): + attachments.append({ + 'name': attachment.name, + 'size': attachment.size, + 'content_type': attachment.content_type, + }) + + if attachments: + attachment_names = [att['name'] for att in attachments] + self.logger.debug(f"Message {message.id} has attachments: {', '.join(attachment_names)}") + else: + self.logger.debug(f"Message {message.id} has no valid attachments") + + recipients_by_type = message.get_all_recipient_contacts() + recipients = { + 'to': [{'name': contact.name, 'email': contact.email} + for contact in recipients_by_type.get('to', [])], + 'cc': [{'name': contact.name, 'email': contact.email} + for contact in recipients_by_type.get('cc', [])], + 'bcc': [{'name': contact.name, 'email': contact.email} + for contact in recipients_by_type.get('bcc', [])] + } + self.logger.debug(f"Recipients - To: {len(recipients['to'])}, CC: {len(recipients['cc'])}, BCC: {len(recipients['bcc'])}") + + result = { + 'subject': message.subject, + 'sender': { + 'name': message.sender.name, + 'email': message.sender.email + }, + 'sent_at': message.sent_at, + 'text_body': text_body, + 'html_body': html_body, + 'attachments': attachments, + 'recipients': recipients, + 'is_draft': message.is_draft, + 'is_unread': message.is_unread, + 'is_starred': message.is_starred, + 'thread_id': str(message.thread.id), + 'message_id': str(message.id), + } + + self.logger.debug(f"Successfully parsed message {message.id}") + return result + + except Exception as e: + self.logger.error(f"Error parsing message content for {message}: {e}", exc_info=True) + return { + 'subject': message.subject, + 'error': str(e) + } + + def _get_email_context_for_chatbot(self, user_id: str) -> Dict[str, Any]: + """ + Fallback method to gather email context for the chatbot. + Gets ALL emails with all available information. 
+ + Args: + user_id: UUID of the user + + Returns: + Dictionary containing email context and metadata + """ + start_time = time.time() + self.logger.info(f"Getting email context for chatbot (fallback) - user_id: {user_id}") + + try: + # Step 1: Get accessible mailboxes + self.logger.info("Step 1: Getting accessible mailboxes") + mailboxes = self._get_user_accessible_mailboxes(user_id) + + if not mailboxes: + self.logger.warning(f"No accessible mailboxes found for user {user_id}") + return { + "success": False, + "error": "No accessible mailboxes found", + "emails": [], + "total_emails": 0, + "mailbox_count": 0, + "processing_time": time.time() - start_time + } + + self.logger.info(f"Step 1 completed: Found {len(mailboxes)} accessible mailboxes") + + # Step 2: Get recent messages with full details - LIMIT FOR SPEED + self.logger.info("Step 2: Getting messages with full details (SPEED LIMITED)") + recent_message_objects = self._get_recent_messages(mailboxes, limit=200) # Add limit for speed + + if not recent_message_objects: + self.logger.warning(f"No messages found in mailboxes for user {user_id}") + return { + "success": False, + "error": "No messages found", + "emails": [], + "total_emails": 0, + "mailbox_count": len(mailboxes), + "processing_time": time.time() - start_time + } + + self.logger.info(f"Step 2 completed: Retrieved {len(recent_message_objects)} messages") + + # Step 3: Extract complete email information + self.logger.info("Step 3: Extracting complete email information") + emails_with_full_info = [] + + for i, message in enumerate(recent_message_objects): + if i < 10: + self.logger.debug(f"Processing message {i+1}: ID={message.id}, subject={message.subject[:50]}...") + + parsed_details = self._get_parsed_message_details(message) + text_content = self._get_parsed_message_content(message) + + attachments = [] + for attachment in message.attachments.all(): + if attachment.name and attachment.name.strip(): + attachments.append({ + 'name': attachment.name, + 'size': attachment.size, + 'content_type': attachment.content_type, + }) + + if attachments: + attachment_names = [att['name'] for att in attachments] + self.logger.debug(f"Message {message.id} has attachments: {', '.join(attachment_names)}") + + email_data = { + "id": str(message.id), + "subject": message.subject, + "content": text_content, + "sender": { + "name": message.sender.name, + "email": message.sender.email + }, + "recipients": parsed_details.get('recipients', {}), + "sent_at": message.sent_at.isoformat() if message.sent_at else None, + "created_at": message.created_at.isoformat(), + "thread_id": str(message.thread.id), + "thread_subject": message.thread.subject, + "attachments": attachments, + "attachment_count": len(attachments), + "flags": { + "is_unread": message.is_unread, + "is_starred": message.is_starred, + "is_draft": message.is_draft, + "is_trashed": message.is_trashed, + "is_spam": message.is_spam, + "is_archived": message.is_archived, + "is_sender": message.is_sender + } + } + + emails_with_full_info.append(email_data) + + self.logger.info(f"Step 3 completed: Processed {len(emails_with_full_info)} emails with full information") + + processing_time = time.time() - start_time + + result = { + "success": True, + "emails": emails_with_full_info, + "total_emails": len(emails_with_full_info), + "mailbox_count": len(mailboxes), + "processing_time": processing_time + } + + self.logger.info(f"Email context retrieval completed successfully: " + f"{len(emails_with_full_info)} emails retrieved, " + f"processing time: 
{processing_time:.2f}s") + + return result + + except Exception as e: + processing_time = time.time() - start_time + self.logger.error(f"Error in _get_email_context_for_chatbot for user {user_id}: {e}", exc_info=True) + return { + "success": False, + "error": f"Error retrieving email context: {str(e)}", + "emails": [], + "total_emails": 0, + "mailbox_count": 0, + "processing_time": processing_time + } + + def _extract_key_content(self, content: str, max_length: int = 300) -> str: + """ + Ultra-fast extraction of key content from email body by removing common words and keeping essential information. + Optimized for maximum speed to minimize inference time. + + Args: + content: Original email content + max_length: Maximum length of processed content (reduced to 300 for speed) + + Returns: + Processed content with key information only + """ + if not content or len(content.strip()) == 0: + return content + + try: + import re + + # Ultra-fast cleanup - only first 10 lines, simple patterns + content = content.strip() + + # Take only first 10 lines and remove obvious quoted/signature content + lines = [] + for line in content.split('\n')[:10]: # Reduced from 20 to 10 lines + stripped = line.strip() + # Skip obvious patterns quickly + if (not stripped.startswith('>') and + not stripped.startswith('le ') and + 'sent from' not in stripped.lower() and + 'cordialement' not in stripped.lower() and + len(stripped) > 2): + lines.append(line) + + content = ' '.join(lines) + + # Ultra-fast word extraction - only letters/numbers, length > 2 + words = re.findall(r'\b[a-zA-ZÀ-ÿ0-9]{3,}\b', content[:1000]) # Limit input to first 1000 chars + + # Minimal stop word removal for speed + mini_stop_words = { + 'the', 'and', 'for', 'are', 'but', 'not', 'you', 'all', 'can', 'was', 'one', 'our', 'get', 'has', 'his', 'how', 'may', 'new', 'now', 'see', 'two', 'who', 'did', 'she', 'use', 'way', 'why', + 'les', 'des', 'une', 'dans', 'pour', 'que', 'qui', 'est', 'avec', 'par', 'sur', 'sont', 'cette', 'mais', 'tout', 'comme', 'aux', 'ses', 'plus', 'ces', 'nous', 'vous', 'leur', 'elle', 'fait', 'sans', 'peut', 'très', 'bien' + } + + # Keep meaningful words only (reduced limits for speed) + key_words = [word for word in words[:50] if word.lower() not in mini_stop_words] # Reduced from 100 to 50 + + # Quick fallback if too few words remain + if len(key_words) < 3: # Reduced threshold + return content[:max_length] + ("..." if len(content) > max_length else "") + + # Reconstruct with key words (reduced to 25 words for speed) + key_content = ' '.join(key_words[:25]) # Reduced from 40 to 25 + + # Final length check + if len(key_content) > max_length: + key_content = key_content[:max_length] + "..." + + return key_content + + except Exception as e: + # Ultra-fast fallback - just truncate to smaller size + return content[:max_length] + ("..." if len(content) > max_length else "") + + def format_emails_for_albert_context(self, emails: List[Dict[str, Any]]) -> str: + """ + Format emails for Albert API context. 
+ + Args: + emails: List of email dictionaries + + Returns: + Formatted string containing all emails for Albert API context + """ + try: + self.logger.info(f"Formatting {len(emails)} emails for Albert API context with key content extraction") + + # Count emails with attachments for logging + emails_with_attachments = [email for email in emails if email.get('attachment_count', 0) > 0] + if emails_with_attachments: + self.logger.info(f"Found {len(emails_with_attachments)} emails with attachments out of {len(emails)} total emails") + + # Track content reduction + original_content_length = 0 + processed_content_length = 0 + + context_parts = [] + context_parts.append("# EMAIL SEARCH CONTEXT\n") + context_parts.append(f"You are provided with {len(emails)} emails numbered from 1 to {len(emails)}.\n") + context_parts.append(f"Each email has a clear EMAIL NUMBER that you must use in your response.\n\n") + + for i, email in enumerate(emails, 1): + context_parts.append(f"{'='*60}\n") + context_parts.append(f"EMAIL NUMBER: {i} (USE THIS NUMBER: {i})\n") + context_parts.append(f"{'='*60}\n") + context_parts.append(f"**Subject:** {email.get('subject', 'No subject')}\n") + context_parts.append(f"**From:** {email.get('sender', {}).get('email', '')}\n") # Simplified - only email + + # Simplified recipients - only To field for speed + recipients = email.get('recipients', {}) + if recipients.get('to'): + to_emails = [r.get('email', '') for r in recipients.get('to', [])] + context_parts.append(f"**To:** {', '.join(to_emails[:2])}\n") # Limit to 2 recipients for speed + + context_parts.append(f"**Date:** {email.get('sent_at', 'No date')}\n") + + # Simplified flags for speed + flags = email.get('flags', {}) + if flags.get('is_unread') or flags.get('is_starred') or flags.get('is_draft'): + status_flags = [] + if flags.get('is_unread'): + status_flags.append("UNREAD") + if flags.get('is_starred'): + status_flags.append("STARRED") + if flags.get('is_draft'): + status_flags.append("DRAFT") + context_parts.append(f"**Status:** {', '.join(status_flags)}\n") + + # Add attachments - VERY IMPORTANT for search relevance + if email.get('attachment_count', 0) > 0: + attachments = email.get('attachments', []) + att_names = [att.get('name', '') for att in attachments if att.get('name', '').strip()] + if att_names: + context_parts.append(f"**📎 ATTACHMENTS (IMPORTANT FOR SEARCH): {', '.join(att_names)}**\n") + context_parts.append(f"**Attachment count:** {email.get('attachment_count', len(att_names))}\n") + else: + context_parts.append(f"**📎 Has {email.get('attachment_count', 0)} attachment(s) (names not available)**\n") + else: + context_parts.append(f"**📎 Attachments:** None\n") + + # Add content - extract key content to reduce inference time AGGRESSIVELY + content = email.get('content', '').strip() + if content: + # Track content reduction for performance logging + original_content_length += len(content) + key_content = self._extract_key_content(content, max_length=200) # Reduced from 500 to 200 + processed_content_length += len(key_content) + context_parts.append(f"**Key Content:** {key_content}\n") + else: + context_parts.append("**Key Content:** [No content available]\n") + + # Simplified separators for speed + context_parts.append(f"\n--- END EMAIL {i} ---\n\n") + + # Simplified summary for speed + context_parts.append(f"\nSUMMARY: {len(emails)} emails numbered 1-{len(emails)}\n") + + formatted_context = "".join(context_parts) + + # Log content reduction benefits + if original_content_length > 0: + reduction_ratio = 
(original_content_length - processed_content_length) / original_content_length * 100 + self.logger.info(f"📊 CONTENT REDUCTION PERFORMANCE:") + self.logger.info(f" • Original content: {original_content_length:,} chars") + self.logger.info(f" • Processed content: {processed_content_length:,} chars") + self.logger.info(f" • Reduction: {reduction_ratio:.1f}% ({original_content_length - processed_content_length:,} chars saved)") + + self.logger.info(f"Formatted context length: {len(formatted_context)} characters") + + return formatted_context + + except Exception as e: + self.logger.error(f"Error formatting emails for Albert context: {e}", exc_info=True) + return f"Error formatting emails: {str(e)}" + + def create_albert_search_prompt(self, user_query: str, emails_context: str, num_emails: int) -> str: + """ + Create a prompt for Albert API to search emails. + OPTIMIZED FOR SPEED - shorter prompt, clearer instructions. + + Args: + user_query: User's search query + emails_context: Formatted emails context + num_emails: Number of emails in the context + + Returns: + Formatted prompt for Albert API + """ + try: + # ULTRA-COMPACT prompt for faster inference + prompt = f"""Find relevant emails for: "{user_query}" + +EMAILS: {num_emails} emails numbered 1-{num_emails} +ATTACHMENTS: Look for 📎 markers - important for relevance! + +RESPONSE: JSON array only: +[{{"id": "1", "relevance_score": 0.95, "reason": "matches query because..."}}] + +RULES: +- Max 8 results (reduced for speed) +- Score >= 0.5 only +- Use EMAIL NUMBER from "EMAIL NUMBER: X" +- Valid IDs: 1 to {num_emails} ONLY +- Sort by score (highest first) +- Empty array [] if no matches + +{emails_context} + +QUERY: "{user_query}" +JSON RESPONSE:""" + + self.logger.debug(f"🚀 SPEED-OPTIMIZED: Created compact prompt of length: {len(prompt)} (reduced size)") + return prompt + + except Exception as e: + self.logger.error(f"Error creating Albert search prompt: {e}", exc_info=True) + return f"Error creating prompt: {str(e)}" + + def parse_albert_search_response(self, response: str, context_emails: List[Dict]) -> List[Dict[str, Any]]: + """ + Parse Albert API response and format results for frontend. 
+ + Args: + response: Raw Albert API response + context_emails: Original email context + + Returns: + List of formatted search results + """ + try: + self.logger.info(f"Parsing Albert search response (length: {len(response)})") + + # Create email lookup + email_lookup = {email['id']: email for email in context_emails} + self.logger.debug(f"Created email lookup with {len(email_lookup)} emails") + + # Create a fallback mapping from EMAIL numbers to actual IDs for cases where Albert returns wrong format + email_number_to_id = {str(i): email['id'] for i, email in enumerate(context_emails, 1)} + self.logger.debug(f"Created fallback mapping for {len(email_number_to_id)} email numbers") + + # Try to extract JSON from response + try: + # Look for JSON array in the response + import re + json_match = re.search(r'\[.*?\]', response, re.DOTALL) + if json_match: + json_str = json_match.group(0) + self.logger.debug(f"Found JSON string: {json_str[:200]}...") + albert_results = json.loads(json_str) + self.logger.info(f"Successfully parsed {len(albert_results)} results from Albert") + + # Log the structure of the first result for debugging + if albert_results: + first_result = albert_results[0] + self.logger.debug(f"First result structure: {first_result}") + required_keys = ['id', 'relevance_score', 'reason'] + missing_keys = [key for key in required_keys if key not in first_result] + if missing_keys: + self.logger.warning(f"First result missing keys: {missing_keys}") + + else: + self.logger.warning("No JSON array found in Albert response") + self.logger.warning(f"Response content: {response[:500]}...") + + # Try to find any JSON-like structure + json_bracket_match = re.search(r'\{.*?\}', response, re.DOTALL) + if json_bracket_match: + self.logger.info("Found JSON object instead of array, attempting to parse...") + json_str = json_bracket_match.group(0) + try: + single_result = json.loads(json_str) + albert_results = [single_result] # Wrap in array + self.logger.info(f"Successfully parsed single result as array") + except json.JSONDecodeError: + self.logger.warning("Failed to parse JSON object as well") + return [] + else: + self.logger.warning("No JSON structure found in response at all") + return [] + + except json.JSONDecodeError as e: + self.logger.error(f"Failed to parse JSON from Albert response: {e}") + self.logger.error(f"Failed JSON: {json_str if 'json_str' in locals() else 'No JSON string'}") + return [] + + # Format results for frontend + formatted_results = [] + + for i, result in enumerate(albert_results): + self.logger.debug(f"Processing result {i+1}: {result}") + + email_id = result.get('id', '') + relevance_score = result.get('relevance_score', 0.0) + reason = result.get('reason', 'Albert identified as relevant') + + if not email_id: + self.logger.warning(f"Result {i+1} has no 'id' field: {result}") + continue + + # Always use number-to-UUID mapping approach to avoid hallucination + email = None + actual_email_id = None + + # Convert email_id to string and validate it's a number within range + try: + email_number = str(email_id).strip() + email_index = int(email_number) + + if 1 <= email_index <= len(context_emails): + # Valid email number - map to actual UUID + actual_email_id = email_number_to_id[email_number] + email = email_lookup[actual_email_id] + self.logger.debug(f"✅ Mapped EMAIL #{email_number} to UUID {actual_email_id}") + else: + self.logger.warning(f"❌ Albert returned invalid EMAIL number {email_number}. 
Valid range: 1-{len(context_emails)}") + continue + + except (ValueError, KeyError) as e: + self.logger.warning(f"❌ Albert returned invalid email ID format '{email_id}': {e}") + self.logger.warning(f"Expected: number between 1 and {len(context_emails)}") + continue + + if email and actual_email_id: + # Format result for frontend compatibility + formatted_result = { + 'id': actual_email_id, # Use the actual UUID + 'message_id': actual_email_id, + 'thread_id': email.get('thread_id', ''), + 'subject': email.get('subject', ''), + 'from': email.get('sender', {}).get('email', ''), + 'sender': { + 'email': email.get('sender', {}).get('email', ''), + 'name': email.get('sender', {}).get('name', '') + }, + 'sender_name': email.get('sender', {}).get('name', ''), + 'sender_email': email.get('sender', {}).get('email', ''), + 'date': email.get('sent_at'), + 'sent_at': email.get('sent_at'), + 'snippet': email.get('content', '')[:200] if email.get('content') else '', + 'is_unread': email.get('flags', {}).get('is_unread', False), + 'is_starred': email.get('flags', {}).get('is_starred', False), + 'thread_subject': email.get('thread_subject', ''), + 'relevance_score': relevance_score, + 'ai_reason': reason, + 'attachment_count': email.get('attachment_count', 0), + 'content_preview': email.get('content', '')[:200] if email.get('content') else '', + 'search_method': 'contextual_search' + } + + formatted_results.append(formatted_result) + self.logger.debug(f"✅ Added result for email {actual_email_id} with score {relevance_score}") + + # Sort by relevance score (highest first) + formatted_results.sort(key=lambda x: x.get('relevance_score', 0), reverse=True) + + self.logger.info(f"Successfully parsed {len(formatted_results)} email results from Albert") + return formatted_results + + except Exception as e: + self.logger.error(f"Error parsing Albert search response: {e}", exc_info=True) + self.logger.error(f"Response that caused error: {response[:500]}...") + return [] + + def chatbot_contextual_email_search( + self, + user_id: str, + user_query: str, + api_client, + max_results: int = 10 + ) -> Dict[str, Any]: + """ + Perform intelligent email search using Albert API directly. + + This method: + 1. Retrieves the most recent emails (up to 500) + 2. Formats all emails as context for Albert API + 3. Sends the query and context to Albert API + 4. 
Parses and formats the results for frontend + + Args: + user_id: UUID of the user + user_query: User's natural language search query + api_client: Albert API client instance + max_results: Maximum number of results to return + + Returns: + Dictionary containing search results and metadata + """ + search_start_time = time.time() + self.logger.info(f"Starting contextual email search - user_id: {user_id}, query: '{user_query[:100]}...'") + self.logger.info(f"API client type: {type(api_client)}") + self.logger.info(f"Max results requested: {max_results}") + + try: + # Step 1: Get all email context + self.logger.info("Step 1: Fetching email context") + context_start_time = time.time() + + if self.email_utils is None: + self.logger.info("Using fallback email context method") + context_result = self._get_email_context_for_chatbot(user_id) + else: + self.logger.info("Using email_utils for email context") + context_result = self.email_utils.get_email_context_for_chatbot(user_id) + + context_time = time.time() - context_start_time + self.logger.info(f"Successfully obtained email context in {context_time:.2f}s") + + if not context_result["success"]: + self.logger.error(f"Failed to get email context: {context_result.get('error', 'Unknown error')}") + return { + "success": False, + "error": f"Failed to retrieve emails: {context_result.get('error', 'Unknown error')}", + "results": [], + "search_method": "contextual_search", + "total_searched": 0, + "processing_time": time.time() - search_start_time + } + + context_emails = context_result["emails"] + self.logger.info(f"Step 1 completed: Retrieved {len(context_emails)} emails for search in {context_time:.2f}s") + self.logger.info(f"📊 CONTEXTUAL EMAIL CONTEXT PERFORMANCE: {len(context_emails)} emails retrieved in {context_time:.2f}s ({len(context_emails)/context_time:.1f} emails/sec)") + + if not context_emails: + self.logger.warning("No emails available for search") + return { + "success": True, + "results": [], + "message": "No emails found to search", + "search_method": "contextual_search", + "total_searched": 0, + "processing_time": time.time() - search_start_time + } + + # Step 2: Format emails for Albert API context + self.logger.info("Step 2: Formatting emails for Albert API context") + formatting_start_time = time.time() + + # Limit emails to avoid API payload limits - AGGRESSIVE REDUCTION for speed + max_emails_for_context = min(50, len(context_emails)) # Reduced from 100 to 50 emails + emails_for_context = context_emails[:max_emails_for_context] + + self.logger.info(f"🚀 SPEED OPTIMIZATION: Limited to {max_emails_for_context} emails for faster inference") + + emails_context = self.format_emails_for_albert_context(emails_for_context) + formatting_time = time.time() - formatting_start_time + self.logger.info(f"Step 2 completed: Formatted {len(emails_for_context)} emails for Albert context in {formatting_time:.2f}s") + self.logger.info(f"📊 CONTEXTUAL FORMATTING PERFORMANCE: {len(emails_for_context)} emails formatted in {formatting_time:.2f}s ({len(emails_for_context)/formatting_time:.1f} emails/sec)") + + # Step 3: Create Albert search prompt + self.logger.info("Step 3: Creating Albert search prompt") + search_prompt = self.create_albert_search_prompt(user_query, emails_context, len(emails_for_context)) + self.logger.info(f"Step 3 completed: Created search prompt of length {len(search_prompt)}") + + # Step 4: Send query to Albert API + self.logger.info("Step 4: Sending search query to Albert API") + try: + albert_start_time = time.time() + + # 
Prepare messages for Albert API (it expects a list of message objects) + messages = [ + { + "role": "user", + "content": search_prompt + } + ] + + self.logger.info(f"Prepared {len(messages)} messages for Albert API") + self.logger.debug(f"Message content length: {len(search_prompt)} characters") + + # Call Albert API using the correct method + albert_response = api_client.make_request(messages) + + albert_time = time.time() - albert_start_time + self.logger.info(f"Step 4 completed: Received Albert response in {albert_time:.2f}s") + self.logger.info(f"📊 CONTEXTUAL ALBERT API PERFORMANCE: Query processed in {albert_time:.2f}s for {len(emails_for_context)} emails") + + # Log the response structure for debugging + self.logger.debug(f"Albert API response keys: {list(albert_response.keys()) if albert_response else 'None'}") + + # Extract content from Albert API response + # The response structure is typically: {"choices": [{"message": {"content": "..."}}]} + albert_content = "" + if albert_response and "choices" in albert_response and len(albert_response["choices"]) > 0: + choice = albert_response["choices"][0] + if "message" in choice and "content" in choice["message"]: + albert_content = choice["message"]["content"] + self.logger.info(f"Successfully extracted content from Albert response: {len(albert_content)} characters") + else: + self.logger.error(f"Unexpected choice structure in Albert response: {choice}") + return { + "success": False, + "error": "Invalid Albert API response structure - missing message content", + "results": [], + "search_method": "contextual_search", + "total_searched": len(context_emails), + "processing_time": time.time() - search_start_time + } + else: + self.logger.error(f"Invalid Albert API response structure: {albert_response}") + return { + "success": False, + "error": "Invalid Albert API response structure - missing choices", + "results": [], + "search_method": "contextual_search", + "total_searched": len(context_emails), + "processing_time": time.time() - search_start_time + } + + if not albert_content: + self.logger.error("No content received from Albert API") + return { + "success": False, + "error": "No content received from Albert API", + "results": [], + "search_method": "contextual_search", + "total_searched": len(context_emails), + "processing_time": time.time() - search_start_time + } + + self.logger.debug(f"Albert response content: {albert_content[:500]}...") + + except Exception as api_error: + self.logger.error(f"Error calling Albert API: {api_error}", exc_info=True) + self.logger.error(f"API client type: {type(api_client)}") + self.logger.error(f"API client methods: {[method for method in dir(api_client) if not method.startswith('_')]}") + return { + "success": False, + "error": f"Failed to call Albert API: {str(api_error)}", + "results": [], + "search_method": "contextual_search", + "total_searched": len(context_emails), + "processing_time": time.time() - search_start_time + } + + # Step 5: Parse Albert response and format results + self.logger.info("Step 5: Parsing Albert response and formatting results") + parsing_start_time = time.time() + + # Log the full Albert response for debugging + self.logger.info(f"Albert API returned content of length: {len(albert_content)}") + self.logger.debug(f"First 500 characters of Albert response: {albert_content[:500]}") + + formatted_results = self.parse_albert_search_response(albert_content, context_emails) + + self.logger.info(f"Parsed {len(formatted_results)} results from Albert response") + + # Apply 
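The nested choices/message/content checks above could be collapsed into one helper; this is a refactoring sketch assuming the OpenAI-style response shape, not code from the patch:

def extract_chat_content(response: dict) -> str:
    """Return the first choice's message content, or '' when the shape is unexpected."""
    try:
        return response["choices"][0]["message"]["content"] or ""
    except (KeyError, IndexError, TypeError):
        return ""

The error returns built here and below all share one loosely typed dictionary shape. A TypedDict sketch inferred from the return statements in this patch (total=False marks keys that appear only on some paths):

from typing import Any, Dict, List, TypedDict

class SearchResponse(TypedDict, total=False):
    success: bool
    results: List[Dict[str, Any]]
    error: str             # present only on failure
    message: str           # present only on empty-result success
    search_method: str     # e.g. "contextual_search", "rag", "rag_error"
    total_searched: int
    emails_sent_to_albert: int
    albert_processing_time: float
    processing_time: float
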
max_results limit - REDUCED for faster processing + if len(formatted_results) > max_results: + formatted_results = formatted_results[:max_results] + self.logger.info(f"🚀 SPEED: Limited results to {max_results} as requested") + + parsing_time = time.time() - parsing_start_time + self.logger.info(f"Step 5 completed: Formatted {len(formatted_results)} search results in {parsing_time:.2f}s") + + # Log detailed information about the final results + if formatted_results: + self.logger.info("=== CONTEXTUAL SEARCH RESULTS ===") + for i, result in enumerate(formatted_results[:5]): # Log first 5 results + self.logger.info(f" #{i+1}: ID={result.get('id', 'N/A')[:8]}... | Score={result.get('relevance_score', 'N/A')} | Subject={result.get('subject', 'N/A')[:50]}...") + if len(formatted_results) > 5: + self.logger.info(f" ... and {len(formatted_results) - 5} more results") + self.logger.info("=== END CONTEXTUAL SEARCH RESULTS ===") + else: + self.logger.warning("No results were formatted from Albert response") + self.logger.warning(f"Raw Albert content preview: {albert_content[:200]}...") + + search_time = time.time() - search_start_time + + # Log performance summary + self.logger.info(f"📊 CONTEXTUAL SEARCH TOTAL PERFORMANCE SUMMARY:") + self.logger.info(f" • Total Search Time: {search_time:.2f}s") + self.logger.info(f" • Email Context Time: {context_time:.2f}s ({(context_time/search_time*100):.1f}%)") + self.logger.info(f" • Email Formatting Time: {formatting_time:.2f}s ({(formatting_time/search_time*100):.1f}%)") + self.logger.info(f" • Albert API Time: {albert_time:.2f}s ({(albert_time/search_time*100):.1f}%)") + self.logger.info(f" • Response Parsing Time: {parsing_time:.2f}s ({(parsing_time/search_time*100):.1f}%)") + self.logger.info(f" • Other Processing Time: {(search_time-context_time-formatting_time-albert_time-parsing_time):.2f}s ({((search_time-context_time-formatting_time-albert_time-parsing_time)/search_time*100):.1f}%)") + self.logger.info(f" • Total Emails Available: {len(context_emails)}") + self.logger.info(f" • Emails Sent to Albert: {len(emails_for_context)}") + self.logger.info(f" • Results Returned: {len(formatted_results)}") + + return { + "success": True, + "results": formatted_results, + "search_method": "contextual_search", + "total_searched": len(context_emails), + "emails_sent_to_albert": len(emails_for_context), + "processing_time": search_time, + "albert_processing_time": albert_time if 'albert_time' in locals() else 0 + } + + except Exception as e: + total_search_time = time.time() - search_start_time + self.logger.error(f"Error in chatbot_contextual_email_search: {e}", exc_info=True) + return { + "success": False, + "error": f"Search failed: {str(e)}", + "results": [], + "search_method": "contextual_search", + "total_searched": 0, + "processing_time": total_search_time + } diff --git a/src/backend/deep_search/email_service.py b/src/backend/deep_search/email_service.py new file mode 100644 index 00000000..54a815f6 --- /dev/null +++ b/src/backend/deep_search/email_service.py @@ -0,0 +1,718 @@ +""" +Email Service for Chatbot Integration with Multiple Search Methods. + +This module provides a clean, class-based interface for email retrieval and search +for the chatbot. Supports two search methods: + +1. RAG-based search (default): Uses local embeddings and vector storage +2. 
Contextual search: Sends emails directly to Albert API for intelligent search + +Core functions: +- Get ALL emails with full metadata +- Intelligent email search using either RAG or contextual search +- Format results for mailbox display + +The search method is configurable via the use_rag parameter in EmailService.__init__(). +""" + +import json +import re +import html +import time +import logging +from typing import Dict, List, Any +from django.contrib.auth import get_user_model + + +from core import models +from .rag import RAGSystem +from .contextual_search import ContextualSearchService +try: + from .email_utils import email_utils +except ImportError as e: + print(f"Warning: Could not import email_utils: {e}") + email_utils = None + +logger = logging.getLogger(__name__) +User = get_user_model() + +# RAG system instance to be used for embedding and retrieval +rag_system = RAGSystem() + +# Contextual search service instance +contextual_search_service = ContextualSearchService() + + +class EmailService: + """ + Email service for chatbot integration with configurable search methods. + + Provides a clean interface for: + - Email retrieval with full metadata + - Multiple intelligent search methods (RAG and contextual) + - Background indexing utilities + """ + + def __init__(self, use_rag: bool = True): + """ + Initialize the email service. + + Args: + use_rag: If True, uses RAG-based search by default. + If False, uses contextual search (sending emails to Albert). + """ + self.logger = logger + self.use_rag = use_rag + + def check_user_needs_indexing(self, user_id: str) -> Dict[str, Any]: + """ + Check if a user has emails and needs indexing, without doing the heavy work. + + Args: + user_id: The user ID to check + + Returns: + Dict with information about whether indexing is needed + """ + try: + # Check if RAG collection exists efficiently + rag_system = RAGSystem() + + if rag_system.collection_exists(user_id): + return { + 'needs_indexing': False, + 'has_collection': True, + 'reason': 'Collection already exists' + } + + # Quick check if user has any emails at all + try: + user = User.objects.get(id=user_id) + + # Get accessible mailboxes count + mailboxes_count = models.Mailbox.objects.filter( + accesses__contact__user=user, + accesses__role__in=['owner', 'reader', 'admin'] + ).distinct().count() + + if mailboxes_count == 0: + return { + 'needs_indexing': False, + 'has_collection': False, + 'mailboxes_count': 0, + 'reason': 'User has no accessible mailboxes' + } + + # Quick check if user has any messages + message_count = models.Message.objects.filter( + thread__accesses__contact__user=user, + thread__accesses__role__in=['owner', 'reader', 'admin'], + ).count() + + if message_count == 0: + return { + 'needs_indexing': False, + 'has_collection': False, + 'mailboxes_count': mailboxes_count, + 'message_count': 0, + 'reason': 'User has no messages' + } + + return { + 'needs_indexing': True, + 'has_collection': False, + 'mailboxes_count': mailboxes_count, + 'message_count': message_count, + 'reason': f'User has {message_count} messages but no collection' + } + + except User.DoesNotExist: + return { + 'needs_indexing': False, + 'has_collection': False, + 'reason': 'User does not exist' + } + + except Exception as e: + self.logger.error(f"Error checking indexing needs for user {user_id}: {e}") + return { + 'needs_indexing': False, + 'has_collection': False, + 'error': str(e), + 'reason': 'Error during check' + } + + def parse_ai_response_for_email_search(self, ai_content: str, context_emails: 
List[Dict]) -> List[Dict[str, Any]]: + """ + Parse AI response for email search and return formatted results for mailbox display. + + Args: + ai_content: Raw AI response content + context_emails: List of email context objects + + Returns: + List of formatted search results compatible with mailbox display + """ + try: + self.logger.info(f"Parsing AI response (length: {len(ai_content)} chars)") + self.logger.info(f"AI response content: {ai_content}") # Log full content + + # Create a lookup dictionary for emails by ID + email_lookup = {email['id']: email for email in context_emails} + self.logger.info(f"Created email lookup with {len(email_lookup)} emails") + + # Try to parse JSON response + try: + self.logger.info("Looking for JSON array in AI response...") + # Look for JSON array in the response + json_match = re.search(r'\[.*?\]', ai_content, re.DOTALL) + if json_match: + json_str = json_match.group(0) + self.logger.info(f"Found JSON string: {json_str}") + ai_results = json.loads(json_str) + self.logger.info(f"Successfully parsed JSON with {len(ai_results)} results") + else: + self.logger.warning("No JSON array found in AI response") + self.logger.warning(f"AI response content: {ai_content}") + return [] + except json.JSONDecodeError as e: + self.logger.warning(f"Failed to parse JSON from AI response: {e}") + self.logger.warning(f"JSON string that failed: {json_str if 'json_str' in locals() else 'No JSON string'}") + return [] + + # Process AI results and format for mailbox display (like Elasticsearch results) + formatted_results = [] + for result in ai_results: + email_id = result.get('id', '') + if email_id in email_lookup: + email = email_lookup[email_id] + + # Format result like Elasticsearch search results (compatible with mailbox display) + formatted_result = { + 'message_id': email_id, + 'thread_id': email.get('thread_id', ''), + 'subject': email.get('subject', ''), + 'from': email.get('sender', {}).get('email', ''), # Frontend expects 'from' + 'sender_name': email.get('sender', {}).get('name', ''), + 'sender_email': email.get('sender', {}).get('email', ''), + 'date': email.get('sent_at'), # Frontend expects 'date' + 'sent_at': email.get('sent_at'), + 'snippet': email.get('content', '')[:200] if email.get('content') else '', # Frontend expects 'snippet' + 'is_unread': email.get('flags', {}).get('is_unread', False), + 'is_starred': email.get('flags', {}).get('is_starred', False), + 'thread_subject': email.get('thread_subject', ''), + 'relevance_score': result.get('relevance_score', 0.5), + 'ai_reason': result.get('reason', 'AI identified as relevant'), + # Additional metadata for debugging + 'attachment_count': email.get('attachment_count', 0), + 'content_preview': email.get('content', '')[:200] if email.get('content') else '', + } + formatted_results.append(formatted_result) + self.logger.debug(f"Added result for email {email_id} with score {result.get('relevance_score', 0.5)}") + else: + self.logger.warning(f"AI returned email ID {email_id} not found in context") + + # Sort by relevance score (highest first) + formatted_results.sort(key=lambda x: x.get('relevance_score', 0), reverse=True) + + self.logger.info(f"Successfully parsed {len(formatted_results)} email search results from AI response") + return formatted_results + + except Exception as e: + self.logger.error(f"Error parsing AI response for email search: {e}", exc_info=True) + return [] + + def chatbot_intelligent_email_search( + self, + user_id: str, + user_query: str, + api_client, + max_results: int = 10, + folder: str 
= 'all' + ) -> Dict[str, Any]: + """ + Perform intelligent email search using either RAG system or contextual search. + The method used depends on the use_rag flag set during initialization. + + Args: + user_id: UUID of the user + user_query: User's natural language search query + api_client: Albert API client instance + max_results: Maximum number of results to return + folder: Folder to search in ('all', 'sent', 'draft', 'trash') + + Returns: + Dictionary containing search results and metadata + """ + self.logger.info(f"Starting intelligent email search - method: {'RAG' if self.use_rag else 'contextual'}, folder: {folder}") + + if self.use_rag: + # Use RAG-based search (existing implementation) + return self._chatbot_rag_email_search(user_id, user_query, api_client, max_results, folder) + else: + # Use contextual search with Albert API + return contextual_search_service.chatbot_contextual_email_search( + user_id, user_query, api_client, max_results + ) + + def _chatbot_rag_email_search( + self, + user_id: str, + user_query: str, + api_client, + max_results: int = 10, + folder: str = 'all' + ) -> Dict[str, Any]: + """ + Perform intelligent email search using RAG system with embeddings. + This method: + 1. Retrieves emails from the user's mailboxes (filtered by folder) + 2. Checks if emails are already indexed in the RAG system + 3. Indexes any new emails + 4. Uses the query to retrieve the most relevant emails + + Args: + user_id: UUID of the user + user_query: User's natural language search query + api_client: Albert API client instance + max_results: Maximum number of results to return + folder: Folder to search in ('all', 'sent', 'draft', 'trash') + + Returns: + Dictionary containing search results and metadata + """ + search_start_time = time.time() + self.logger.info(f"Starting intelligent email search with RAG - user_id: {user_id}, query: '{user_query[:100]}...', folder: '{folder}'") + + try: + # Step 1: Get email context with folder filtering + self.logger.info("Step 1: Fetching email context") + context_start_time = time.time() + if email_utils is None: + self.logger.error("email_utils not available") + return { + "success": False, + "error": "email_utils not available", + "results": [], + "search_method": "rag", + "total_searched": 0, + "processing_time": time.time() - search_start_time + } + context_result = email_utils.get_email_context_for_chatbot(user_id) + context_time = time.time() - context_start_time + + if not context_result["success"]: + self.logger.error(f"Failed to get email context: {context_result.get('error', 'Unknown error')}") + return { + "success": False, + "error": f"Failed to retrieve emails: {context_result.get('error', 'Unknown error')}", + "results": [], + "search_method": "none", + "total_searched": 0, + "processing_time": time.time() - search_start_time + } + + context_emails = context_result["emails"] + self.logger.info(f"Step 1 completed: Retrieved {len(context_emails)} emails for search in {context_time:.2f}s") + self.logger.info(f"📊 EMAIL CONTEXT PERFORMANCE: {len(context_emails)} emails retrieved in {context_time:.2f}s ({len(context_emails)/context_time:.1f} emails/sec)") + + if not context_emails: + self.logger.warning("No emails available for search") + return { + "success": True, + "results": [], + "message": "No emails found to search", + "search_method": "none", + "total_searched": 0, + "processing_time": time.time() - search_start_time + } + + # Step 2: Set user-specific collection and check if it exists + self.logger.info("Step 2: Setting 
user-specific RAG collection") + rag_system.set_user_collection(user_id) + + if not rag_system.collection_id: + self.logger.info("Step 2: Creating new RAG collection for user") + rag_system.create_collection() + self.logger.info(f"Step 2 completed: Created RAG collection: {rag_system.collection_id}") + else: + self.logger.info(f"Step 2: Using existing RAG collection: {rag_system.collection_id}") + self.logger.info(f"Step 2: Found {len(rag_system.indexed_email_ids)} previously indexed emails") + + # Step 3: Prepare emails for indexing - format for RAG system + self.logger.info("Step 3: Preparing emails for RAG indexing") + emails_for_rag = [] + + # Process ALL emails, not just a limited batch + # Remove the batch limitation to ensure all emails are indexed + emails_to_process = context_emails + + self.logger.info(f"Processing ALL {len(emails_to_process)} emails for RAG indexing") + + # Track attachment statistics for RAG indexing + total_attachments_for_rag = 0 + emails_with_attachments_for_rag = 0 + + # Keep track of email IDs to ensure we're only processing unique emails + for email in emails_to_process: + email_id = email['id'] + + # Create a complete email document for RAG indexing + # We include the subject in the body to ensure it's part of the embedding + full_content = f"Subject: {email.get('subject', '')}\n\n" + full_content += f"From: {email.get('sender', {}).get('name', '')} <{email.get('sender', {}).get('email', '')}>\n" + + # Add recipients information + recipients = email.get('recipients', {}) + if recipients.get('to'): + to_emails = [f"{r.get('name', '')} <{r.get('email', '')}>".strip() for r in recipients.get('to', [])] + full_content += f"To: {', '.join(to_emails)}\n" + + if recipients.get('cc'): + cc_emails = [f"{r.get('name', '')} <{r.get('email', '')}>".strip() for r in recipients.get('cc', [])] + full_content += f"CC: {', '.join(cc_emails)}\n" + + # Add date information + full_content += f"Date: {email.get('sent_at', '')}\n\n" + + # Add the actual content + full_content += email.get('content', '') + + # Add attachment information + if email.get('attachment_count', 0) > 0: + attachment_names = [att.get('name', '') for att in email.get('attachments', []) if att.get('name', '').strip()] + if attachment_names: + full_content += f"\n\nFiles attached: {', '.join(attachment_names)}" + self.logger.debug(f"Email {email.get('id')} RAG content includes attachments: {', '.join(attachment_names)}") + emails_with_attachments_for_rag += 1 + total_attachments_for_rag += len(attachment_names) + else: + self.logger.debug(f"Email {email.get('id')} has attachment count {email.get('attachment_count')} but no valid attachment names") + + # Add to the RAG indexing list with metadata + emails_for_rag.append({ + 'id': email_id, + 'body': full_content, + 'metadata': { + 'subject': email.get('subject', ''), + 'sender': email.get('sender', {}).get('email', ''), + 'date': email.get('sent_at', ''), + 'thread_id': email.get('thread_id', '') + } + }) + + # Log attachment statistics for RAG indexing + self.logger.info(f"📎 RAG ATTACHMENT STATS: {emails_with_attachments_for_rag} emails with attachments out of {len(emails_to_process)} total emails") + self.logger.info(f"📎 RAG TOTAL ATTACHMENTS: {total_attachments_for_rag} attachments across all emails for RAG indexing") + + self.logger.info(f"Step 3 completed: Prepared {len(emails_for_rag)} emails for RAG indexing") + + # Step 4: Index emails in RAG system (with smart reindexing) + self.logger.info("Step 4: Checking if email indexing is needed") + 
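+            # Editorial sketch: index_emails() below skips work when the email set is
+            # unchanged; rag.py fingerprints the set by MD5-hashing the sorted email
+            # IDs (see _compute_email_hash). Computing the same fingerprint here makes
+            # the skip/reindex decision visible in the logs. Illustrative only.
+            import hashlib  # local import, used only by this sketch
+            email_set_fingerprint = hashlib.md5(
+                "|".join(sorted(e["id"] for e in emails_for_rag)).encode()
+            ).hexdigest()
+            self.logger.debug(f"Email set fingerprint before indexing: {email_set_fingerprint}")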
indexing_start_time = time.time() + try: + rag_system.index_emails(emails_for_rag) + indexing_time = time.time() - indexing_start_time + self.logger.info(f"Step 4 completed: Email indexing process finished in {indexing_time:.2f}s") + self.logger.info(f"📊 RAG INDEXING PERFORMANCE: {len(emails_for_rag)} emails indexed in {indexing_time:.2f}s ({len(emails_for_rag)/indexing_time:.1f} emails/sec)") + except Exception as index_error: + indexing_time = time.time() - indexing_start_time + self.logger.error(f"Failed to index emails in RAG system: {index_error}", exc_info=True) + + # Check if this is a complete failure or partial failure + if "Failed to index any emails" in str(index_error): + # Complete failure - return error + return { + "success": False, + "error": f"Failed to index emails: {str(index_error)}", + "results": [], + "search_method": "rag_error", + "total_searched": len(context_emails), + "processing_time": time.time() - search_start_time + } + else: + # Partial failure - log warning but continue + self.logger.warning(f"Some emails failed to index, but continuing with partial collection: {index_error}") + # Continue to query step + + # Step 5: Query the RAG system with the user's query + self.logger.info(f"Step 5: Querying RAG system with user query: '{user_query}'") + query_time = 0.0 # Initialize query_time for error cases + try: + query_start_time = time.time() + relevant_contents = rag_system.query_emails(user_query, k=max_results) + query_time = time.time() - query_start_time + + self.logger.info(f"Step 5 completed: Retrieved {len(relevant_contents)} relevant emails in {query_time:.2f}s") + self.logger.info(f"📊 RAG QUERY PERFORMANCE: {len(relevant_contents)} results retrieved in {query_time:.2f}s from {len(context_emails)} total emails") + + # Log detailed scores for all retrieved emails + if relevant_contents: + self.logger.info("=== RAG RETRIEVAL SCORES ===") + for i, result in enumerate(relevant_contents): + score = result.get("score", 0.0) + document_name = result.get("metadata", {}).get("document_name", "unknown") + content_preview = result.get("content", "")[:100].replace('\n', ' ') + self.logger.info(f" #{i+1:2d}: Score {score:.4f} | {document_name} | Preview: {content_preview}...") + + scores = [result.get("score", 0.0) for result in relevant_contents] + avg_score = sum(scores) / len(scores) + min_score = min(scores) + max_score = max(scores) + self.logger.info(f"Score Statistics: Max={max_score:.4f}, Min={min_score:.4f}, Avg={avg_score:.4f}") + self.logger.info("=== END RAG SCORES ===") + + if not relevant_contents: + self.logger.warning("RAG query returned no results") + return { + "success": True, + "results": [], + "message": "No relevant emails found", + "search_method": "rag", + "total_searched": len(context_emails), + "processing_time": time.time() - search_start_time + } + + # Step 6: Match RAG results to original emails and format for frontend + self.logger.info("Step 6: Matching RAG results to original emails") + + # Create email lookup by ID + email_lookup = {email['id']: email for email in context_emails} + + # Calculate dynamic score threshold based on highest score + if relevant_contents: + scores = [result.get("score", 0.0) for result in relevant_contents] + highest_score = max(scores) if scores else 0.0 + dynamic_threshold = highest_score * rag_system.config.min_relevance_score_percentage + self.logger.info(f"Dynamic threshold: {dynamic_threshold:.3f} ({rag_system.config.min_relevance_score_percentage:.1%} of highest score {highest_score:.3f})") + else: + 
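+                # Worked example (editorial, values assumed): the threshold computed
+                # above scales with the best score. With min_relevance_score_percentage
+                # = 0.6 and a top RAG score of 0.82, dynamic_threshold = 0.82 * 0.6 =
+                # 0.492, so every result scoring below 0.492 is filtered out in the
+                # matching loop below.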
dynamic_threshold = 0.0 + + # Format results for frontend + formatted_results = [] + + for i, result in enumerate(relevant_contents): + content = result.get("content", "") + metadata = result.get("metadata", {}) + document_name = metadata.get("document_name", "") + score = result.get("score", 0.0) + + matched_email_id = None + + # Method 1: Extract email ID from document filename + # The filename format is email_{i}_{email_id}.txt + if document_name: + self.logger.debug(f"Trying to match document: {document_name}") + # Extract email ID from filename like "email_5_12345abc-def0-1234-abcd-123456789abc.txt" + if document_name.startswith("email_") and document_name.endswith(".txt"): + # Remove the .txt extension and split by underscore + base_name = document_name.replace('.txt', '') + parts = base_name.split('_') + if len(parts) >= 3: + # The email ID should be everything after the second underscore + # In case the email ID itself contains underscores + potential_email_id = '_'.join(parts[2:]) + if potential_email_id in email_lookup: + matched_email_id = potential_email_id + self.logger.debug(f"✅ Matched email via filename: {document_name} -> {matched_email_id}") + else: + self.logger.debug(f"❌ Email ID '{potential_email_id}' from filename not found in lookup") + else: + self.logger.debug(f"❌ Filename format unexpected: {document_name}") + else: + self.logger.debug(f"❌ Filename doesn't match expected pattern: {document_name}") + + # Method 2: Search for email ID directly in the metadata + if not matched_email_id and metadata: + # Check if there's an email ID in the metadata + if 'email_id' in metadata and metadata['email_id'] in email_lookup: + matched_email_id = metadata['email_id'] + self.logger.debug(f"✅ Matched email via metadata: {matched_email_id}") + + # Method 3: Search for email ID in content + if not matched_email_id: + for email_id in email_lookup.keys(): + if email_id in content: + matched_email_id = email_id + self.logger.debug(f"✅ Matched email via content search: {matched_email_id}") + break + + # Method 4: Fuzzy matching based on subject and sender + if not matched_email_id: + best_match_id = None + best_match_score = 0 + + # Extract subject from content (it should be at the beginning) + content_lines = content.split('\n') + content_subject = "" + content_sender = "" + + for line in content_lines[:5]: # Check first few lines + if line.startswith("Subject: "): + content_subject = line.replace("Subject: ", "").strip() + elif line.startswith("From: "): + content_sender = line.replace("From: ", "").strip() + + if content_subject or content_sender: + for email_id, email in email_lookup.items(): + match_score = 0 + + # Score based on subject similarity + if content_subject and email.get('subject'): + subject_words_content = set(content_subject.lower().split()) + subject_words_email = set(email.get('subject', '').lower().split()) + subject_overlap = len(subject_words_content & subject_words_email) + if subject_overlap > 0: + match_score += subject_overlap * 10 + + # Score based on sender similarity + if content_sender and email.get('sender', {}).get('email'): + if email.get('sender', {}).get('email').lower() in content_sender.lower(): + match_score += 20 + + if match_score > best_match_score: + best_match_score = match_score + best_match_id = email_id + + if best_match_score > 15: # Threshold for a good match + matched_email_id = best_match_id + self.logger.debug(f"✅ Matched email via fuzzy matching: {matched_email_id} (score: {best_match_score})") + + if matched_email_id and 
matched_email_id in email_lookup: + email = email_lookup[matched_email_id] + + # Use the Albert API score if available, otherwise calculate based on position + relevance_score = score if score > 0 else (1.0 - (i / max(len(relevant_contents), 1))) + + # Apply dynamic score threshold filtering - only include results above percentage of highest score + if relevance_score < dynamic_threshold: + self.logger.info(f"🚫 FILTERED: Email {matched_email_id[:8]}... | Score: {relevance_score:.4f} < Threshold: {dynamic_threshold:.4f} | Subject: {email.get('subject', 'No subject')[:50]}...") + continue + + self.logger.info(f"✅ INCLUDED: Email {matched_email_id[:8]}... | Score: {relevance_score:.4f} ≥ Threshold: {dynamic_threshold:.4f} | Subject: {email.get('subject', 'No subject')[:50]}...") + + # Format result for frontend compatibility + formatted_result = { + 'id': matched_email_id, # Frontend expects 'id' + 'message_id': matched_email_id, + 'thread_id': email.get('thread_id', ''), + 'subject': email.get('subject', ''), + 'from': email.get('sender', {}).get('email', ''), # Frontend expects 'from' + 'sender': { # Frontend expects 'sender' object + 'email': email.get('sender', {}).get('email', ''), + 'name': email.get('sender', {}).get('name', '') + }, + 'sender_name': email.get('sender', {}).get('name', ''), + 'sender_email': email.get('sender', {}).get('email', ''), + 'date': email.get('sent_at'), # Frontend expects 'date' + 'sent_at': email.get('sent_at'), + 'snippet': email.get('content', '')[:200] if email.get('content') else '', # Frontend expects 'snippet' + 'is_unread': email.get('flags', {}).get('is_unread', False), + 'is_starred': email.get('flags', {}).get('is_starred', False), + 'thread_subject': email.get('thread_subject', ''), + 'relevance_score': relevance_score, + 'ai_reason': f"Semantically relevant to query: '{user_query}' (score: {relevance_score:.3f}, threshold: {dynamic_threshold:.3f})", + # Additional metadata for debugging + 'attachment_count': email.get('attachment_count', 0), + 'content_preview': email.get('content', '')[:200] if email.get('content') else '', + 'search_method': 'rag' + } + formatted_results.append(formatted_result) + self.logger.debug(f"Added result for email {matched_email_id} with score {relevance_score:.3f}") + else: + self.logger.warning(f"❌ Could not match RAG result to an email in context") + self.logger.warning(f"📝 Content preview: {content[:200]}...") + self.logger.warning(f"📁 Document name: {document_name}") + self.logger.warning(f"🏷️ Metadata: {metadata}") + # Log available email IDs for debugging + self.logger.warning(f"📋 Available email IDs (first 5): {list(email_lookup.keys())[:5]}...") + + # Try to extract any UUID-like pattern from content + import re + uuid_pattern = r'[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}' + found_uuids = re.findall(uuid_pattern, content) + if found_uuids: + self.logger.warning(f"🔍 Found UUIDs in content: {found_uuids[:3]}...") + for uuid in found_uuids: + if uuid in email_lookup: + self.logger.warning(f"🎯 UUID {uuid} found in email lookup! 
This should have been matched.") + break + + # Deduplicate results by email ID to avoid duplicate emails + original_count = len(formatted_results) + seen_ids = set() + deduplicated_results = [] + for result in formatted_results: + email_id = result.get('id') + if email_id and email_id not in seen_ids: + seen_ids.add(email_id) + deduplicated_results.append(result) + + if original_count != len(deduplicated_results): + self.logger.info(f"🔍 RAG DEDUPLICATION: Removed {original_count - len(deduplicated_results)} duplicate results") + self.logger.info(f" • Before deduplication: {original_count} results") + self.logger.info(f" • After deduplication: {len(deduplicated_results)} unique results") + + formatted_results = deduplicated_results + + # Log filtering summary + total_rag_results = len(relevant_contents) + filtered_count = total_rag_results - len(formatted_results) + self.logger.info("=== FILTERING SUMMARY ===") + self.logger.info(f"📊 RAG Retrieved: {total_rag_results} emails") + self.logger.info(f"✅ Passed Filter: {len(formatted_results)} emails") + self.logger.info(f"🚫 Filtered Out: {filtered_count} emails") + self.logger.info(f"📈 Filter Rate: {(filtered_count/total_rag_results*100):.1f}% removed") + self.logger.info(f"🎯 Threshold Used: {dynamic_threshold:.4f} ({rag_system.config.min_relevance_score_percentage:.0%} of {max([result.get('score', 0.0) for result in relevant_contents]):.4f})") + if formatted_results: + final_scores = [r.get('relevance_score', 0) for r in formatted_results] + self.logger.info(f"📋 Final Score Range: {min(final_scores):.4f} - {max(final_scores):.4f}") + self.logger.info("=== END FILTERING ===") + + self.logger.info(f"Step 6 completed: Matched and formatted {len(formatted_results)} email results (after dynamic score filtering with threshold {dynamic_threshold:.3f})") + + search_time = time.time() - search_start_time + self.logger.info(f"📊 RAG TOTAL PERFORMANCE SUMMARY:") + self.logger.info(f" • Total Search Time: {search_time:.2f}s") + self.logger.info(f" • Email Context Time: {context_time:.2f}s ({(context_time/search_time*100):.1f}%)") + self.logger.info(f" • Indexing Time: {indexing_time:.2f}s ({(indexing_time/search_time*100):.1f}%)") + self.logger.info(f" • Query Time: {query_time:.2f}s ({(query_time/search_time*100):.1f}%)") + self.logger.info(f" • Processing Time: {(search_time-context_time-indexing_time-query_time):.2f}s ({((search_time-context_time-indexing_time-query_time)/search_time*100):.1f}%)") + self.logger.info(f" • Emails Processed: {len(context_emails)}") + self.logger.info(f" • Results Returned: {len(formatted_results)}") + + return { + "success": True, + "results": formatted_results, + "search_method": "rag", + "total_searched": len(context_emails), + "total_matched": len(relevant_contents), + "results_after_filtering": len(formatted_results), + "dynamic_score_threshold": dynamic_threshold, + "highest_score": max([result.get("score", 0.0) for result in relevant_contents]) if relevant_contents else 0.0, + "score_threshold_percentage": rag_system.config.min_relevance_score_percentage, + "processing_time": search_time + } + + except Exception as query_error: + self.logger.error(f"Failed to query RAG system: {query_error}", exc_info=True) + return { + "success": False, + "error": f"Failed to query emails: {str(query_error)}", + "results": [], + "search_method": "rag_error", + "total_searched": len(context_emails), + "processing_time": time.time() - search_start_time + } + + except Exception as e: + total_search_time = time.time() - 
search_start_time + self.logger.error(f"Error in chatbot_intelligent_email_search with RAG: {e}", exc_info=True) + return { + "success": False, + "error": f"Search failed: {str(e)}", + "results": [], + "search_method": "error", + "total_searched": 0, + "processing_time": total_search_time + } \ No newline at end of file diff --git a/src/backend/deep_search/email_utils.py b/src/backend/deep_search/email_utils.py new file mode 100644 index 00000000..249cef38 --- /dev/null +++ b/src/backend/deep_search/email_utils.py @@ -0,0 +1,467 @@ +""" +Email utilities for chatbot integration. + +This module provides common email retrieval and parsing functions that can be shared +between different email service implementations (RAG and contextual search). +""" + +import re +import html +import time +import logging +from typing import Dict, List, Any +from django.contrib.auth import get_user_model + +from core import models + +logger = logging.getLogger(__name__) +User = get_user_model() + + +class EmailUtils: + """Utility class for common email operations.""" + + def __init__(self): + """Initialize the email utilities.""" + self.logger = logger + + def get_user_accessible_mailboxes(self, user_id: str) -> List[models.Mailbox]: + """ + Get all mailboxes that a user has access to. + + Args: + user_id: UUID of the user + + Returns: + List of Mailbox objects the user can access + """ + try: + self.logger.info(f"Getting accessible mailboxes for user_id: {user_id}") + + user = User.objects.get(id=user_id) + self.logger.debug(f"Found user: {user.email if hasattr(user, 'email') else user}") + + mailboxes = models.Mailbox.objects.filter( + accesses__user=user + ).select_related('domain', 'contact').order_by("-created_at") + + mailbox_count = mailboxes.count() + self.logger.info(f"Found {mailbox_count} accessible mailboxes for user {user}") + + if mailbox_count > 0: + mailbox_list = list(mailboxes) + self.logger.debug(f"Mailbox IDs: {[str(mb.id) for mb in mailbox_list[:5]]}") + return mailbox_list + else: + self.logger.warning(f"No accessible mailboxes found for user {user}") + return [] + + except User.DoesNotExist: + self.logger.error(f"User with ID {user_id} not found") + return [] + except Exception as e: + self.logger.error(f"Error retrieving mailboxes for user {user_id}: {e}", exc_info=True) + return [] + + def get_recent_messages(self, mailboxes: List[models.Mailbox], limit: int = None) -> List[models.Message]: + """ + Get messages from the given mailboxes. 
+ + Args: + mailboxes: List of Mailbox objects + limit: Maximum number of messages to retrieve (default: None for all messages) + + Returns: + List of Message objects + """ + try: + limit_text = f"limit: {limit}" if limit else "all messages" + self.logger.info(f"Getting messages from {len(mailboxes)} mailboxes, {limit_text}") + + if not mailboxes: + self.logger.warning("No mailboxes provided to get_recent_messages") + return [] + + mailbox_ids = [mb.id for mb in mailboxes] + self.logger.debug(f"Searching messages in mailbox IDs: {mailbox_ids[:5]}") + + messages_query = models.Message.objects.filter( + thread__accesses__mailbox__id__in=mailbox_ids, + is_draft=False, + is_trashed=False + ).select_related( + 'sender', 'thread' + ).prefetch_related( + 'attachments', + 'attachments__blob' + ).order_by('-created_at') + + # Log attachment counts for debugging + self.logger.info(f"Checking attachment counts in database query...") + total_messages_with_attachments = models.Message.objects.filter( + thread__accesses__mailbox__id__in=mailbox_ids, + is_draft=False, + is_trashed=False, + attachments__isnull=False + ).distinct().count() + self.logger.info(f"Total messages with attachments in DB: {total_messages_with_attachments}") + + total_attachments_count = models.Attachment.objects.filter( + messages__thread__accesses__mailbox__id__in=mailbox_ids, + messages__is_draft=False, + messages__is_trashed=False + ).count() + self.logger.info(f"Total attachments in DB for these messages: {total_attachments_count}") + + # Apply limit only if specified + if limit is not None: + messages_query = messages_query[:limit] + + message_list = list(messages_query) + message_count = len(message_list) + + # Additional debugging: Check attachment relationships + if message_count > 0: + sample_msg = message_list[0] + self.logger.debug(f"Sample message {sample_msg.id} attachment count check:") + self.logger.debug(f" - .attachments.all().count(): {sample_msg.attachments.all().count()}") + self.logger.debug(f" - .attachments.exists(): {sample_msg.attachments.exists()}") + + # Check if there are attachments in the mailboxes at all + attachment_check = models.Attachment.objects.filter( + mailbox__id__in=mailbox_ids + ).count() + self.logger.info(f"Total attachments in these mailboxes: {attachment_check}") + + self.logger.info(f"Retrieved {message_count} messages from {len(mailboxes)} mailboxes") + + if message_count > 0: + first_msg = message_list[0] + last_msg = message_list[-1] + self.logger.debug(f"Date range: {last_msg.created_at} to {first_msg.created_at}") + self.logger.debug(f"Sample message IDs: {[str(msg.id) for msg in message_list[:3]]}") + else: + self.logger.warning("No messages found in the provided mailboxes") + + return message_list + + except Exception as e: + self.logger.error(f"Error retrieving recent messages: {e}", exc_info=True) + return [] + + def get_parsed_message_content(self, message: models.Message) -> str: + """ + Get parsed text content from a message for chatbot processing. 
+ + Args: + message: Message object + + Returns: + String containing the parsed text content + """ + try: + self.logger.debug(f"Parsing message {message.id} - {message.subject}") + + parsed_data = message.get_parsed_data() + self.logger.debug(f"Parsed data keys: {list(parsed_data.keys())}") + + text_content = "" + text_body = parsed_data.get('textBody', []) + html_body = parsed_data.get('htmlBody', []) + self.logger.debug(f"Text body parts: {len(text_body)}, HTML body parts: {len(html_body)}") + + if text_body: + for text_part in text_body: + if isinstance(text_part, dict) and 'content' in text_part: + text_content += text_part['content'] + "\n" + elif isinstance(text_part, str): + text_content += text_part + "\n" + + if not text_content and html_body: + self.logger.debug(f"No text body, extracting from HTML") + for html_part in html_body: + if isinstance(html_part, dict) and 'content' in html_part: + html_content = html_part['content'] + clean_text = re.sub(r'<[^>]+>', '', html_content) + clean_text = html.unescape(clean_text) + text_content += clean_text + "\n" + + text_content = text_content.strip() + + self.logger.debug(f"Extracted {len(text_content)} characters of text content") + return text_content + + except Exception as e: + self.logger.error(f"Error parsing message content for {message}: {e}", exc_info=True) + return "" + + def get_parsed_message_details(self, message: models.Message) -> Dict[str, Any]: + """ + Get parsed content details from a message using the model's built-in parsing methods. + + Args: + message: Message object + + Returns: + Dictionary with parsed message content and metadata + """ + try: + self.logger.debug(f"Parsing message details {message.id} - {message.subject}") + + # Check if message has blob with attachments data + if hasattr(message, 'blob') and message.blob: + self.logger.debug(f"Message {message.id} has blob: {message.blob.id}") + + parsed_data = message.get_parsed_data() + self.logger.debug(f"Parsed data keys: {list(parsed_data.keys())}") + + # Check if parsed data contains attachment information + parsed_attachments = parsed_data.get('attachments', []) + if parsed_attachments: + self.logger.debug(f"Found {len(parsed_attachments)} attachments in parsed data") + + text_body = parsed_data.get('textBody', []) + html_body = parsed_data.get('htmlBody', []) + self.logger.debug(f"Text body parts: {len(text_body)}, HTML body parts: {len(html_body)}") + + attachments = [] + attachment_queryset = message.attachments.all() + attachment_count_raw = attachment_queryset.count() + self.logger.debug(f"Message {message.id}: Raw attachment count from queryset: {attachment_count_raw}") + + # If no attachments from the relationship, check if they're in parsed data + if attachment_count_raw == 0 and parsed_attachments: + self.logger.warning(f"Message {message.id}: No attachments in relationship but {len(parsed_attachments)} in parsed data") + # Try to use parsed attachments as fallback + for parsed_att in parsed_attachments: + attachments.append({ + 'name': parsed_att.get('filename', parsed_att.get('name', 'Unknown')), + 'size': parsed_att.get('size', 0), + 'content_type': parsed_att.get('content_type', parsed_att.get('contentType', 'application/octet-stream')), + 'sha256': None, + 'source': 'parsed_data' + }) + else: + # Use the relationship-based attachments + for attachment in attachment_queryset: + try: + attachment_data = { + 'name': attachment.name, + 'size': attachment.size, + 'content_type': attachment.content_type, + 'sha256': attachment.sha256.hex() if 
attachment.sha256 else None, + 'source': 'relationship' + } + attachments.append(attachment_data) + self.logger.debug(f"Attachment: name='{attachment.name}', size={attachment.size}, type='{attachment.content_type}'") + except Exception as e: + self.logger.error(f"Error retrieving attachment data for {attachment}: {e}", exc_info=True) + # Fallback data in case of error + attachments.append({ + 'name': getattr(attachment, 'name', 'Unknown'), + 'size': 0, + 'content_type': 'application/octet-stream', + 'sha256': None, + 'error': str(e), + 'source': 'relationship_error' + }) + + # Additional debugging: Check has_attachments flag vs actual attachments + if hasattr(message, 'has_attachments'): + flag_has_attachments = message.has_attachments + actual_has_attachments = len(attachments) > 0 + if flag_has_attachments != actual_has_attachments: + self.logger.warning(f"Message {message.id}: has_attachments flag ({flag_has_attachments}) doesn't match actual attachments ({actual_has_attachments})") + + if attachment_count_raw != len(attachments): + self.logger.warning(f"Message {message.id}: Attachment count mismatch! Raw count: {attachment_count_raw}, Processed count: {len(attachments)}") + + self.logger.debug(f"Found {len(attachments)} attachments for message {message.id}") + if attachments: + attachment_names = [att.get('name', 'unnamed') for att in attachments] + self.logger.debug(f"Attachment names: {attachment_names}") + + recipients_by_type = message.get_all_recipient_contacts() + recipients = { + 'to': [{'name': contact.name, 'email': contact.email} + for contact in recipients_by_type.get('to', [])], + 'cc': [{'name': contact.name, 'email': contact.email} + for contact in recipients_by_type.get('cc', [])], + 'bcc': [{'name': contact.name, 'email': contact.email} + for contact in recipients_by_type.get('bcc', [])] + } + self.logger.debug(f"Recipients - To: {len(recipients['to'])}, CC: {len(recipients['cc'])}, BCC: {len(recipients['bcc'])}") + + result = { + 'subject': message.subject, + 'sender': { + 'name': message.sender.name, + 'email': message.sender.email + }, + 'sent_at': message.sent_at, + 'text_body': text_body, + 'html_body': html_body, + 'attachments': attachments, + 'recipients': recipients, + 'is_draft': message.is_draft, + 'is_unread': message.is_unread, + 'is_starred': message.is_starred, + 'thread_id': str(message.thread.id), + 'message_id': str(message.id), + } + + self.logger.debug(f"Successfully parsed message {message.id}") + return result + + except Exception as e: + self.logger.error(f"Error parsing message content for {message}: {e}", exc_info=True) + return { + 'subject': message.subject, + 'error': str(e) + } + + def get_email_context_for_chatbot(self, user_id: str) -> Dict[str, Any]: + """ + Main function to gather email context for the chatbot. + Gets ALL emails with all available information. 
+ + Args: + user_id: UUID of the user + + Returns: + Dictionary containing email context and metadata + """ + start_time = time.time() + self.logger.info(f"Getting email context for chatbot - user_id: {user_id}") + + try: + # Step 1: Get accessible mailboxes + self.logger.info("Step 1: Getting accessible mailboxes") + mailboxes = self.get_user_accessible_mailboxes(user_id) + + if not mailboxes: + self.logger.warning(f"No accessible mailboxes found for user {user_id}") + return { + "success": False, + "error": "No accessible mailboxes found", + "emails": [], + "total_emails": 0, + "mailbox_count": 0, + "processing_time": time.time() - start_time + } + + self.logger.info(f"Step 1 completed: Found {len(mailboxes)} accessible mailboxes") + + # Step 2: Get recent messages with full details + self.logger.info("Step 2: Getting ALL messages with full details") + recent_message_objects = self.get_recent_messages(mailboxes, limit=None) + + if not recent_message_objects: + self.logger.warning(f"No messages found in mailboxes for user {user_id}") + return { + "success": False, + "error": "No messages found", + "emails": [], + "total_emails": 0, + "mailbox_count": len(mailboxes), + "processing_time": time.time() - start_time + } + + self.logger.info(f"Step 2 completed: Retrieved {len(recent_message_objects)} messages") + + # Step 3: Extract complete email information + self.logger.info("Step 3: Extracting complete email information") + emails_with_full_info = [] + + for i, message in enumerate(recent_message_objects): + if i < 10: + self.logger.debug(f"Processing message {i+1}: ID={message.id}, subject={message.subject[:50]}...") + + parsed_details = self.get_parsed_message_details(message) + text_content = self.get_parsed_message_content(message) + + # Use attachments from parsed_details to avoid duplication + attachments = parsed_details.get('attachments', []) + + # Log attachment information for debugging + if attachments: + attachment_names = [att.get('name', 'unnamed') for att in attachments] + self.logger.info(f"Message {message.id} has {len(attachments)} attachments: {attachment_names}") + else: + self.logger.debug(f"Message {message.id} has no attachments") + + email_data = { + "id": str(message.id), + "subject": message.subject, + "content": text_content, + "sender": { + "name": message.sender.name, + "email": message.sender.email + }, + "recipients": parsed_details.get('recipients', {}), + "sent_at": message.sent_at.isoformat() if message.sent_at else None, + "created_at": message.created_at.isoformat(), + "thread_id": str(message.thread.id), + "thread_subject": message.thread.subject, + "attachments": attachments, + "attachment_count": len(attachments), + "flags": { + "is_unread": message.is_unread, + "is_starred": message.is_starred, + "is_draft": message.is_draft, + "is_trashed": message.is_trashed, + "is_spam": message.is_spam, + "is_archived": message.is_archived, + "is_sender": message.is_sender + } + } + + emails_with_full_info.append(email_data) + + # Log attachment summary + total_attachments_found = sum(len(email.get('attachments', [])) for email in emails_with_full_info) + emails_with_attachments = sum(1 for email in emails_with_full_info if email.get('attachments')) + self.logger.info(f"ATTACHMENT SUMMARY: {emails_with_attachments} emails with attachments out of {len(emails_with_full_info)} total emails") + self.logger.info(f"ATTACHMENT SUMMARY: {total_attachments_found} total attachments found across all emails") + + if emails_with_attachments > 0: + sample_attachments = [] + for 
email in emails_with_full_info[:10]:  # Check first 10 emails
+                    if email.get('attachments'):
+                        for att in email['attachments'][:3]:  # Show first 3 attachments per email
+                            sample_attachments.append(f"{email['id'][:8]}...:{att.get('name', 'unnamed')}")
+                self.logger.info(f"SAMPLE ATTACHMENTS: {sample_attachments}")
+
+            self.logger.info(f"Step 3 completed: Processed {len(emails_with_full_info)} emails with full information")
+
+            processing_time = time.time() - start_time
+
+            result = {
+                "success": True,
+                "emails": emails_with_full_info,
+                "total_emails": len(emails_with_full_info),
+                "mailbox_count": len(mailboxes),
+                "processing_time": processing_time
+            }
+
+            self.logger.info(f"Email context retrieval completed successfully: "
+                             f"{len(emails_with_full_info)} emails retrieved, "
+                             f"processing time: {processing_time:.2f}s")
+
+            return result
+
+        except Exception as e:
+            processing_time = time.time() - start_time
+            self.logger.error(f"Error in get_email_context_for_chatbot for user {user_id}: {e}", exc_info=True)
+            return {
+                "success": False,
+                "error": f"Error retrieving email context: {str(e)}",
+                "emails": [],
+                "total_emails": 0,
+                "mailbox_count": 0,
+                "processing_time": processing_time
+            }
+
+
+# Create a singleton instance of the email utilities
+email_utils = EmailUtils()
diff --git a/src/backend/deep_search/exceptions.py b/src/backend/deep_search/exceptions.py
new file mode 100644
index 00000000..1f327f01
--- /dev/null
+++ b/src/backend/deep_search/exceptions.py
@@ -0,0 +1,23 @@
+"""
+Custom exceptions for the chatbot module.
+"""
+
+
+class ChatbotError(Exception):
+    """Base exception for chatbot-related errors."""
+    pass
+
+
+class AlbertAPIError(ChatbotError):
+    """Exception raised when Albert API requests fail."""
+    pass
+
+
+class ConfigurationError(ChatbotError):
+    """Exception raised when there are configuration issues."""
+    pass
+
+
+class EmailRetrievalError(ChatbotError):
+    """Exception raised when email retrieval operations fail."""
+    pass
diff --git a/src/backend/deep_search/ideas.txt b/src/backend/deep_search/ideas.txt
new file mode 100644
index 00000000..c701951d
--- /dev/null
+++ b/src/backend/deep_search/ideas.txt
@@ -0,0 +1,15 @@
+- try/except measures to retry API calls that fail
+- search through the emails incrementally, 50 at a time
+- search the various characteristics of the emails bit by bit: the subject,
+the attachments, etc.
+- use tool calling to decide how to search for the email: if the search should
+be global, search across all the subjects and such; if it is a particular email or
+attachment, search through the emails 50 at a time (specific sorting to do beforehand?)
+- try algorithmic approaches, binary search trees?
+
+- keep a cached history of searches to avoid
+
+to do: read up on Elasticsearch
+
+the score is too random
+the search is not applied within "chercher dans"
\ No newline at end of file
diff --git a/src/backend/deep_search/parsers.py b/src/backend/deep_search/parsers.py
new file mode 100644
index 00000000..d3cad9da
--- /dev/null
+++ b/src/backend/deep_search/parsers.py
@@ -0,0 +1,67 @@
+"""
+Content parsing utilities for the chatbot.
+""" + +import re +import logging +from typing import Dict, List, Optional, Any + +logger = logging.getLogger(__name__) + + +class ContentParser: + """Handles parsing and processing of email content.""" + + def __init__(self): + """Initialize the content parser.""" + self.email_regex = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b') + self.phone_regex = re.compile(r'(?:\+33|0)[1-9](?:[0-9]{8})') + + def extract_emails(self, text: str) -> List[str]: + """Extract email addresses from text.""" + if not text: + return [] + return self.email_regex.findall(text) + + def extract_phone_numbers(self, text: str) -> List[str]: + """Extract phone numbers from text.""" + if not text: + return [] + return self.phone_regex.findall(text) + + def clean_content(self, content: str) -> str: + """Clean and normalize email content.""" + if not content: + return "" + + # Remove excessive whitespace + content = re.sub(r'\s+', ' ', content) + + # Remove common email signatures patterns + content = re.sub(r'--\s*\n.*', '', content, flags=re.DOTALL) + + return content.strip() + + def extract_keywords(self, content: str, max_keywords: int = 10) -> List[str]: + """Extract important keywords from content.""" + if not content: + return [] + + # Simple keyword extraction - can be enhanced with NLP + words = re.findall(r'\b[a-zA-ZÀ-ÿ]{3,}\b', content.lower()) + + # Remove common French stop words + stop_words = { + 'the', 'and', 'for', 'are', 'but', 'not', 'you', 'all', 'can', 'had', 'her', 'was', 'one', 'our', 'out', 'day', 'get', 'has', 'him', 'his', 'how', 'its', 'may', 'new', 'now', 'old', 'see', 'two', 'who', 'boy', 'did', 'she', 'use', 'way', 'win', 'yet', + 'les', 'des', 'une', 'pour', 'que', 'qui', 'dans', 'avec', 'est', 'sur', 'par', 'tout', 'mais', 'comme', 'aux', 'son', 'ses', 'ces', 'mes', 'nos', 'vos', 'leur', 'leurs', 'dont', 'où', 'quand', 'comment', 'pourquoi' + } + + keywords = [word for word in words if word not in stop_words and len(word) > 3] + + # Count frequency and return most common + word_count = {} + for word in keywords: + word_count[word] = word_count.get(word, 0) + 1 + + sorted_words = sorted(word_count.items(), key=lambda x: x[1], reverse=True) + return [word for word, count in sorted_words[:max_keywords]] diff --git a/src/backend/deep_search/rag.py b/src/backend/deep_search/rag.py new file mode 100644 index 00000000..92a43465 --- /dev/null +++ b/src/backend/deep_search/rag.py @@ -0,0 +1,586 @@ +import os +import json +import requests +import logging +import wget +import uuid +import hashlib +import time +import tempfile + +from .config import AlbertConfig +from .api_client import AlbertAPIClient + +logger = logging.getLogger(__name__) + +class RAGSystem: + """System for Retrieval-Augmented Generation with Albert API.""" + + def __init__(self): + """Initialize the RAG system.""" + self.config = AlbertConfig() + + # Validate that base_url is properly configured + if not self.config.base_url: + raise ValueError("AI_BASE_URL environment variable is not set or is empty. Please configure it with the Albert API base URL.") + + if not self.config.base_url.startswith(('http://', 'https://')): + raise ValueError(f"Invalid base URL '{self.config.base_url}'. 
URL must start with http:// or https://") + + logger.info(f"RAG System initialized with base URL: {self.config.base_url}") + + self.api_client = AlbertAPIClient(self.config) + self.session = requests.Session() + self.session.headers.update({ + 'Authorization': f'Bearer {self.config.api_key}', + }) + self.collection_name = None # Will be set based on user_id + self.collection_id = None + self.embeddings_model = None + self.indexed_email_ids = set() # Track which emails are already indexed + self.last_indexed_hash = None # Hash of last indexed email set + self.last_index_time = None # Timestamp of last indexing + self._state_file_dir = os.path.join(tempfile.gettempdir(), 'rag_state') + os.makedirs(self._state_file_dir, exist_ok=True) + + def _get_state_file_path(self): + """Get the path to the state file for the current collection.""" + if not self.collection_name: + return None + safe_name = self.collection_name.replace('/', '_').replace('\\', '_') + return os.path.join(self._state_file_dir, f"{safe_name}_state.json") + + def _save_state(self): + """Save the current indexing state to disk.""" + state_file = self._get_state_file_path() + if not state_file: + return + + try: + state = { + 'collection_id': self.collection_id, + 'last_indexed_hash': self.last_indexed_hash, + 'last_index_time': self.last_index_time, + 'indexed_email_ids': list(self.indexed_email_ids) + } + with open(state_file, 'w') as f: + json.dump(state, f) + logger.debug(f"Saved state to {state_file}") + except Exception as e: + logger.warning(f"Failed to save state: {e}") + + def _load_state(self): + """Load the indexing state from disk.""" + state_file = self._get_state_file_path() + if not state_file or not os.path.exists(state_file): + return + + try: + with open(state_file, 'r') as f: + state = json.load(f) + + # Only restore state if collection_id matches + if state.get('collection_id') == self.collection_id: + self.last_indexed_hash = state.get('last_indexed_hash') + self.last_index_time = state.get('last_index_time') + self.indexed_email_ids = set(state.get('indexed_email_ids', [])) + logger.info(f"Restored state: hash={self.last_indexed_hash is not None}, " + f"time={self.last_index_time}, emails={len(self.indexed_email_ids)}") + else: + logger.debug(f"Collection ID mismatch, not loading state") + except Exception as e: + logger.warning(f"Failed to load state: {e}") + + def _get_models(self): + """Get available models from the API using direct HTTP requests.""" + if self.embeddings_model: + return + + # Use direct HTTP requests instead of OpenAI client to avoid compatibility issues + # Following the Albert API documentation pattern + try: + response = self.session.get(f"{self.config.base_url}/models") + response.raise_for_status() + models_data = response.json() + + # Look for embedding model + for model in models_data.get('data', []): + if model.get('type') == "text-embeddings-inference": + self.embeddings_model = model.get('id') + break + + if not self.embeddings_model: + # Fallback to a default embedding model if none found + self.embeddings_model = "embeddings-small" + + except Exception as e: + logger.error(f"Failed to get models from Albert API: {e}") + # Use default embedding model as fallback + self.embeddings_model = "embeddings-small" + + def collection_exists(self, user_id: str) -> bool: + """ + Check if a collection exists for a user without setting up the full state. + This is more efficient for quick checks. 
+ + Args: + user_id: The user ID to check + + Returns: + True if collection exists, False otherwise + """ + try: + collection_name = f"email-collection-user-{user_id}" + logger.debug(f"Checking if collection exists: {collection_name}") + + response = self.session.get(f"{self.config.base_url}/collections") + response.raise_for_status() + collections = response.json().get('data', []) + + for collection in collections: + if collection.get('name') == collection_name: + logger.debug(f"Collection exists for user {user_id}: {collection.get('id')}") + return True + + logger.debug(f"No collection found for user {user_id}") + return False + + except Exception as e: + logger.error(f"Error checking collection existence for user {user_id}: {e}") + return False + + def set_user_collection(self, user_id: str): + """Set the collection name based on user ID for persistence.""" + new_collection_name = f"email-collection-user-{user_id}" + + # Only reset state if we're switching to a different user + if self.collection_name != new_collection_name: + self.collection_name = new_collection_name + # Reset collection_id and indexed_email_ids as we're switching users + self.collection_id = None + self.indexed_email_ids = set() + self.last_indexed_hash = None + self.last_index_time = None + logger.debug(f"Switched to user collection: {self.collection_name}") + + # Check if the collection exists for this user + if self._get_existing_collection(): + logger.debug(f"Found existing collection for {self.collection_name}: {self.collection_id}") + else: + logger.debug(f"No existing collection found for {self.collection_name}") + else: + # Same user, keep existing state + self.collection_name = new_collection_name + logger.debug(f"Keeping existing state for user collection: {self.collection_name}") + + def collection_exists(self, user_id: str = None) -> bool: + """ + Check if a collection exists for a user without fully initializing the RAG system. + + Args: + user_id: Optional user ID. If not provided, uses current collection_name. 
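+
+            Example (hypothetical ID): user_id "42" is looked up as the
+            collection named "email-collection-user-42".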
+ + Returns: + True if collection exists, False otherwise + """ + if user_id: + collection_name = f"email-collection-user-{user_id}" + else: + collection_name = self.collection_name + + if not collection_name: + return False + + try: + response = self.session.get(f"{self.config.base_url}/collections") + response.raise_for_status() + collections = response.json().get('data', []) + + for collection in collections: + if collection.get('name') == collection_name: + return True + return False + + except Exception as e: + logger.warning(f"Error checking if collection exists: {e}") + return False + + def _compute_email_hash(self, emails: list[dict]) -> str: + """Compute a hash of the email set to detect changes.""" + email_ids = sorted([email.get('id', '') for email in emails]) + content = '|'.join(email_ids) + return hashlib.md5(content.encode()).hexdigest() + + def _needs_reindexing(self, emails: list[dict]) -> bool: + """Check if reindexing is needed based on email changes and time.""" + current_hash = self._compute_email_hash(emails) + current_time = time.time() + + # Always reindex if no previous hash (first time or collection recreated) + if not self.last_indexed_hash: + logger.info("No previous indexing hash found, indexing needed") + return True + + # Reindex if email set has changed significantly + if current_hash != self.last_indexed_hash: + logger.info("Email set has changed, reindexing needed") + return True + + # Reindex if it's been more than 2 hours since last indexing (to handle any API issues) + if self.last_index_time and (current_time - self.last_index_time) > 7200: + logger.info("More than 2 hours since last indexing, reindexing for freshness") + return True + + logger.info("Email set unchanged and recently indexed, skipping reindexing") + return False + + def _get_existing_collection(self): + """Check if a collection with the current name already exists.""" + try: + logger.debug(f"Fetching collections from {self.config.base_url}/collections") + response = self.session.get(f"{self.config.base_url}/collections") + response.raise_for_status() + collections = response.json().get('data', []) + + logger.debug(f"Found {len(collections)} total collections") + collection_names = [c.get('name', 'unnamed') for c in collections] + logger.debug(f"Collection names: {collection_names}") + + for collection in collections: + if collection.get('name') == self.collection_name: + self.collection_id = collection.get('id') + logger.info(f"Found existing collection '{self.collection_name}' with ID: {self.collection_id}") + self._load_indexed_emails() + return True + + logger.info(f"No existing collection found with name: {self.collection_name}") + return False + except Exception as e: + logger.error(f"Error checking for existing collection: {e}") + return False + + def _load_indexed_emails(self): + """Initialize tracking for the existing collection.""" + if not self.collection_id: + return + + logger.info(f"Collection '{self.collection_name}' exists with ID: {self.collection_id}") + + # Try to load the persistent state + self._load_state() + + # Log what we restored + if self.last_indexed_hash: + logger.info(f"Restored indexing state from previous session") + else: + logger.info(f"No previous indexing state found, will reindex on first run") + + def create_collection(self): + """Create a new collection for emails or use existing one.""" + if not self.collection_name: + raise Exception("Collection name not set. 
Call set_user_collection() first.") + + self._get_models() + if not self.embeddings_model: + raise Exception("No embedding model found.") + + # Check if collection already exists + logger.info(f"Checking for existing collection: {self.collection_name}") + if self._get_existing_collection(): + logger.info(f"Reusing existing collection: {self.collection_name} (ID: {self.collection_id})") + return + + # Create new collection + logger.info(f"Creating new collection: {self.collection_name}") + response = self.session.post( + f"{self.config.base_url}/collections", + json={"name": self.collection_name, "model": self.embeddings_model} + ) + response.raise_for_status() + self.collection_id = response.json()["id"] + logger.info(f"Collection '{self.collection_name}' created with ID: {self.collection_id}") + + def clear_collection(self): + """Clear all documents from the current collection.""" + if not self.collection_id: + return + + try: + # Clear the collection by deleting it and recreating it + # This is often easier than trying to delete individual documents + response = self.session.delete(f"{self.config.base_url}/collections/{self.collection_id}") + response.raise_for_status() + logger.info(f"Deleted collection {self.collection_id}") + + # Reset state + self.collection_id = None + self.indexed_email_ids = set() + self.last_indexed_hash = None + self.last_index_time = None + + # Recreate the collection + self.create_collection() + + except Exception as e: + logger.error(f"Error clearing collection: {e}") + # Reset state anyway + self.collection_id = None + self.indexed_email_ids = set() + self.last_indexed_hash = None + self.last_index_time = None + + def add_single_email(self, email: dict): + """ + Add a single email to the collection without triggering full reindexing. + This is efficient for real-time updates when new messages arrive. + + Args: + email: Dictionary with 'id', 'body', and other email fields + """ + if not self.collection_id: + logger.warning("Cannot add single email: no collection exists") + return False + + email_id = email.get('id') + if not email_id: + logger.error("Cannot add email without ID") + return False + + # Check if already indexed + if email_id in self.indexed_email_ids: + logger.debug(f"Email {email_id} already indexed, skipping") + return True + + try: + logger.info(f"Adding single email {email_id} to collection {self.collection_id}") + + # Prepare document for indexing + documents = [{ + "id": email_id, + "content": email.get('body', ''), + "metadata": { + "subject": email.get('subject', ''), + "sender": email.get('sender', ''), + "created_at": email.get('created_at', ''), + "email_id": email_id + } + }] + + # Add to collection + response = self.session.post( + f"{self.config.base_url}/collections/{self.collection_id}/documents", + json={"documents": documents} + ) + response.raise_for_status() + + # Update tracking + self.indexed_email_ids.add(email_id) + self.last_index_time = time.time() + self._save_state() + + logger.info(f"✅ Successfully added email {email_id} to collection") + return True + + except Exception as e: + logger.error(f"Failed to add single email {email_id}: {e}") + return False + + def index_emails(self, emails: list[dict]): + """ + Index a list of emails, with smart reindexing detection. + + Args: + emails: A list of dictionaries, where each dictionary represents an email + with at least a 'body' key and 'id' key. 
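+
+        Illustrative call (assumes a RAGSystem bound to a user collection and
+        a reachable Albert API; the payloads are hypothetical):
+
+            rag.set_user_collection("42")
+            rag.index_emails([
+                {"id": "a1", "body": "Re: budget 2025 ...", "metadata": {"subject": "Budget"}},
+            ])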
+ """ + if not self.collection_id: + self.create_collection() + + # Check if reindexing is needed + if not self._needs_reindexing(emails): + logger.info("Skipping indexing: emails already indexed and up to date") + return + + indexing_start_time = time.time() + logger.info(f"Indexing {len(emails)} emails (reindexing needed)") + + # Log attachment information in the emails being indexed + emails_with_attachments = 0 + total_attachments = 0 + sample_attachments = [] + + for email in emails[:10]: # Check first 10 emails for attachment info + email_body = email.get('body', '') + if 'Files attached:' in email_body: + emails_with_attachments += 1 + # Extract attachment names from the body + attachment_section = email_body.split('Files attached:')[-1].strip() + if attachment_section: + attachments = [att.strip() for att in attachment_section.split(',')] + total_attachments += len(attachments) + sample_attachments.extend(attachments[:3]) # Sample first 3 + + if emails_with_attachments > 0: + logger.info(f"📎 RAG INDEXING: {emails_with_attachments} emails with attachments in first 10 emails") + logger.info(f"📎 RAG SAMPLE ATTACHMENTS: {sample_attachments[:5]}") + + successful_indexes = 0 + failed_indexes = 0 + + for i, email in enumerate(emails): + try: + # Limit email body size to avoid API issues, but preserve attachment info + email_body = email.get('body', '') + + # Log if this email has attachment information + has_attachments_info = 'Files attached:' in email_body + attachment_section = "" + + if has_attachments_info: + logger.debug(f"Email {i+1} ({email.get('id', 'unknown')}) contains attachment information") + # Extract attachment section to preserve it + parts = email_body.split('Files attached:') + if len(parts) > 1: + attachment_section = '\n\nFiles attached:' + parts[-1] + + if len(email_body) > self.config.max_email_content_length: + if has_attachments_info and attachment_section: + # Calculate space needed for attachment section + attachment_length = len(attachment_section) + available_length = self.config.max_email_content_length - attachment_length - 20 # 20 chars for "... [truncated]" + + if available_length > 500: # Ensure we have reasonable content left + # Truncate main content but preserve attachments + main_content = email_body.replace(attachment_section, '') + truncated_main = main_content[:available_length] + "... [truncated]" + email_body = truncated_main + attachment_section + logger.info(f"Email {i+1} truncated but attachment information preserved") + else: + # Not enough space, just truncate normally and warn + email_body = email_body[:self.config.max_email_content_length] + "... [truncated]" + logger.warning(f"Email {i+1} truncated and may have lost attachment information") + else: + # No attachments, just truncate normally + email_body = email_body[:self.config.max_email_content_length] + "... 
[truncated]" + + # Clean the email body to remove potential problematic characters + email_body = email_body.encode('utf-8', errors='ignore').decode('utf-8') + + # Create a temporary file with the email body + file_path = f"/tmp/email_{i}_{email.get('id', 'unknown')}.txt" + with open(file_path, "w", encoding='utf-8') as f: + f.write(email_body) + + # Prepare metadata with email ID for better matching + metadata = email.get('metadata', {}) + metadata['email_id'] = email.get('id', 'unknown') + metadata['document_name'] = os.path.basename(file_path) + + with open(file_path, "rb") as f: + files = {'file': (os.path.basename(file_path), f, 'text/plain')} + # Include metadata in the request + request_data = { + "collection": self.collection_id, + "metadata": metadata + } + data = {'request': json.dumps(request_data)} + + response = self.session.post( + f"{self.config.base_url}/files", + data=data, + files=files, + timeout=30 # Add timeout + ) + + if response.status_code != 201: + logger.warning(f"Failed to index email {i+1}: Status {response.status_code}") + logger.warning(f"Response: {response.text}") + failed_indexes += 1 + else: + successful_indexes += 1 + # Track this email as indexed + email_id = email.get('id') + if email_id: + self.indexed_email_ids.add(email_id) + logger.debug(f"Successfully indexed email {i+1}/{len(emails)}") + + os.remove(file_path) + + # Add a small delay to avoid overwhelming the API + if i > 0 and i % 10 == 0: + time.sleep(self.config.batch_upload_delay) # Configurable delay + + except Exception as e: + failed_indexes += 1 + logger.error(f"Error indexing email {i+1}: {e}") + # Clean up the temp file if it exists + if 'file_path' in locals() and os.path.exists(file_path): + os.remove(file_path) + continue + + # Update tracking information + if successful_indexes > 0: + self.last_indexed_hash = self._compute_email_hash(emails) + self.last_index_time = time.time() + # Save state to disk for persistence across requests + self._save_state() + + indexing_total_time = time.time() - indexing_start_time + logger.info(f"Email indexing completed: {successful_indexes} successful, {failed_indexes} failed in {indexing_total_time:.2f}s") + if successful_indexes > 0: + logger.info(f"📊 RAG INDEXING DETAILED PERFORMANCE: {successful_indexes} emails indexed at {successful_indexes/indexing_total_time:.1f} emails/sec") + + if len(emails) > 0 and successful_indexes == 0: + raise Exception(f"Failed to index any emails. All {failed_indexes} attempts failed.") + elif failed_indexes > 0: + logger.warning(f"Some emails failed to index: {failed_indexes} out of {len(emails)}") + + if len(emails) > 0: + print(f"Successfully indexed {successful_indexes}/{len(emails)} emails") + + def query_emails(self, query: str, k: int = 5) -> list[dict]: + """ + Query the indexed emails to find the most relevant ones. + + Args: + query: The user's query. + k: The number of emails to retrieve. + + Returns: + A list of dictionaries containing content and metadata for matching. + """ + if not self.collection_id: + raise Exception("Collection does not exist. 
Please index emails first.") + + query_start_time = time.time() + data = { + "collections": [self.collection_id], + "k": k, + "prompt": query, + "method": "semantic" + } + response = self.session.post( + url=f"{self.config.base_url}/search", + json=data + ) + api_time = time.time() - query_start_time + + response.raise_for_status() + + results = response.json()["data"] + + logger.info(f"📊 RAG QUERY DETAILED PERFORMANCE: Albert API search took {api_time:.2f}s for {k} results from query: '{query[:50]}...'") + + # Return both content and metadata for better matching + formatted_results = [] + for result in results: + chunk = result["chunk"] + formatted_results.append({ + "content": chunk["content"], + "metadata": chunk.get("metadata", {}), + "score": result.get("score", 0.0), + "document_name": chunk.get("metadata", {}).get("document_name", "") + }) + + return formatted_results + diff --git a/src/backend/deep_search/urls.py b/src/backend/deep_search/urls.py new file mode 100644 index 00000000..cbcdb76e --- /dev/null +++ b/src/backend/deep_search/urls.py @@ -0,0 +1,20 @@ +""" +URL configuration for the deep_search module. +""" + +from django.urls import path +from . import views + +app_name = 'deep_search' + +urlpatterns = [ + # AI-powered intelligent email search + path('intelligent-search/', views.intelligent_search_api, name='intelligent_search'), + + # General conversation endpoint + path('conversation/', views.conversation_api, name='conversation'), + + # Deep search status and configuration + path('status/', views.chatbot_status_api, name='chatbot_status'), + +] diff --git a/src/backend/deep_search/views.py b/src/backend/deep_search/views.py new file mode 100644 index 00000000..ba540c27 --- /dev/null +++ b/src/backend/deep_search/views.py @@ -0,0 +1,283 @@ +""" +Albert chatbot integration views. + +This module provides the core API endpoints for the chatbot functionality. +""" + +import logging +from typing import Dict, Any + +from rest_framework.decorators import api_view, permission_classes +from rest_framework.permissions import IsAuthenticated +from rest_framework.response import Response +from rest_framework import status + +from .chatbot import get_chatbot +from .email_service import EmailService + +logger = logging.getLogger(__name__) + + +@api_view(['POST']) +@permission_classes([IsAuthenticated]) +def intelligent_search_api(request): + """ + Intelligent email search endpoint using Albert API. + + This endpoint analyzes the user's first 500 emails and returns + the best matches using AI-powered search. 
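+
+    Example request (illustrative; the path is the one the frontend below
+    calls, and the payload is hypothetical):
+
+        POST /api/v1.0/deep_search/intelligent-search/
+        {"query": "factures du mois dernier"}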
+ + POST data: + { + "query": "User's natural language search query" + } + + Returns: + { + "success": true, + "results": [...], + "search_summary": "...", + "total_matches": 5, + "total_emails": 500, + "query": "original query" + } + """ + try: + data = request.data + query = data.get('query', '').strip() + + if not query: + return Response({ + 'success': False, + 'error': 'query is required' + }, status=status.HTTP_400_BAD_REQUEST) + + # Get user ID from authenticated user + if not hasattr(request, 'user') or not request.user.is_authenticated: + return Response({ + 'success': False, + 'error': 'Authentication required' + }, status=status.HTTP_401_UNAUTHORIZED) + + user_id = str(request.user.id) + logger.info(f"Processing intelligent search for user {user_id}: {query}") + + # Use actual email service with RAG search + try: + # Initialize email service + email_service = EmailService() + + # Get chatbot instance for API client + chatbot = get_chatbot() + + # Perform intelligent email search using RAG + search_results = email_service.chatbot_intelligent_email_search( + user_id=user_id, + user_query=query, + api_client=chatbot.api_client, + max_results=10 + ) + + except Exception as e: + logger.error(f"Error in intelligent search: {e}") + search_results = { + 'success': False, + 'error': f'Search error: {str(e)}' + } + + # Format result for consistency + if search_results['success']: + results = search_results.get('results', []) + logger.info(f"Found {len(results)} matches for query: {query}") + + result = { + 'success': True, + 'response': f"Found {len(results)} relevant emails", + 'search_summary': f"AI search found {len(results)} relevant emails from {search_results.get('total_searched', 0)} searched", + 'type': 'intelligent_search', + 'results': results, + 'total_matches': len(results), + 'total_emails': search_results.get('total_searched', 0), + 'original_request': query + } + else: + logger.warning(f"Intelligent search failed: {search_results.get('error', 'Unknown error')}") + result = { + 'success': False, + 'response': search_results.get('error', 'Search failed'), + 'message': search_results.get('error', 'Search failed'), + 'type': 'intelligent_search', + 'results': [], + 'original_request': query + } + + if result.get('success'): + return Response({ + 'success': True, + 'results': result.get('results', []), + 'search_summary': result.get('search_summary', result.get('response', '')), + 'total_matches': result.get('total_matches', 0), + 'total_emails': result.get('total_emails', 0), + 'query': query, + 'type': 'intelligent_search' + }, status=status.HTTP_200_OK) + else: + return Response({ + 'success': False, + 'error': result.get('response', result.get('message', 'Search failed')), + 'query': query + }, status=status.HTTP_200_OK) + + except Exception as e: + logger.error(f"Error in intelligent_search_api: {e}") + return Response({ + 'success': False, + 'error': f'Search error: {str(e)}', + 'query': query if 'query' in locals() else '' + }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) + + +@api_view(['GET']) +@permission_classes([IsAuthenticated]) +def chatbot_status_api(request): + """ + Get chatbot status and configuration. + + Returns basic information about the chatbot system. 
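+
+    Example response (values illustrative):
+
+        {"success": true, "status": "active",
+         "features": {"intelligent_search": true, "email_analysis": true},
+         "user_id": "42"}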
+ """ + try: + user_id = str(request.user.id) if request.user.is_authenticated else "anonymous" + + return Response({ + 'success': True, + 'status': 'active', + 'features': { + 'intelligent_search': True, + 'email_analysis': True + }, + 'user_id': user_id + }, status=status.HTTP_200_OK) + + except Exception as e: + logger.error(f"Error in chatbot_status_api: {e}") + return Response({ + 'success': False, + 'status': 'error', + 'error': str(e) + }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) + + +@api_view(['POST']) +@permission_classes([IsAuthenticated]) +def conversation_api(request): + """ + General conversation endpoint for chatbot interactions. + + This endpoint handles general conversation, email analysis, + summarization, and other non-search related tasks. + + POST data: + { + "message": "User's message", + "conversation_history": [ + {"role": "user", "content": "..."}, + {"role": "assistant", "content": "..."} + ] + } + + Returns: + { + "success": true, + "response": "Bot response", + "type": "conversation|email_summary|email_reply|function_call", + "function_used": "name_of_function_if_any" + } + """ + try: + data = request.data + message = data.get('message', '').strip() + conversation_history = data.get('conversation_history', []) + + if not message: + return Response({ + 'success': False, + 'error': 'message is required' + }, status=status.HTTP_400_BAD_REQUEST) + + # Get user ID from authenticated user + if not hasattr(request, 'user') or not request.user.is_authenticated: + return Response({ + 'success': False, + 'error': 'Authentication required' + }, status=status.HTTP_401_UNAUTHORIZED) + + user_id = str(request.user.id) + logger.info(f"Processing conversation for user {user_id}: {message}") + + # For general conversation, redirect to intelligent search + # Since we've simplified the chatbot to only do intelligent search + logger.info(f"Redirecting conversation to intelligent search for user {user_id}: {message}") + + # Simple implementation for conversation (redirected to search) + try: + # Mock search results for demonstration + mock_results = [ + { + "id": "mock-conversation-1", + "subject": f"Conversation result for: {message}", + "sender": {"email": "example@test.com"}, + "snippet": f"This is a mock conversation response for: {message}" + } + ] + + search_results = { + 'success': True, + 'results': mock_results, + 'total_searched': 500 + } + except Exception as e: + logger.error(f"Error in mock conversation: {e}") + search_results = { + 'success': False, + 'error': f'Conversation error: {str(e)}' + } + + # Format result for conversation context + if search_results['success']: + results = search_results.get('results', []) + result = { + 'success': True, + 'response': f"Found {len(results)} relevant emails for your query: {message}", + 'type': 'intelligent_search', + 'function_used': 'intelligent_email_search', + 'results': results + } + else: + result = { + 'success': False, + 'response': f"Could not search emails: {search_results.get('error', 'Search failed')}", + 'type': 'error' + } + + if result.get('success'): + return Response({ + 'success': True, + 'response': result.get('response', ''), + 'type': result.get('type', 'conversation'), + 'function_used': result.get('function_used', ''), + }, status=status.HTTP_200_OK) + else: + return Response({ + 'success': False, + 'error': result.get('response', 'Conversation failed'), + 'type': 'error' + }, status=status.HTTP_200_OK) + + except Exception as e: + logger.error(f"Error in conversation_api: {e}") + return Response({ 
+ 'success': False, + 'error': f'Conversation error: {str(e)}', + 'type': 'error' + }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) + diff --git a/src/backend/messages/settings.py b/src/backend/messages/settings.py index 9c6eb4d3..9ba04d49 100755 --- a/src/backend/messages/settings.py +++ b/src/backend/messages/settings.py @@ -310,6 +310,7 @@ class Base(Configuration): # Django applications from the highest priority to the lowest INSTALLED_APPS = [ "core", + "deep_search", # AI-powered search functionality "drf_spectacular", # Third party apps "corsheaders", diff --git a/src/backend/poetry.lock b/src/backend/poetry.lock index 9a1e571a..e71be07b 100644 --- a/src/backend/poetry.lock +++ b/src/backend/poetry.lock @@ -3557,6 +3557,17 @@ legacy-cgi = {version = ">=2.6", markers = "python_version >= \"3.13\""} docs = ["Sphinx (>=1.7.5)", "pylons-sphinx-themes"] testing = ["coverage", "pytest (>=3.1.0)", "pytest-cov", "pytest-xdist"] +[[package]] +name = "wget" +version = "3.2" +description = "pure python download utility" +optional = false +python-versions = "*" +groups = ["main"] +files = [ + {file = "wget-3.2.zip", hash = "sha256:35e630eca2aa50ce998b9b1a127bb26b30dfee573702782aa982f875e3f16061"}, +] + [[package]] name = "whitenoise" version = "6.8.2" @@ -3578,4 +3589,4 @@ dev = ["django-extensions", "drf-spectacular-sidecar", "flower", "pip-audit", "p [metadata] lock-version = "2.1" python-versions = ">=3.13,<4.0" -content-hash = "19de897b2b3ff7c3d9b64d6a1b3a3bd7a611a062c7065a7596310df33401fd0d" +content-hash = "e22ac075afd89a3e2aee96a09e2dcf3817a328ba12e3d0b91120fcc0daa4a17a" diff --git a/src/backend/pyproject.toml b/src/backend/pyproject.toml index 3cf4c4ed..146cd90a 100644 --- a/src/backend/pyproject.toml +++ b/src/backend/pyproject.toml @@ -61,6 +61,7 @@ dependencies = [ "requests==2.32.3", "sentry-sdk[django]==2.27.0", "url-normalize==1.4.3", + "wget==3.2", "whitenoise==6.8.2", ] diff --git a/src/frontend/src/features/forms/components/chatbot-search-input/index.tsx b/src/frontend/src/features/forms/components/chatbot-search-input/index.tsx new file mode 100644 index 00000000..e69de29b diff --git a/src/frontend/src/features/forms/components/search-filters-form/_index.scss b/src/frontend/src/features/forms/components/search-filters-form/_index.scss index ccdf2e3d..436b7c2f 100644 --- a/src/frontend/src/features/forms/components/search-filters-form/_index.scss +++ b/src/frontend/src/features/forms/components/search-filters-form/_index.scss @@ -36,4 +36,106 @@ display: flex; align-items: center; gap: var(--c--theme--spacings--2xs); +} + +.search__advanced-research-container { + position: relative; + display: flex; + flex-direction: column; + gap: var(--c--theme--spacings--xs); + padding-top: var(--c--theme--spacings--sm); + margin-top: var(--c--theme--spacings--sm); + + // Straight line separator above the chatbot input + &::before { + content: ''; + position: absolute; + top: 0; + left: 0; + right: 0; + height: 1px; + background-color: var(--c--theme--colors--greyscale-300); + } +} + +.search__advanced-research-input-wrapper { + position: relative; + display: flex; + align-items: center; + + // Center the text in the input + input { + padding-right: 50px; // Make space for the button inside + } + + + // Ensure the labelled-box container doesn't interfere with centering + .labelled-box { + display: flex; + align-items: center; + height: 26px; // Match button height + + .labelled-box__children { + display: flex; + align-items: center; + width: 100%; + height: 100%; + } + } +} + 
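+// The send button overlays the input: the wrapper above is the positioning
+// context, and the input reserves room with padding-right, so the absolutely
+// positioned button below never covers typed text.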
+.search__advanced-research-send-button { + position: absolute; + right: 8px; + top: 50%; + transform: translateY(-50%); + min-width: auto !important; + width: 36px !important; + height: 36px !important; + padding: 6px !important; + border-radius: 50% !important; + z-index: 1; + background: transparent !important; + border: none !important; + box-shadow: none !important; + + // Hover effect similar to "Rechercher" button + &:hover { + background-color: var(--c--theme--colors--primary-100) !important; + cursor: pointer; + } + + // Remove focus outline + &:focus { + outline: none !important; + box-shadow: none !important; + } + + // Ensure button is visible inside input + .material-icons { + font-size: 18px; + color: var(--c--theme--colors--primary-500); + + &:hover { + color: var(--c--theme--colors--primary-600); + } + } +} + +.search__filters-footer { + display: flex; + justify-content: flex-end; + gap: var(--c--theme--spacings--sm); +} + +.search__filters-folder-option { + display: flex; + align-items: center; + gap: var(--c--theme--spacings--2xs); +} + +.search__advanced-research-actions { + display: flex; + justify-content: flex-end; + gap: var(--c--theme--spacings--xs); } \ No newline at end of file diff --git a/src/frontend/src/features/forms/components/search-filters-form/index.tsx b/src/frontend/src/features/forms/components/search-filters-form/index.tsx index cb1c21c2..4aeae26d 100644 --- a/src/frontend/src/features/forms/components/search-filters-form/index.tsx +++ b/src/frontend/src/features/forms/components/search-filters-form/index.tsx @@ -1,8 +1,11 @@ import { SearchHelper } from "@/features/utils/search-helper"; -import { Label } from "@gouvfr-lasuite/ui-kit"; +import { Label, Spinner } from "@gouvfr-lasuite/ui-kit"; import { Button, Checkbox, Input, Select } from "@openfun/cunningham-react"; -import { useRef } from "react"; +import { useRef, useState } from "react"; import { useTranslation } from "react-i18next"; +import { fetchAPI } from "@/features/api/fetch-api"; +import { useRouter } from "next/router"; +import { usePathname } from "next/navigation"; type SearchFiltersFormProps = { query: string; @@ -11,21 +14,126 @@ type SearchFiltersFormProps = { export const SearchFiltersForm = ({ query, onChange }: SearchFiltersFormProps) => { const { t, i18n } = useTranslation(); + const router = useRouter(); + const pathname = usePathname(); const formRef = useRef(null); + const [advancedResearchValue, setAdvancedResearchValue] = useState(''); + const [isSubmittingAdvancedResearch, setIsSubmittingAdvancedResearch] = useState(false); - const updateQuery = (submit: boolean) => { + const updateQuery = async (submit: boolean) => { const formData = new FormData(formRef.current as HTMLFormElement); - const query = SearchHelper.serializeSearchFormData(formData, i18n.resolvedLanguage); + const advancedResearchQuery = formData.get('advanced_research'); + + // If advanced research field is filled, use Albert API for intelligent search + if (submit && advancedResearchQuery && String(advancedResearchQuery).trim() !== "") { + setIsSubmittingAdvancedResearch(true); + try { + // Call Albert API + const intelligentSearchRes = await fetchAPI<{ + success: boolean; + results?: Array<{ id: string }>; + error?: string; + }>( + "/api/v1.0/deep_search/intelligent-search/", + { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ query: advancedResearchQuery }), + } + ); + + const actualResponse = (intelligentSearchRes as { data?: { success: boolean; results?: 
Array<{ id: string }>; error?: string } })?.data || intelligentSearchRes;
+
+        if (actualResponse?.success && actualResponse?.results && actualResponse.results.length > 0) {
+          // Build query object for router - this ensures router.query gets populated
+          const mailIds = actualResponse.results.map((r: { id: string }) => r.id).join(',');
+          console.log('AI Search: Setting message_ids in router query:', mailIds);
+          console.log('AI Search: Results from API:', actualResponse.results);
+
+          const newQuery: Record<string, any> = {
+            ...router.query,
+            message_ids: mailIds
+          };
+
+          // Remove any search text parameters - keep deep search separate from "contient les mots"
+          if ('search' in newQuery) {
+            delete newQuery.search;
+          }
+
+          // Use router navigation with query object instead of URL string
+          router.replace({
+            pathname,
+            query: newQuery
+          });
+
+          // Clear the advanced research input after successful search
+          setAdvancedResearchValue('');
+          return;
+        } else if (actualResponse?.success && actualResponse?.results && actualResponse.results.length === 0) {
+          // Successful search but no results - use a non-existent message ID to show no emails
+          const newQuery: Record<string, any> = {
+            ...router.query,
+            message_ids: '00000000-0000-0000-0000-000000000000' // Non-existent UUID to ensure no results
+          };
+
+          // Remove any search text parameters - keep deep search separate from "contient les mots"
+          if ('search' in newQuery) {
+            delete newQuery.search;
+          }
+
+          // Use router navigation to show empty results
+          router.replace({
+            pathname,
+            query: newQuery
+          });
+
+          // Clear the advanced research input after successful search
+          setAdvancedResearchValue('');
+          return;
+        } else {
+          // Show error for actual failures
+          alert(actualResponse?.error || t("api.error.unexpected"));
+          return;
+        }
+      } catch (error) {
+        console.error('Advanced research error:', error instanceof Error ? error.message : String(error));
+        alert(t("api.error.unexpected"));
+        return;
+      } finally {
+        setIsSubmittingAdvancedResearch(false);
+      }
+    }
+
+    // Otherwise, classic search
+    const query = SearchHelper.serializeSearchFormData(formData, i18n.resolvedLanguage || 'en');
     onChange(query, submit);
     formRef.current?.reset();
   }
 
-  const handleSubmit = (event: React.FormEvent) => updateQuery(event.type === 'submit');
+  const handleSubmit = (event: React.FormEvent) => {
+    event.preventDefault();
+    updateQuery(true);
+  };
 
   const handleChange = () => updateQuery(false);
 
+  const handleAdvancedResearchChange = (event: React.ChangeEvent) => {
+    setAdvancedResearchValue(event.target.value);
+  }
+
+  const handleAdvancedResearchKeyPress = (event: React.KeyboardEvent) => {
+    if (event.key === 'Enter' && !event.shiftKey) {
+      event.preventDefault();
+      updateQuery(true);
+    }
+  }
+
   const handleReset = () => {
     onChange('', false);
     formRef.current?.reset();
+    setAdvancedResearchValue(''); // Clear the advanced research input value
+
+    // Navigate back to the default view without any search parameters
+    router.replace(pathname);
   }
 
   const parsedQuery = SearchHelper.parseSearchQuery(query);
@@ -101,6 +209,38 @@
+
+          {/* Advanced Research Field */}
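+          {/* Sketch of the field markup implied by the handlers above and the
+              .search__advanced-research-* styles; the exact label and props
+              are assumptions. */}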
+          <div className="search__advanced-research-container">
+            <div className="search__advanced-research-input-wrapper">
+              <Input
+                name="advanced_research"
+                label={t('search.advanced_research')}
+                value={advancedResearchValue}
+                onChange={handleAdvancedResearchChange}
+                onKeyPress={handleAdvancedResearchKeyPress}
+              />
+              <Button
+                type="submit"
+                className="search__advanced-research-send-button"
+                disabled={isSubmittingAdvancedResearch}
+              >
+                {isSubmittingAdvancedResearch
+                  ? <Spinner />
+                  : <span className="material-icons">send</span>}
+              </Button>
+            </div>
+          </div>