# [FOR SHARING PURPOSES ONLY] RAG ingestion and chat pipelines #2736
Changes to the existing requirements file:

```diff
@@ -37,3 +37,4 @@ wandb>=0.16.4
 xdg-base-dirs>=6.0.1
 psutil>=6.0.0
 huggingface_hub[hf_transfer]>=0.1.8
+haystack-ai>=2.8
```
A new requirements file for the RAG dependencies:

```diff
@@ -0,0 +1,4 @@
+# Cannot upgrade because of https://github.com/milvus-io/milvus-haystack/issues/39
+milvus_haystack==0.0.11
+docling-core[chunking]>=2.10.0
+sentence-transformers>=3.0.0
```

> **Contributor:** same here for new requirements
A new Click command, `ingest`, implementing the embedding ingestion pipeline (new file):

```python
# SPDX-License-Identifier: Apache-2.0

# Standard
import logging

# Third Party
import click

# First Party
from instructlab import clickext
from instructlab.data.ingest_docs import ingest_docs
from instructlab.data.taxonomy_utils import lookup_processed_documents_folder
from instructlab.defaults import DEFAULTS
from instructlab.rag.rag_configuration import (  # type: ignore
    document_store_configuration,
    embedder_configuration,
)

logger = logging.getLogger(__name__)


# TODO: fill in help fields
@click.command()
@click.option(
    "--document-store-type",
    default="milvuslite",
    envvar="ILAB_DOCUMENT_STORE_TYPE",
    type=click.STRING,
    help="The document store type, one of: `milvuslite`.",
)
@click.option(
    "--document-store-uri",
    default="embeddings.db",
    envvar="ILAB_DOCUMENT_STORE_URI",
    type=click.STRING,
    help="The document store URI.",
)
@click.option(
    "--document-store-collection-name",
    default="IlabEmbeddings",
    envvar="ILAB_DOCUMENT_STORE_COLLECTION_NAME",
    type=click.STRING,
    help="The document store collection name.",
)
@click.option(
    "--model-dir",
    default=lambda: DEFAULTS.MODELS_DIR,
    envvar="ILAB_MODEL_DIR",
    show_default="The default system model location store, located in the data directory.",
    help="Base directory where models are stored.",
)
@click.option(
    "--embedding-model",
    "embedding_model_name",
    default="sentence-transformers/all-minilm-l6-v2",
    envvar="ILAB_EMBEDDING_MODEL_NAME",
    type=click.STRING,
    help="The embedding model name.",
)
@click.option(
    "--output-dir",
    envvar="ILAB_OUTPUT_DIR",
    help="Directory where generated datasets are stored.",
)
@click.argument(
    "input_dir",
    required=False,
    type=click.Path(
        exists=True,
        dir_okay=True,
        file_okay=False,
    ),
)
@click.pass_context
@clickext.display_params
def ingest(
    ctx,
    document_store_type,
    document_store_uri,
    document_store_collection_name,
    model_dir,
    embedding_model_name,
    output_dir,
    input_dir,
):
    """The embedding ingestion pipeline"""

    document_store_config = document_store_configuration(
        type=document_store_type,
        uri=document_store_uri,
        collection_name=document_store_collection_name,
    )
    embedder_config = embedder_configuration(
        model_dir=model_dir,
        model_name=embedding_model_name,
    )
    logger.info(f"VectorDB params: {vars(document_store_config)}")
    logger.info(f"Embedding model: {vars(embedder_config)}")
    if not embedder_config.validate_local_model_path():
        raise click.UsageError(
            f"Cannot find local embedding model {embedding_model_name} in {model_dir}."
            " Download the model before running the pipeline."
        )

    # No explicit input folder: fall back to the latest processed documents
    # produced by `ilab data generate`.
    if input_dir is None:
        if output_dir is None:
            output_dir = ctx.obj.config.generate.output_dir
        if output_dir is None:
            output_dir = DEFAULTS.DATASETS_DIR
        logger.info(f"Ingesting latest taxonomy changes at {output_dir}")
        processed_docs_folder = lookup_processed_documents_folder(output_dir)
        if processed_docs_folder is None:
            click.secho(
                f"Cannot find the latest processed documents folder in {output_dir}."
                " Please verify that you executed `ilab data generate` and that you have"
                " updated or new knowledge documents in the current taxonomy."
            )
            raise click.exceptions.Exit(1)

        logger.info(f"Latest processed docs are in {processed_docs_folder}")
        input_dir = processed_docs_folder

    ingest_docs(
        input_dir=input_dir,
        document_store_config=document_store_config,
        embedder_config=embedder_config,
    )
```
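A hedged invocation sketch using Click's test runner. The module path is an assumption, since this excerpt does not show where the command is registered, and the target folder must already exist; only the option names are taken from the command definition above:

```python
# Hypothetical usage sketch: the import path below is an assumption.
from click.testing import CliRunner

from instructlab.data.ingest import ingest  # assumed module path

runner = CliRunner()
result = runner.invoke(
    ingest,
    [
        "--document-store-uri", "embeddings.db",
        "--document-store-collection-name", "IlabEmbeddings",
        "--embedding-model", "sentence-transformers/all-minilm-l6-v2",
        "processed-docs/",  # optional INPUT_DIR; must exist on disk
    ],
)
print(result.output)
```

Note that when `INPUT_DIR` is omitted, the command reads `ctx.obj.config`, so a populated context object (as built by the real `ilab` entry point) would be needed in that mode.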
A new Click command, `process`, implementing the document pre-processing pipeline (new file):

```python
# SPDX-License-Identifier: Apache-2.0

# Standard
import logging

# Third Party
import click

# First Party
from instructlab import clickext
from instructlab.configuration import DEFAULTS
from instructlab.data.process_docs import (
    process_docs_from_folder,
    process_docs_from_taxonomy,
)

logger = logging.getLogger(__name__)


# TODO: fill in help fields
@click.command()
@click.option(
    "--input",
    "input_dir",
    required=False,
    default=None,
    envvar="ILAB_PROCESS_INPUT",
    help="The folder with user documents to process. If it is missing, the knowledge taxonomy files are processed instead.",
    type=click.Path(exists=True, file_okay=False, dir_okay=True, readable=True),
)
@click.option(
    "--taxonomy-path",
    type=click.Path(),
    envvar="ILAB_TAXONOMY_PATH",
    help="Directory where the taxonomy is stored and accessed from.",
)
@click.option(
    "--taxonomy-base",
    envvar="ILAB_TAXONOMY_BASE",
    help="Branch of the taxonomy used to calculate the diff against.",
)
@click.argument(
    "output_dir",
    type=click.Path(
        exists=True,
        dir_okay=True,
        file_okay=False,
    ),
)
@click.pass_context
@clickext.display_params
def process(
    ctx,
    taxonomy_path,
    taxonomy_base,
    input_dir,
    output_dir,
):
    """The document processing pipeline"""

    if input_dir is None:
        # Fall back to the configured taxonomy, then to the defaults.
        if taxonomy_path is None:
            taxonomy_path = ctx.obj.config.generate.taxonomy_path
        if taxonomy_path is None:
            taxonomy_path = DEFAULTS.TAXONOMY_DIR
        if taxonomy_base is None:
            taxonomy_base = ctx.obj.config.generate.taxonomy_base
        if taxonomy_base is None:
            taxonomy_base = DEFAULTS.TAXONOMY_BASE

        logger.info(
            f"Pre-processing latest taxonomy changes at {taxonomy_path}@{taxonomy_base}"
        )
        process_docs_from_taxonomy(
            taxonomy_path=taxonomy_path,
            taxonomy_base=taxonomy_base,
            output_dir=output_dir,
        )
    else:
        logger.info(f"Pre-processing documents from {input_dir} to {output_dir}")
        process_docs_from_folder(
            input_dir=input_dir,
            output_dir=output_dir,
        )
```
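A hedged invocation sketch covering the command's two modes; the module path is again an assumption, and the folders are placeholders that would have to exist:

```python
# Hypothetical usage sketch: module path assumed, option names from above.
from click.testing import CliRunner

from instructlab.data.process import process  # assumed module path

runner = CliRunner()

# Mode 1: pre-process an explicit folder of user documents.
runner.invoke(process, ["--input", "my-docs/", "output/"])

# Mode 2: omit --input and pre-process the knowledge documents that changed
# in the taxonomy relative to the base branch.
runner.invoke(
    process,
    ["--taxonomy-path", "taxonomy/", "--taxonomy-base", "origin/main", "output/"],
)
```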
Changes to the configuration module, adding a free-form `rag` section to the chat config:

```diff
@@ -3,7 +3,7 @@
 # Standard
 from os import path
 from re import match
-from typing import Any, Optional, Union
+from typing import Any, Dict, Optional, Union
 import enum
 import logging
 import os
@@ -162,6 +162,9 @@ class _chat(BaseModel):
         default=1.0,
         description="Controls the randomness of the model's responses. Lower values make the output more deterministic, while higher values produce more random results.",
     )
+    rag: Optional[Dict[str, Any]] = Field(
+        default_factory=dict, description="The RAG chat configuration"
+    )


 class _serve_vllm(BaseModel):
```

Review thread on the new `rag` field (see the sketch after this diff):

> **Contributor:** This should be its own class, with specific arguments we can validate and run through tests, rather than a dictionary that can contain anything. If we are putting something directly into our config, we need to have control over validation. If we were going the separate-config-YAML route, we could just blindly read it in, but I think this approach is better.
>
> **Contributor (author):** The design follows the initial conversation we had two weeks ago, when we decided not to update the configuration, to avoid compatibility issues in case we changed any of these commands, since their definitions seemed "unstable".
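A minimal sketch of the reviewer's suggestion, replacing the free-form dictionary with a dedicated, validated sub-model. The class name, fields, and defaults below are illustrative assumptions, not part of the PR:

```python
# Hypothetical sketch of the reviewer's suggestion: a typed sub-model that
# Pydantic validates, instead of a Dict[str, Any] that can contain anything.
# All names and defaults here are assumptions for illustration.
from pydantic import BaseModel, Field


class _chat_rag(BaseModel):
    enabled: bool = Field(default=False, description="Enable RAG-augmented chat.")
    document_store_uri: str = Field(
        default="embeddings.db", description="URI of the vector document store."
    )
    retriever_top_k: int = Field(
        default=5, ge=1, description="Number of documents retrieved per query."
    )


class _chat(BaseModel):
    # ...existing chat fields elided...
    rag: _chat_rag = Field(
        default_factory=_chat_rag, description="The RAG chat configuration"
    )
```

With a typed sub-model, unknown keys and out-of-range values fail at config-load time instead of surfacing later inside the chat pipeline.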
The ingestion pipeline itself, built on Haystack (new file):

```python
# Standard
from pathlib import Path
import glob
import logging
import os

# Third Party
from haystack import Pipeline  # type: ignore
from haystack.components.converters import TextFileToDocument  # type: ignore
from haystack.components.embedders import (  # type: ignore
    SentenceTransformersDocumentEmbedder,
)
from haystack.components.preprocessors import DocumentCleaner  # type: ignore
from haystack.components.writers import DocumentWriter  # type: ignore
from milvus_haystack import MilvusDocumentStore  # type: ignore

# First Party
from instructlab.haystack.docling_splitter import DoclingDocumentSplitter
from instructlab.rag.rag_configuration import (
    document_store_configuration,
    embedder_configuration,
)

logger = logging.getLogger(__name__)


def ingest_docs(
    input_dir: str,
    document_store_config: document_store_configuration,
    embedder_config: embedder_configuration,
):
    """Embed the documents in `input_dir` and write them to the document store."""
    pipeline = _create_pipeline(
        document_store_config=document_store_config,
        embedder_config=embedder_config,
    )
    _connect_components(pipeline)
    _ingest_docs(pipeline=pipeline, input_dir=input_dir)


def _create_pipeline(
    document_store_config: document_store_configuration,
    embedder_config: embedder_configuration,
) -> Pipeline:
    pipeline = Pipeline()
    pipeline.add_component(instance=_converter_component(), name="converter")
    pipeline.add_component(instance=DocumentCleaner(), name="document_cleaner")
    # TODO: make the params configurable
    pipeline.add_component(
        instance=_splitter_component(embedder_config),
        name="document_splitter",
    )
    # TODO: make this more generic
    pipeline.add_component(
        instance=SentenceTransformersDocumentEmbedder(
            model=embedder_config.local_model_path()
        ),
        name="document_embedder",
    )
    pipeline.add_component(
        instance=DocumentWriter(
            _document_store_component(
                document_store_type=document_store_config.type,
                document_store_uri=document_store_config.uri,
                collection_name=document_store_config.collection_name,
            )
        ),
        name="document_writer",
    )
    return pipeline


def _connect_components(pipeline):
    pipeline.connect("converter", "document_cleaner")
    pipeline.connect("document_cleaner", "document_splitter")
    pipeline.connect("document_splitter", "document_embedder")
    pipeline.connect("document_embedder", "document_writer")


def _ingest_docs(pipeline, input_dir):
    # Prefer the docling-artifacts subfolder when present; otherwise pick up
    # the JSON files at the top level of the input folder.
    pattern = "*.json"
    if Path(os.path.join(input_dir, "docling-artifacts")).exists():
        pattern = "docling-artifacts/" + pattern

    ingestion_results = pipeline.run(
        {"converter": {"sources": glob.glob(os.path.join(input_dir, pattern))}}
    )

    document_store = pipeline.get_component("document_writer").document_store
    logger.info(f"count_documents: {document_store.count_documents()}")
    logger.info(
        f"document_writer.documents_written: {ingestion_results['document_writer']['documents_written']}"
    )


def _document_store_component(document_store_type, document_store_uri, collection_name):
    if document_store_type == "milvuslite":
        return MilvusDocumentStore(
            connection_args={"uri": document_store_uri},
            collection_name=collection_name,
            drop_old=True,
        )
    raise ValueError(f"Unmanaged document store type {document_store_type}")


def _converter_component():
    return TextFileToDocument()


def _splitter_component(embedding_model):
    return DoclingDocumentSplitter(
        embedding_model_id=embedding_model.local_model_path(),
        content_format="json",
        max_tokens=150,
    )
```

Review thread on `_create_pipeline`:

> **Contributor:** If we could add docstrings to these, that would be great!
>
> **Contributor (author):** Only to "public" functions/methods?
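For orientation, a minimal sketch of driving the ingestion programmatically. The import paths and configuration-helper signatures are taken from the diffs above; the folder paths are placeholders:

```python
# Sketch of programmatic ingestion; the concrete paths are placeholders.
from instructlab.data.ingest_docs import ingest_docs
from instructlab.rag.rag_configuration import (
    document_store_configuration,
    embedder_configuration,
)

store_config = document_store_configuration(
    type="milvuslite",
    uri="embeddings.db",
    collection_name="IlabEmbeddings",
)
embedder_config = embedder_configuration(
    model_dir="models/",  # placeholder; the CLI defaults to DEFAULTS.MODELS_DIR
    model_name="sentence-transformers/all-minilm-l6-v2",
)

# Runs converter -> cleaner -> splitter -> embedder -> writer over the
# docling JSON artifacts found in the input folder.
ingest_docs(
    input_dir="processed-docs/",
    document_store_config=store_config,
    embedder_config=embedder_config,
)
```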
General review thread on the dependency changes:

> **Contributor:** I am pretty sure new requirements like these need to be run by a specific set of people.
>
> **Contributor (author):** Could you please elaborate on the "specific set of people" part? (I'm sure I'm missing something here.)
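The chat side of the PR does not appear in this excerpt. As a hedged sketch of what retrieval against the ingested store could look like, the example below uses only stock Haystack and milvus-haystack components, not anything defined in this PR:

```python
# Hypothetical retrieval sketch: none of this code is from the PR; it queries
# the store populated by the ingestion pipeline with stock components.
from haystack import Pipeline
from haystack.components.embedders import SentenceTransformersTextEmbedder
from milvus_haystack import MilvusDocumentStore, MilvusEmbeddingRetriever

document_store = MilvusDocumentStore(
    connection_args={"uri": "embeddings.db"},
    collection_name="IlabEmbeddings",
)

pipeline = Pipeline()
pipeline.add_component(
    "text_embedder",
    SentenceTransformersTextEmbedder(model="sentence-transformers/all-minilm-l6-v2"),
)
pipeline.add_component(
    "retriever", MilvusEmbeddingRetriever(document_store=document_store, top_k=5)
)
# Feed the query embedding into the retriever.
pipeline.connect("text_embedder.embedding", "retriever.query_embedding")

results = pipeline.run({"text_embedder": {"text": "What is InstructLab?"}})
for doc in results["retriever"]["documents"]:
    print(doc.content[:80])
```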