Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ AI_SYSTEM_BULK_IMPORT_MAX_ROWS=5000
# ── Module 3: RAG Intelligence ────────────────────────────────────────────────
S3_BUCKET_NAME=
FAISS_INDEX_PATH=faiss_index
RAG_DOCUMENT_STORAGE_PATH=rag_documents
RAG_CHUNK_SIZE=1000
RAG_CHUNK_OVERLAP=200

Expand Down
44 changes: 44 additions & 0 deletions backend/alembic/versions/7f3b2e91a6d4_add_rag_documents_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""add_rag_documents_table

Revision ID: 7f3b2e91a6d4
Revises: add_onboarding_completed_to_users, c0e71c86214f, e7d9f2b3c4a5
Create Date: 2026-06-08 00:00:00.000000

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


revision: str = "7f3b2e91a6d4"
down_revision: Union[str, tuple[str, ...], None] = (
"add_onboarding_completed_to_users",
"c0e71c86214f",
"e7d9f2b3c4a5",
)
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
op.create_table(
"rag_documents",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("filename", sa.String(length=255), nullable=False),
sa.Column("original_filename", sa.String(length=255), nullable=False),
sa.Column("storage_path", sa.String(length=1000), nullable=False),
sa.Column("content_type", sa.String(length=255), nullable=True),
sa.Column("file_size_bytes", sa.Integer(), nullable=False),
sa.Column("chunks_count", sa.Integer(), nullable=False),
sa.Column("uploaded_by_id", sa.Integer(), nullable=True),
sa.Column("created_at", sa.DateTime(), nullable=False),
sa.Column("updated_at", sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(op.f("ix_rag_documents_id"), "rag_documents", ["id"], unique=False)


def downgrade() -> None:
op.drop_index(op.f("ix_rag_documents_id"), table_name="rag_documents")
op.drop_table("rag_documents")
4 changes: 2 additions & 2 deletions backend/app/api/v1/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from fastapi import APIRouter
from app.api.v1 import auth, ai_systems, documents, classification, guard, badge, analytics, notifications, webhooks
from app.api.v1 import auth, ai_systems, documents, classification, guard, rag, badge, analytics, notifications, webhooks

api_router = APIRouter()

Expand All @@ -11,7 +11,7 @@
)
api_router.include_router(documents.router, prefix="/documents", tags=["Documents"])
api_router.include_router(guard.router, prefix="/guard", tags=["LLM Guard"])
#api_router.include_router(rag.router, prefix="/rag", tags=["RAG Intelligence"])
api_router.include_router(rag.router, prefix="/rag", tags=["RAG Intelligence"])
api_router.include_router(analytics.router, prefix="/analytics", tags=["Analytics"])
api_router.include_router(notifications.router, prefix="/notifications", tags=["Notifications"])
api_router.include_router(webhooks.router, prefix="/webhooks", tags=["Webhooks"])
186 changes: 165 additions & 21 deletions backend/app/api/v1/rag.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@

import os
import shutil
import tempfile
import uuid
from datetime import datetime
from typing import List, Literal, Optional
import mimetypes

Expand All @@ -25,6 +26,7 @@
from app.core.config import settings
from app.core.database import get_db
from app.core.security import get_current_user
from app.models.rag_document import RAGDocument
from app.models.rag_feedback import RAGFeedback
from app.models.rag_query import RagQuery
from app.models.user import SubscriptionTier, User
Expand Down Expand Up @@ -54,6 +56,81 @@ class RAGIngestResponse(BaseModel):
index_size_bytes: int


class RAGDocumentResponse(BaseModel):
id: int
filename: str
original_filename: str
content_type: Optional[str] = None
file_size_bytes: int
chunks_count: int
uploaded_by_id: Optional[int] = None
created_at: datetime
updated_at: datetime

class Config:
from_attributes = True


class RAGDocumentListResponse(BaseModel):
items: list[RAGDocumentResponse]
total: int


class RAGDocumentDeleteResponse(BaseModel):
deleted_document_id: int
documents_remaining: int
index_rebuilt: bool
index_size_bytes: int


def _ensure_storage_dir() -> str:
os.makedirs(settings.RAG_DOCUMENT_STORAGE_PATH, exist_ok=True)
return settings.RAG_DOCUMENT_STORAGE_PATH


def _stored_filename(original_filename: str) -> str:
safe_name = os.path.basename(original_filename).replace(os.sep, "_")
return f"{uuid.uuid4().hex}_{safe_name}"


def _index_size_bytes() -> int:
index_path = settings.FAISS_INDEX_PATH
index_size_bytes = 0
for fname in ("index.faiss", "index.pkl"):
fpath = os.path.join(index_path, fname)
if os.path.exists(fpath):
index_size_bytes += os.path.getsize(fpath)
return index_size_bytes


def _valid_text_chunks(file_paths: list[str]):
raw_chunks = load_documents_from_paths(file_paths)
return [
chunk for chunk in raw_chunks
if getattr(chunk, "page_content", None) and chunk.page_content.strip()
]


def _rebuild_index_from_documents(documents: list[RAGDocument]) -> int:
file_paths = [doc.storage_path for doc in documents if os.path.exists(doc.storage_path)]
if not file_paths:
shutil.rmtree(settings.FAISS_INDEX_PATH, ignore_errors=True)
return 0

chunks = _valid_text_chunks(file_paths)
if not chunks:
shutil.rmtree(settings.FAISS_INDEX_PATH, ignore_errors=True)
return 0

create_vector_store(chunks)
return _index_size_bytes()


def _current_user_id(current_user: User) -> Optional[int]:
user_id = getattr(current_user, "id", None)
return user_id if isinstance(user_id, int) else None


# ---------------------------------------------------------------------------
# POST /rag/ingest
# ---------------------------------------------------------------------------
Expand All @@ -66,6 +143,7 @@ class RAGIngestResponse(BaseModel):
def ingest_documents(
files: List[UploadFile] = File(..., description="One or more PDF files to ingest"),
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""Accept one or more PDF uploads, process them through the document loader,"""
if len(files) > settings.RAG_MAX_FILES_PER_REQUEST:
Expand Down Expand Up @@ -103,24 +181,30 @@ def ingest_documents(
detail=f"Total upload size exceeds the maximum budget of {settings.RAG_TOTAL_BUDGET_BYTES // (1024 * 1024)}MB.",
)

tmp_dir = tempfile.mkdtemp(prefix="aegis_ingest_")
storage_dir = _ensure_storage_dir()
saved_paths: list[str] = []
pending_documents: list[RAGDocument] = []

try:
for upload in pdf_files:
dest = os.path.join(tmp_dir, os.path.basename(upload.filename))
filename = _stored_filename(upload.filename)
dest = os.path.join(storage_dir, filename)
with open(dest, "wb") as buf:
shutil.copyfileobj(upload.file, buf)
saved_paths.append(dest)
pending_documents.append(
RAGDocument(
filename=filename,
original_filename=os.path.basename(upload.filename),
storage_path=dest,
content_type=upload.content_type,
file_size_bytes=os.path.getsize(dest),
uploaded_by_id=_current_user_id(current_user),
)
)

# ── 3. Chunk documents (gives us the accurate chunk count) ────────
raw_chunks = load_documents_from_paths(saved_paths)

# Filter out chunks with empty or whitespace-only page_content
chunks = [
chunk for chunk in raw_chunks
if chunk.page_content and chunk.page_content.strip()
]
chunks = _valid_text_chunks(saved_paths)

if not chunks:
raise HTTPException(
Expand All @@ -129,22 +213,29 @@ def ingest_documents(
"Ensure the files are not scanned images or password-protected.",
)

chunks_by_source: dict[str, int] = {path: 0 for path in saved_paths}
for chunk in chunks:
source = str(getattr(chunk, "metadata", {}).get("source", ""))
if source in chunks_by_source:
chunks_by_source[source] += 1

for document in pending_documents:
document.chunks_count = chunks_by_source.get(document.storage_path, 0)
db.add(document)
db.flush()

# ── 4. Build / rebuild FAISS index and persist to disk ────────────
try:
create_vector_store(chunks) # Pass the extracted Document objects!
all_documents = db.query(RAGDocument).order_by(RAGDocument.id.asc()).all()
index_size_bytes = _rebuild_index_from_documents(all_documents)
except Exception as exc:
db.rollback()
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail=f"Failed to build FAISS index: {exc}",
)

# ── 5. Calculate on-disk index size ───────────────────────────────
index_path = settings.FAISS_INDEX_PATH
index_size_bytes = 0
for fname in ("index.faiss", "index.pkl"):
fpath = os.path.join(index_path, fname)
if os.path.exists(fpath):
index_size_bytes += os.path.getsize(fpath)
db.commit()

return RAGIngestResponse(
files_processed=len(saved_paths),
Expand All @@ -153,8 +244,61 @@ def ingest_documents(
)

finally:
# ── 6. Always clean up the temp directory ─────────────────────────
shutil.rmtree(tmp_dir, ignore_errors=True)
if db.is_active:
for path in saved_paths:
exists_in_db = db.query(RAGDocument).filter(RAGDocument.storage_path == path).first()
if not exists_in_db and os.path.exists(path):
os.remove(path)


@router.get("/documents", response_model=RAGDocumentListResponse)
def list_rag_documents(
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""List documents currently included in the RAG knowledge base."""
documents = db.query(RAGDocument).order_by(RAGDocument.created_at.desc()).all()
return RAGDocumentListResponse(items=documents, total=len(documents))


@router.delete("/documents/{document_id}", response_model=RAGDocumentDeleteResponse)
def delete_rag_document(
document_id: int,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""Remove a RAG source document and rebuild the FAISS index."""
document = db.query(RAGDocument).filter(RAGDocument.id == document_id).first()
if not document:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="RAG document not found")

storage_path = document.storage_path
db.delete(document)
db.flush()

remaining_documents = db.query(RAGDocument).order_by(RAGDocument.id.asc()).all()
try:
index_size_bytes = _rebuild_index_from_documents(remaining_documents)
except Exception as exc:
db.rollback()
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail=f"Failed to rebuild FAISS index: {exc}",
)

db.commit()
if os.path.exists(storage_path):
try:
os.remove(storage_path)
except OSError:
pass

return RAGDocumentDeleteResponse(
deleted_document_id=document_id,
documents_remaining=len(remaining_documents),
index_rebuilt=bool(remaining_documents),
index_size_bytes=index_size_bytes,
)


@router.post("/query", response_model=RAGQueryResponse)
Expand Down Expand Up @@ -375,4 +519,4 @@ def get_rag_history(
}
for q in queries
],
}
}
1 change: 1 addition & 0 deletions backend/app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class Settings(BaseSettings):
RAG_CHUNK_SIZE: int = 1000
RAG_CHUNK_OVERLAP: int = 200
FAISS_INDEX_PATH: str = "faiss_index"
RAG_DOCUMENT_STORAGE_PATH: str = "rag_documents"
MLFLOW_TRACKING_URI: str = ""
EMBEDDINGS_MODEL: str = "nomic-embed-text"
RAG_MAX_FILES_PER_REQUEST: int = 10
Expand Down
3 changes: 2 additions & 1 deletion backend/app/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
from app.models.document import Document
from app.models.rag_feedback import RAGFeedback
from app.models.rag_query import RagQuery
from app.models.rag_document import RAGDocument
from app.models.audit_log import AISystemAuditLog
from app.models.guard_scan_log import GuardScanLog
from app.models.webhook import WebhookConfig
from app.models.notification import Notification
from app.models.compliance_snapshot import ComplianceSnapshot
__all__ = ["User", "AISystem", "RiskAssessment", "Document", "RAGFeedback", "RagQuery", "AISystemAuditLog", "GuardScanLog", "WebhookConfig", "Notification", "ComplianceSnapshot"]
__all__ = ["User", "AISystem", "RiskAssessment", "Document", "RAGFeedback", "RagQuery", "RAGDocument", "AISystemAuditLog", "GuardScanLog", "WebhookConfig", "Notification", "ComplianceSnapshot"]
20 changes: 20 additions & 0 deletions backend/app/models/rag_document.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from datetime import datetime

from sqlalchemy import Column, DateTime, Integer, String

from app.core.database import Base


class RAGDocument(Base):
__tablename__ = "rag_documents"

id = Column(Integer, primary_key=True, index=True)
filename = Column(String(255), nullable=False)
original_filename = Column(String(255), nullable=False)
storage_path = Column(String(1000), nullable=False)
content_type = Column(String(255), nullable=True)
file_size_bytes = Column(Integer, nullable=False, default=0)
chunks_count = Column(Integer, nullable=False, default=0)
uploaded_by_id = Column(Integer, nullable=True)
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
Loading