36 changes: 36 additions & 0 deletions alembic/versions/7791001eb8a2_add_fts_gin_index.py
@@ -0,0 +1,36 @@
"""add_fts_gin_index

Revision ID: 7791001eb8a2
Revises: 8dceb919041e
Create Date: 2026-02-09 17:40:52.049941

"""
from typing import Sequence, Union

from alembic import op


# revision identifiers, used by Alembic.
revision: str = '7791001eb8a2'
down_revision: Union[str, Sequence[str], None] = '8dceb919041e'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
"""Create GIN index for full-text search on document_chunks.content.

Note: For production deployments with large existing datasets, consider
using CREATE INDEX CONCURRENTLY to avoid locking the table. Alembic
doesn't support CONCURRENTLY natively — use raw SQL with autocommit.
"""
op.execute("""
CREATE INDEX ix_document_chunks_content_fts
ON document_chunks
USING GIN (to_tsvector('english', content))
""")


def downgrade() -> None:
"""Drop the full-text search GIN index."""
op.execute("DROP INDEX IF EXISTS ix_document_chunks_content_fts")
19 changes: 10 additions & 9 deletions ragitect/agents/rag/tools.py
@@ -7,8 +7,9 @@
from typing import Callable, Awaitable
from uuid import UUID

-from langchain_core.tools import tool, BaseTool
+from langchain_core.tools import tool

+from ragitect.services.config import RETRIEVAL_RRF_K
from ragitect.services.database.repositories.vector_repo import VectorRepository
from ragitect.agents.rag.state import ContextChunk

@@ -42,29 +43,29 @@ async def _retrieve_documents_impl(
if query_embedding is None:
query_embedding = await embed_fn(query)

-# Search for similar chunks
-chunks_with_distances = await vector_repo.search_similar_chunks(
+# Search using hybrid RRF fusion (vector + full-text search)
+chunks_with_scores = await vector_repo.hybrid_search(
workspace_id=UUID(workspace_id),
query_vector=query_embedding,
+query_text=query,
k=top_k,
+rrf_k=RETRIEVAL_RRF_K,
)

# Convert to ContextChunk format
-# Note: search_similar_chunks returns (chunk, distance) tuples
-# Distance is cosine distance: 0 = identical, 2 = opposite
-# Convert to similarity: similarity = 1.0 - distance
-# Note: document.file_name requires relationship loading, deferred to graph node
+# Note: hybrid_search returns (chunk, rrf_score) tuples
+# RRF score is already "higher = better", no conversion needed
# Embedding preserved from DB to avoid redundant API calls during MMR selection
return [
ContextChunk(
chunk_id=str(chunk.id),
content=chunk.content,
-score=1.0 - distance,
+score=rrf_score,
document_id=str(chunk.document_id),
title="", # Populated by graph node after document lookup
embedding=list(chunk.embedding),
)
-for chunk, distance in chunks_with_distances
+for chunk, rrf_score in chunks_with_scores
]


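Review note on score semantics: the old path converted cosine distance to similarity, while RRF scores are already higher-is-better but live on a much smaller scale. An illustrative comparison (numbers invented for the example):

# Old scoring: cosine distance in [0, 2], converted to similarity.
distance = 0.18
similarity = 1.0 - distance                  # 0.82

# New scoring: a chunk ranked 1st by vector search and 3rd by FTS,
# with rrf_k=60 and both weights at 1.0:
rrf_score = 1.0 / (60 + 1) + 1.0 / (60 + 3)  # ~0.0323

Because scores shrink from roughly 0.8 to roughly 0.03, any downstream logic tuned to similarity magnitudes (for example RETRIEVAL_ADAPTIVE_K_GAP_THRESHOLD) may need re-tuning; worth double-checking.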
4 changes: 4 additions & 0 deletions ragitect/services/config.py
@@ -26,6 +26,9 @@
)
DEFAULT_RETRIEVAL_K: int = int(os.getenv("DEFAULT_RETRIEVAL_K", "10"))

+# Embedding dimension (must match the model's output dimension)
+EMBEDDING_DIMENSION: int = int(os.getenv("EMBEDDING_DIMENSION", "768"))

# Default LLM model for LangGraph nodes and other services
# Format: "{provider}/{model}" for LiteLLM compatibility
DEFAULT_LLM_MODEL: str = os.getenv("DEFAULT_LLM_MODEL", "ollama/llama3.1:8b")
@@ -48,6 +51,7 @@
os.getenv("RETRIEVAL_ADAPTIVE_K_GAP_THRESHOLD", "0.15")
)
RETRIEVAL_TOKEN_BUDGET: int = int(os.getenv("RETRIEVAL_TOKEN_BUDGET", "4000"))
+RETRIEVAL_RRF_K: int = int(os.getenv("RETRIEVAL_RRF_K", "60"))

# Encryption key for API key storage (required for cloud LLM providers)
ENCRYPTION_KEY: str | None = os.getenv("ENCRYPTION_KEY")
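
Review note: since RETRIEVAL_RRF_K is now tunable, it may help to see why the hybrid_search docstring says higher values reduce rank impact. A quick illustration (arithmetic only, not code from this PR):

for rrf_k in (10, 60):
    rank1, rank10 = 1 / (rrf_k + 1), 1 / (rrf_k + 10)
    print(f"rrf_k={rrf_k}: rank1={rank1:.4f} rank10={rank10:.4f} ratio={rank1 / rank10:.2f}")
# rrf_k=10: rank1=0.0909 rank10=0.0500 ratio=1.82
# rrf_k=60: rank1=0.0164 rank10=0.0143 ratio=1.15

Larger rrf_k compresses the gap between top and mid ranks, so neither retrieval system can dominate the fused ordering on rank position alone.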
171 changes: 164 additions & 7 deletions ragitect/services/database/repositories/vector_repo.py
@@ -1,10 +1,11 @@
"""Vector repository for similarity search operations"""

-from sqlalchemy import func, select
+from sqlalchemy import func, select, type_coerce, Float
from ragitect.services.database.exceptions import NotFoundError
from ragitect.services.database.models import Document, Workspace
from ragitect.services.database.exceptions import ValidationError
from ragitect.services.database.models import DocumentChunk
+from ragitect.services.config import EMBEDDING_DIMENSION
from uuid import UUID
from sqlalchemy.ext.asyncio.session import AsyncSession
import logging
@@ -72,9 +73,10 @@ async def search_similar_chunks(
ValidationError: If query_vector dimension is invalid
NotFoundError: If workspace does not exist
"""
-if len(query_vector) != 768:
+if len(query_vector) != EMBEDDING_DIMENSION:
raise ValidationError(
-"query_vector", f"Expected 768 dimensions, got {len(query_vector)}"
+"query_vector",
+f"Expected {EMBEDDING_DIMENSION} dimensions, got {len(query_vector)}",
)

# verify if workspace exists
@@ -114,6 +116,159 @@ async def search_similar_chunks(
)
return chunks_with_score

async def hybrid_search(
self,
workspace_id: UUID,
query_vector: list[float],
query_text: str,
k: int = 10,
rrf_k: int = 60,
vector_weight: float = 1.0,
fts_weight: float = 1.0,
) -> list[tuple[DocumentChunk, float]]:
"""Search using hybrid RRF fusion of vector similarity and full-text search.

Combines cosine similarity (pgvector) with PostgreSQL full-text search
via Reciprocal Rank Fusion (RRF) in a single CTE-based SQL query.

RRF formula: score = Σ(weight_i / (rrf_k + rank_i)), summed over each retrieval system that returned the chunk.

When full-text search returns no matches, gracefully degrades to
vector-only ranking.

Args:
workspace_id: Workspace to search within
query_vector: Query embedding vector (must match EMBEDDING_DIMENSION)
query_text: Original query text for full-text search
k: Number of results to return
rrf_k: RRF constant (default: 60). Higher values reduce rank impact.
vector_weight: Weight for vector search scores (default: 1.0)
fts_weight: Weight for full-text search scores (default: 1.0)

Returns:
List of (DocumentChunk, rrf_score) tuples ordered by RRF score
(descending, higher = better).

Raises:
ValidationError: If query_vector dimension is invalid
NotFoundError: If workspace does not exist
"""
if len(query_vector) != EMBEDDING_DIMENSION:
raise ValidationError(
"query_vector",
f"Expected {EMBEDDING_DIMENSION} dimensions, got {len(query_vector)}",
)

# Verify workspace exists
workspace = await self.session.get(Workspace, workspace_id)
if workspace is None:
raise NotFoundError("Workspace", workspace_id)

# Oversample each source so the fused ranking has enough candidates
oversample = k * 3

# CTE 1: Semantic search ranked by cosine distance (ascending = better)
distance_col = DocumentChunk.embedding.cosine_distance(query_vector).label(
"distance"
)
semantic_cte = (
select(
DocumentChunk.id.label("chunk_id"),
func.row_number()
.over(order_by=distance_col)
.label("semantic_rank"),
)
.where(DocumentChunk.workspace_id == workspace_id)
.order_by(distance_col)
.limit(oversample)
.cte("semantic_search")
)

# CTE 2: Full-text search ranked by ts_rank_cd (descending = better).
# Uses the same to_tsvector('english', content) expression as the GIN
# index added in this PR, so the planner can use that index.
fts_config = "english"
ts_vector = func.to_tsvector(fts_config, DocumentChunk.content)
ts_query = func.plainto_tsquery(fts_config, query_text)

keyword_cte = (
select(
DocumentChunk.id.label("chunk_id"),
func.row_number()
.over(
order_by=func.ts_rank_cd(ts_vector, ts_query).desc()
)
.label("keyword_rank"),
)
.where(
DocumentChunk.workspace_id == workspace_id,
ts_vector.op("@@")(ts_query),
)
.order_by(func.ts_rank_cd(ts_vector, ts_query).desc())
.limit(oversample)
.cte("keyword_search")
)

# Full outer join + RRF score computation
# Use coalesce: if a chunk only appears in one list, the other rank is absent
rrf_score = (
func.coalesce(
type_coerce(vector_weight, Float)
/ (rrf_k + semantic_cte.c.semantic_rank),
0.0,
)
+ func.coalesce(
type_coerce(fts_weight, Float)
/ (rrf_k + keyword_cte.c.keyword_rank),
0.0,
)
).label("rrf_score")

# Coalesce chunk_id from both CTEs for the join back to DocumentChunk
chunk_id_col = func.coalesce(
semantic_cte.c.chunk_id, keyword_cte.c.chunk_id
).label("chunk_id")

# Full outer join
fusion_query = (
select(chunk_id_col, rrf_score)
.select_from(
semantic_cte.outerjoin(
keyword_cte,
semantic_cte.c.chunk_id == keyword_cte.c.chunk_id,
full=True,
)
)
.order_by(rrf_score.desc())
.limit(k)
.subquery("fusion")
)

# Join back to DocumentChunk to get full model
final_stmt = (
select(DocumentChunk, fusion_query.c.rrf_score)
.join(fusion_query, DocumentChunk.id == fusion_query.c.chunk_id)
.order_by(fusion_query.c.rrf_score.desc())
)

result = await self.session.execute(final_stmt)
results = result.all()

chunks_with_scores = [
(chunk, float(rrf_score)) for chunk, rrf_score in results
]

logger.info(
f"Hybrid search: found {len(chunks_with_scores)} chunks "
+ f"(workspace={workspace_id}, k={k}, rrf_k={rrf_k})"
)

if chunks_with_scores:
scores = [score for _, score in chunks_with_scores]
logger.debug(
f"RRF score range: [{min(scores):.6f}, {max(scores):.6f}], "
+ f"mean: {sum(scores) / len(scores):.6f}"
)

return chunks_with_scores

async def search_similar_documents(
self,
workspace_id: UUID,
@@ -141,9 +296,10 @@ async def search_similar_documents(
ValidationError: If query_vector dimension is invalid
NotFoundError: If workspace does not exist
"""
-if len(query_vector) != 768:
+if len(query_vector) != EMBEDDING_DIMENSION:
raise ValidationError(
-"query_vector", f"Expected 768 dimensions, got {len(query_vector)}"
+"query_vector",
+f"Expected {EMBEDDING_DIMENSION} dimensions, got {len(query_vector)}",
)

workspace = await self.session.get(Workspace, workspace_id)
@@ -220,9 +376,10 @@ async def get_chunk_by_document(
Raises:
ValidationError: if query_vector dimension is invalid
"""
-if len(query_vector) != 768:
+if len(query_vector) != EMBEDDING_DIMENSION:
raise ValidationError(
-"query_vector", f"Expected 768 dimensions, got {len(query_vector)}"
+"query_vector",
+f"Expected {EMBEDDING_DIMENSION} dimensions, got {len(query_vector)}",
)

distance_col = DocumentChunk.embedding.cosine_distance(query_vector).label(
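Review note: for unit-testing the fusion logic without a database, here is a rough pure-Python equivalent of the CTE pipeline (a sketch under the same semantics; rrf_fuse and its argument names are hypothetical, not from this PR):

def rrf_fuse(
    semantic_ids: list[str],   # chunk ids from vector search, best-first
    keyword_ids: list[str],    # chunk ids from full-text search, best-first
    k: int = 10,
    rrf_k: int = 60,
    vector_weight: float = 1.0,
    fts_weight: float = 1.0,
) -> list[tuple[str, float]]:
    # Mirrors the SQL: coalesce(weight / (rrf_k + rank), 0) per system,
    # where a chunk missing from one list contributes 0 for that system.
    scores: dict[str, float] = {}
    for rank, chunk_id in enumerate(semantic_ids, start=1):
        scores[chunk_id] = scores.get(chunk_id, 0.0) + vector_weight / (rrf_k + rank)
    for rank, chunk_id in enumerate(keyword_ids, start=1):
        scores[chunk_id] = scores.get(chunk_id, 0.0) + fts_weight / (rrf_k + rank)
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)[:k]

# "c1" ranked 1st by both systems, "c2" only by vector search:
# rrf_fuse(["c1", "c2"], ["c1"]) -> [("c1", ~0.0328), ("c2", ~0.0161)]

This also makes the graceful-degradation claim easy to check: with keyword_ids empty, the ordering reduces to the pure vector ranking.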