-
Notifications
You must be signed in to change notification settings - Fork 0
feat: add vector migration script to re-ingest chunks with new models #57
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
c47fbd0
3d0dcdb
9b0926c
81f10f5
b390dd9
509dc7f
0a3b002
6b7c354
6d4a77c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,111 @@ | ||||||
| """ | ||||||
| Script to migrate/re-ingest chunks into the configured vector database. | ||||||
| It reads the `chunk_index` records from the SQL database and pushes | ||||||
| them to the Vector Store, using the current embedding model. | ||||||
| This is useful when changing embedding models or vector databases. | ||||||
| """ | ||||||
|
|
||||||
| import os | ||||||
| import sys | ||||||
| from datetime import datetime | ||||||
| from typing import cast, Dict, Any | ||||||
| from uuid import UUID | ||||||
|
|
||||||
| # Add project root to sys.path | ||||||
| sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) | ||||||
|
|
||||||
| from sqlalchemy.orm import Session | ||||||
| from src.infrastructure.repositories.sql.connector import Session as DBSessionFactory | ||||||
| from src.infrastructure.repositories.sql.models.chunk_index import ChunkIndexModel | ||||||
| from src.infrastructure.services.model_loader_service import ModelLoaderService | ||||||
| from src.infrastructure.repositories.vector.models.chunk_model import ChunkModel | ||||||
| from src.config.settings import Settings | ||||||
| from src.config.logger import Logger | ||||||
| from src.presentation.api.dependencies import get_vector_repository | ||||||
|
|
||||||
| logger = Logger() | ||||||
|
|
||||||
|
|
||||||
| def migrate_vector_db(batch_size: int = 100) -> None: | ||||||
| settings = Settings() | ||||||
|
|
||||||
| embedding_model_name = settings.model_embedding.name | ||||||
| logger.info(f"Initializing Model Loader Service with model: {embedding_model_name}") | ||||||
| model_loader = ModelLoaderService(model_name=embedding_model_name) | ||||||
| model_loader.load_model() | ||||||
|
|
||||||
| logger.info("Initializing Vector Repository...") | ||||||
| # This automatically instantiates the vector repo for the correct type defined in .env | ||||||
| vector_repo = get_vector_repository(settings=settings, model_loader=model_loader) | ||||||
|
|
||||||
| if not vector_repo.is_ready(): | ||||||
| logger.error("Vector Repository is not ready. Aborting.") | ||||||
| sys.exit(1) | ||||||
|
|
||||||
| db: Session = DBSessionFactory() | ||||||
| try: | ||||||
| total_chunks = db.query(ChunkIndexModel).count() | ||||||
| logger.info(f"Total chunks to migrate: {total_chunks}") | ||||||
|
|
||||||
| offset = 0 | ||||||
| while offset < total_chunks: | ||||||
| chunk_models_sql = ( | ||||||
| db.query(ChunkIndexModel) | ||||||
| .order_by(ChunkIndexModel.created_at) | ||||||
| .offset(offset) | ||||||
| .limit(batch_size) | ||||||
| .all() | ||||||
| ) | ||||||
|
|
||||||
| if not chunk_models_sql: | ||||||
| break | ||||||
|
|
||||||
| documents = [] | ||||||
| for chunk_sql in chunk_models_sql: | ||||||
| extra_data = ( | ||||||
| cast(Dict[str, Any], chunk_sql.extra) | ||||||
| if isinstance(chunk_sql.extra, dict) | ||||||
| else {} | ||||||
| ) | ||||||
| if chunk_sql.vector_store_type: | ||||||
| extra_data["original_vector_store_type"] = ( | ||||||
| chunk_sql.vector_store_type | ||||||
| ) | ||||||
|
||||||
|
|
||||||
| doc = ChunkModel( | ||||||
| id=cast(UUID, chunk_sql.id), | ||||||
| job_id=cast(UUID, chunk_sql.job_id), | ||||||
| content_source_id=cast(UUID, chunk_sql.content_source_id), | ||||||
| source_type=str(chunk_sql.source_type or "UNKNOWN"), | ||||||
| external_source=cast(str, chunk_sql.external_source), | ||||||
| subject_id=cast(UUID, chunk_sql.subject_id), | ||||||
| index=cast(int, chunk_sql.index), | ||||||
| content=cast(str, chunk_sql.content), | ||||||
| tokens_count=cast(int, chunk_sql.tokens_count), | ||||||
| language=cast(str, chunk_sql.language), | ||||||
| embedding_model=embedding_model_name, | ||||||
| created_at=cast(datetime, chunk_sql.created_at or datetime.now()), | ||||||
|
||||||
| created_at=cast(datetime, chunk_sql.created_at or datetime.now()), | |
| created_at=cast(datetime, chunk_sql.created_at), |
Copilot
AI
Apr 7, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This script reuses the existing chunk_sql.id as the vector document ID. For backends that don't upsert on insert (e.g., Chroma uses add_texts in create_documents), rerunning this migration against a non-empty collection can fail with duplicate-ID errors or create duplicates. Consider either deleting/clearing the target collection up front (or documenting that scripts/clear_vector_db.py must be run first), or adding an explicit upsert/replace behavior for migration mode.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@copilot apply changes based on this feedback
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Applied in commit 81f10f5. Added a --clear CLI flag that calls clear_vector_db() before the migration begins, preventing duplicate-ID errors on backends that don't upsert on insert (e.g. ChromaDB). Run with python scripts/migrate_vector_db.py --clear for a clean migration, or without the flag to preserve existing data.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
count()+OFFSET/LIMITpagination pattern becomes very slow on large tables, and SQLAlchemy will also keep each loaded ORM object in the session identity map (risking unbounded memory growth during long migrations). Consider keyset pagination (e.g., by(created_at, id)) or iterating withyield_per/streaming results, and expunging/clearing the session per batch.