# Alembic revision identifiers.
# ``down_revision`` lists three parent revisions (see "Revises:" in the module
# docstring), so this revision joins those three lines of history.
revision: str = "7f3b2e91a6d4"
down_revision: Union[str, tuple[str, ...], None] = (
    "add_onboarding_completed_to_users",
    "c0e71c86214f",
    "e7d9f2b3c4a5",
)
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Create the ``rag_documents`` table and the index on its ``id`` column."""
    op.create_table(
        "rag_documents",
        sa.Column("id", sa.Integer(), nullable=False),
        # Name of the stored file and the name the client uploaded it under.
        sa.Column("filename", sa.String(length=255), nullable=False),
        sa.Column("original_filename", sa.String(length=255), nullable=False),
        sa.Column("storage_path", sa.String(length=1000), nullable=False),
        sa.Column("content_type", sa.String(length=255), nullable=True),
        sa.Column("file_size_bytes", sa.Integer(), nullable=False),
        sa.Column("chunks_count", sa.Integer(), nullable=False),
        # Plain integer column: no foreign-key constraint is declared here.
        # NOTE(review): confirm that leaving this unconstrained is intentional.
        sa.Column("uploaded_by_id", sa.Integer(), nullable=True),
        # No server_default is declared, so every INSERT must supply both
        # timestamps (the ORM model provides Python-side defaults).
        sa.Column("created_at", sa.DateTime(), nullable=False),
        sa.Column("updated_at", sa.DateTime(), nullable=False),
        sa.PrimaryKeyConstraint("id"),
    )
    # Mirrors ``index=True`` on the model's ``id`` column.
    op.create_index(op.f("ix_rag_documents_id"), "rag_documents", ["id"], unique=False)


def downgrade() -> None:
    """Reverse ``upgrade``: drop the ``id`` index first, then the table."""
    op.drop_index(op.f("ix_rag_documents_id"), table_name="rag_documents")
    op.drop_table("rag_documents")
class RAGDocumentResponse(BaseModel):
    """Metadata for one stored RAG source document, as returned by the API."""

    id: int
    filename: str
    original_filename: str
    content_type: Optional[str] = None
    file_size_bytes: int
    chunks_count: int
    uploaded_by_id: Optional[int] = None
    created_at: datetime
    updated_at: datetime

    class Config:
        # Allows building the response directly from ``RAGDocument`` ORM rows.
        from_attributes = True


class RAGDocumentListResponse(BaseModel):
    """Response body for ``GET /rag/documents``."""

    items: list[RAGDocumentResponse]
    total: int


class RAGDocumentDeleteResponse(BaseModel):
    """Response body for ``DELETE /rag/documents/{document_id}``."""

    deleted_document_id: int
    documents_remaining: int
    index_rebuilt: bool
    index_size_bytes: int


# Upper bound for a stored file name; matches the ``String(255)`` length of
# the ``rag_documents.filename`` column.
_MAX_STORED_FILENAME_LENGTH = 255


def _ensure_storage_dir() -> str:
    """Create the RAG document storage directory if needed and return its path."""
    os.makedirs(settings.RAG_DOCUMENT_STORAGE_PATH, exist_ok=True)
    return settings.RAG_DOCUMENT_STORAGE_PATH


def _stored_filename(original_filename: str) -> str:
    """Return a unique, filesystem-safe name for storing an uploaded file.

    The result is ``<uuid4 hex>_<sanitised base name>``. Sanitising means:

    * directory components are removed for both ``/`` and ``\\`` separators,
      independent of the platform the server runs on;
    * non-printable characters (including NUL, which ``open`` rejects) are
      dropped;
    * an empty, ``None``, ``"."`` or ``".."`` name falls back to ``"upload"``;
    * the name is truncated, keeping its extension, so that the whole result
      is at most ``_MAX_STORED_FILENAME_LENGTH`` characters.

    Args:
        original_filename: Client-supplied file name (untrusted input).

    Returns:
        The name to use inside the storage directory.
    """
    # Normalise Windows separators first: on POSIX, os.path.basename() treats
    # a backslash as an ordinary character and would keep "dir\\file.pdf" whole.
    normalised = (original_filename or "").replace("\\", "/")
    base_name = os.path.basename(normalised)
    safe_name = "".join(ch for ch in base_name if ch.isprintable()).strip()
    if not safe_name or safe_name in (".", ".."):
        safe_name = "upload"

    prefix = f"{uuid.uuid4().hex}_"
    budget = _MAX_STORED_FILENAME_LENGTH - len(prefix)
    if len(safe_name) > budget:
        # Trim the stem rather than the extension so the file type stays visible.
        stem, ext = os.path.splitext(safe_name)
        ext = ext[:budget]
        safe_name = stem[: budget - len(ext)] + ext
    return prefix + safe_name


def _index_size_bytes() -> int:
    """Return the combined on-disk size of the FAISS index files.

    Sums ``index.faiss`` and ``index.pkl`` under ``settings.FAISS_INDEX_PATH``;
    a file that is missing or unreadable contributes 0.
    """
    total = 0
    for fname in ("index.faiss", "index.pkl"):
        # EAFP: a separate exists() check could race with a concurrent rebuild.
        try:
            total += os.path.getsize(os.path.join(settings.FAISS_INDEX_PATH, fname))
        except OSError:
            continue
    return total


def _valid_text_chunks(file_paths: list[str]) -> list:
    """Load chunks from ``file_paths`` and drop those without any text.

    A chunk is kept only when it has a ``page_content`` attribute that is
    non-empty after stripping whitespace.
    """
    raw_chunks = load_documents_from_paths(file_paths)
    return [
        chunk for chunk in raw_chunks
        if getattr(chunk, "page_content", None) and chunk.page_content.strip()
    ]


def _rebuild_index_from_documents(documents: list[RAGDocument]) -> int:
    """Rebuild the FAISS index from the stored files of ``documents``.

    Documents whose ``storage_path`` no longer exists on disk are skipped.
    When no file (or no non-empty chunk) is left, the index directory is
    removed instead.

    Returns:
        Size in bytes of the rebuilt index, or 0 when the index was removed.
    """
    file_paths = [doc.storage_path for doc in documents if os.path.exists(doc.storage_path)]
    # Do not call the loader at all when there is nothing to load.
    chunks = _valid_text_chunks(file_paths) if file_paths else []
    if not chunks:
        shutil.rmtree(settings.FAISS_INDEX_PATH, ignore_errors=True)
        return 0

    create_vector_store(chunks)
    return _index_size_bytes()


def _current_user_id(current_user: User) -> Optional[int]:
    """Return ``current_user.id`` when it is an ``int``, otherwise ``None``.

    The type check keeps non-integer ids (for example from a stand-in user
    object) out of the integer ``uploaded_by_id`` column.
    """
    user_id = getattr(current_user, "id", None)
    return user_id if isinstance(user_id, int) else None
Chunk documents (gives us the accurate chunk count) ──────── - raw_chunks = load_documents_from_paths(saved_paths) - - # Filter out chunks with empty or whitespace-only page_content - chunks = [ - chunk for chunk in raw_chunks - if chunk.page_content and chunk.page_content.strip() - ] + chunks = _valid_text_chunks(saved_paths) if not chunks: raise HTTPException( @@ -129,22 +213,29 @@ def ingest_documents( "Ensure the files are not scanned images or password-protected.", ) + chunks_by_source: dict[str, int] = {path: 0 for path in saved_paths} + for chunk in chunks: + source = str(getattr(chunk, "metadata", {}).get("source", "")) + if source in chunks_by_source: + chunks_by_source[source] += 1 + + for document in pending_documents: + document.chunks_count = chunks_by_source.get(document.storage_path, 0) + db.add(document) + db.flush() + # ── 4. Build / rebuild FAISS index and persist to disk ──────────── try: - create_vector_store(chunks) # Pass the extracted Document objects! + all_documents = db.query(RAGDocument).order_by(RAGDocument.id.asc()).all() + index_size_bytes = _rebuild_index_from_documents(all_documents) except Exception as exc: + db.rollback() raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=f"Failed to build FAISS index: {exc}", ) - # ── 5. Calculate on-disk index size ─────────────────────────────── - index_path = settings.FAISS_INDEX_PATH - index_size_bytes = 0 - for fname in ("index.faiss", "index.pkl"): - fpath = os.path.join(index_path, fname) - if os.path.exists(fpath): - index_size_bytes += os.path.getsize(fpath) + db.commit() return RAGIngestResponse( files_processed=len(saved_paths), @@ -153,8 +244,61 @@ def ingest_documents( ) finally: - # ── 6. 
@router.get("/documents", response_model=RAGDocumentListResponse)
def list_rag_documents(
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """List documents currently included in the RAG knowledge base.

    Returns every ``RAGDocument`` row, newest first, together with the total
    count. ``current_user`` is only used to require authentication.
    """
    documents = db.query(RAGDocument).order_by(RAGDocument.created_at.desc()).all()
    return RAGDocumentListResponse(items=documents, total=len(documents))


@router.delete("/documents/{document_id}", response_model=RAGDocumentDeleteResponse)
def delete_rag_document(
    document_id: int,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Remove a RAG source document and rebuild the FAISS index.

    Raises:
        HTTPException: 404 when no document has ``document_id``; 503 when
            rebuilding the index from the remaining documents fails (the
            database change is rolled back in that case).
    """
    import logging

    document = db.query(RAGDocument).filter(RAGDocument.id == document_id).first()
    if not document:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="RAG document not found")

    # Remember the path now; the ORM object is deleted below.
    storage_path = document.storage_path
    db.delete(document)
    # Flush (not commit) so the query below no longer sees the row while the
    # deletion can still be rolled back if the rebuild fails.
    db.flush()

    remaining_documents = db.query(RAGDocument).order_by(RAGDocument.id.asc()).all()
    try:
        index_size_bytes = _rebuild_index_from_documents(remaining_documents)
    except Exception as exc:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=f"Failed to rebuild FAISS index: {exc}",
        ) from exc

    db.commit()

    # Best-effort removal of the stored file, done only after the commit so a
    # failed rebuild never loses the file. A leftover file is not re-indexed
    # (the index is built from database rows), but it is logged so orphaned
    # files do not accumulate unnoticed.
    try:
        os.remove(storage_path)
    except FileNotFoundError:
        pass
    except OSError:
        logging.getLogger(__name__).warning(
            "Could not remove stored RAG document file %s", storage_path, exc_info=True
        )

    return RAGDocumentDeleteResponse(
        deleted_document_id=document_id,
        documents_remaining=len(remaining_documents),
        # True whenever any document remains; it does not inspect whether
        # index files were actually written.
        index_rebuilt=bool(remaining_documents),
        index_size_bytes=index_size_bytes,
    )
class RAGDocument(Base):
    """ORM model for a source document stored in the RAG knowledge base.

    Maps to the ``rag_documents`` table. Each row describes one uploaded file
    kept on disk.
    """

    __tablename__ = "rag_documents"

    id = Column(Integer, primary_key=True, index=True)
    # Name of the file inside the storage directory.
    filename = Column(String(255), nullable=False)
    # Base name of the file as supplied by the uploading client.
    original_filename = Column(String(255), nullable=False)
    # Path of the stored file on disk.
    storage_path = Column(String(1000), nullable=False)
    content_type = Column(String(255), nullable=True)
    file_size_bytes = Column(Integer, nullable=False, default=0)
    # Number of text chunks attributed to this document.
    chunks_count = Column(Integer, nullable=False, default=0)
    # Plain integer without a ForeignKey.
    # NOTE(review): confirm that no constraint to the users table is intended.
    uploaded_by_id = Column(Integer, nullable=True)
    # Both timestamps are naive datetimes produced by datetime.utcnow;
    # updated_at is refreshed by ``onupdate`` on every UPDATE.
    # NOTE(review): datetime.utcnow is deprecated since Python 3.12.
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
    @patch(PATCH_CREATE_VS)
    @patch(PATCH_LOAD_DOCS)
    def test_delete_document_removes_file_and_rebuilds_index(self, mock_load, mock_create, client, mock_rag_user):
        """Deleting one of two ingested documents removes its file and rebuilds the index."""
        deleted_chunk = self._chunk("delete me")
        remaining_chunk = self._chunk("keep me")
        # One entry per expected call to the patched loader, in order
        # (assuming PATCH_LOAD_DOCS targets the loader used by the rag module):
        #   1. ingest: chunking the newly uploaded files
        #   2. ingest: rebuilding the index from all stored documents
        #   3. delete: rebuilding the index from the remaining document
        mock_load.side_effect = [
            [deleted_chunk, remaining_chunk],
            [deleted_chunk, remaining_chunk],
            [remaining_chunk],
        ]
        mock_create.return_value = MagicMock()

        ingest_response = client.post(
            "/api/v1/rag/ingest",
            files=[
                ("files", _make_pdf_upload("delete-me.pdf")),
                ("files", _make_pdf_upload("keep-me.pdf")),
            ],
        )
        assert ingest_response.status_code == 200

        documents = client.get("/api/v1/rag/documents").json()["items"]
        deleted_doc = next(doc for doc in documents if doc["original_filename"] == "delete-me.pdf")
        # The API does not expose storage_path, so rebuild it from the storage
        # directory setting and the stored file name.
        deleted_path = os.path.join(settings.RAG_DOCUMENT_STORAGE_PATH, deleted_doc["filename"])

        # Forget the calls made during ingest so the assertion below only
        # sees the rebuild triggered by the delete.
        mock_create.reset_mock()
        response = client.delete(f"/api/v1/rag/documents/{deleted_doc['id']}")

        assert response.status_code == 200
        data = response.json()
        assert data["deleted_document_id"] == deleted_doc["id"]
        assert data["documents_remaining"] == 1
        assert data["index_rebuilt"] is True
        assert not os.path.exists(deleted_path)
        mock_create.assert_called_once_with([remaining_chunk])

    def test_delete_missing_document_returns_404(self, client, mock_rag_user):
        """Deleting an id that does not exist returns 404."""
        response = client.delete("/api/v1/rag/documents/999999")

        assert response.status_code == 404