diff --git a/test_e2e/python/test-ingest-couchbase-output.py b/test_e2e/python/test-ingest-couchbase-output.py
index 040c56f9e..6552b9e5b 100644
--- a/test_e2e/python/test-ingest-couchbase-output.py
+++ b/test_e2e/python/test-ingest-couchbase-output.py
@@ -2,8 +2,10 @@
 import math
 import time
 from datetime import timedelta
+from pathlib import Path
 
 import click
+import ndjson
 from couchbase import search
 from couchbase.auth import PasswordAuthenticator
 from couchbase.cluster import Cluster
@@ -23,6 +25,16 @@ def get_client(username, password, connection_string) -> Cluster:
     return cluster
 
 
+def get_data(filepath: Path) -> list[dict]:
+    with filepath.open() as f:
+        if filepath.suffix == ".json":
+            return json.load(f)
+        elif filepath.suffix == ".ndjson":
+            return ndjson.load(f)
+        else:
+            raise ValueError(f"Unsupported input format: {filepath}")
+
+
 @click.group(name="couchbase-ingest")
 @click.option("--username", type=str)
 @click.option("--password", type=str)
@@ -87,10 +99,10 @@ def check(ctx, expected_docs):
 
 
 @cli.command()
-@click.option("--output-json", type=click.File())
+@click.option("--output-json", type=click.Path(path_type=Path))
 @click.pass_context
-def check_vector(ctx, output_json):
-    json_content = json.load(output_json)
+def check_vector(ctx, output_json: Path):
+    json_content = get_data(filepath=output_json)
     key_0 = next(iter(json_content[0]))  # Get the first key
     exact_embedding = json_content[0][key_0]["embedding"]
     exact_text = json_content[0][key_0]["text"]
diff --git a/unstructured_ingest/v2/interfaces/file_data.py b/unstructured_ingest/v2/interfaces/file_data.py
index e3a9a0ddd..b46b992fc 100644
--- a/unstructured_ingest/v2/interfaces/file_data.py
+++ b/unstructured_ingest/v2/interfaces/file_data.py
@@ -102,7 +102,7 @@ def file_data_from_file(path: str) -> FileData:
     try:
         return BatchFileData.from_file(path=path)
     except ValidationError:
-        logger.debug(f"{path} not valid for batch file data")
+        logger.debug(f"{path} not detected as batch file data")
     return FileData.from_file(path=path)
 
 
diff --git a/unstructured_ingest/v2/interfaces/upload_stager.py b/unstructured_ingest/v2/interfaces/upload_stager.py
index 8b623a4ae..fb6d0628c 100644
--- a/unstructured_ingest/v2/interfaces/upload_stager.py
+++ b/unstructured_ingest/v2/interfaces/upload_stager.py
@@ -39,7 +39,6 @@ def get_output_path(self, output_filename: str, output_dir: Path) -> Path:
         output_path = Path(output_filename)
         output_filename = f"{Path(output_filename).stem}{output_path.suffix}"
         output_path = Path(output_dir) / Path(f"{output_filename}")
-        output_path.parent.mkdir(parents=True, exist_ok=True)
         return output_path
 
     def stream_update(self, input_file: Path, output_file: Path, file_data: FileData) -> None:
@@ -53,16 +52,12 @@ def stream_update(self, input_file: Path, output_file: Path, file_data: FileData
             writer.f.flush()
 
     def process_whole(self, input_file: Path, output_file: Path, file_data: FileData) -> None:
-        with input_file.open() as in_f:
-            elements_contents = json.load(in_f)
-
+        elements_contents = self.get_data(elements_filepath=input_file)
         conformed_elements = [
             self.conform_dict(element_dict=element, file_data=file_data)
             for element in elements_contents
         ]
-
-        with open(output_file, "w") as out_f:
-            json.dump(conformed_elements, out_f, indent=2)
+        self.save_data(output_filepath=output_file, content=conformed_elements)
 
     def run(
         self,
diff --git a/unstructured_ingest/v2/pipeline/pipeline.py b/unstructured_ingest/v2/pipeline/pipeline.py
index 327eefb5a..b41b45127 100644
--- a/unstructured_ingest/v2/pipeline/pipeline.py
+++ b/unstructured_ingest/v2/pipeline/pipeline.py
@@ -111,6 +111,13 @@ def check_destination_connector(self):
         uploader_connector_type = self.uploader_step.process.connector_type
         registry_entry = destination_registry[uploader_connector_type]
         if registry_entry.upload_stager and self.stager_step is None:
+            try:
+                self.stager_step = UploadStageStep(
+                    process=registry_entry.upload_stager(), context=self.context
+                )
+                return
+            except Exception as e:
+                logger.debug(f"failed to instantiate required stager on user's behalf: {e}")
             raise ValueError(
                 f"pipeline with uploader type {self.uploader_step.process.__class__.__name__} "
                 f"expects a stager of type {registry_entry.upload_stager.__name__} "
diff --git a/unstructured_ingest/v2/pipeline/steps/chunk.py b/unstructured_ingest/v2/pipeline/steps/chunk.py
index 519dcc8f9..a495c8351 100644
--- a/unstructured_ingest/v2/pipeline/steps/chunk.py
+++ b/unstructured_ingest/v2/pipeline/steps/chunk.py
@@ -1,10 +1,11 @@
 import asyncio
 import hashlib
-import json
 from dataclasses import dataclass
 from pathlib import Path
 from typing import Callable, Optional, TypedDict
 
+import ndjson
+
 from unstructured_ingest.v2.interfaces import FileData
 from unstructured_ingest.v2.interfaces.file_data import file_data_from_file
 from unstructured_ingest.v2.logger import logger
@@ -38,7 +39,7 @@ def should_chunk(self, filepath: Path, file_data: FileData) -> bool:
         return not filepath.exists()
 
     def get_output_filepath(self, filename: Path) -> Path:
-        hashed_output_file = f"{self.get_hash(extras=[filename.name])}.json"
+        hashed_output_file = f"{self.get_hash(extras=[filename.name])}.ndjson"
         filepath = (self.cache_dir / hashed_output_file).resolve()
         filepath.parent.mkdir(parents=True, exist_ok=True)
         return filepath
@@ -46,7 +47,7 @@ def get_output_filepath(self, filename: Path) -> Path:
     def _save_output(self, output_filepath: str, chunked_content: list[dict]):
         with open(str(output_filepath), "w") as f:
             logger.debug(f"writing chunker output to: {output_filepath}")
-            json.dump(chunked_content, f, indent=2)
+            ndjson.dump(chunked_content, f)
 
     async def _run_async(
         self, fn: Callable, path: str, file_data_path: str, **kwargs
diff --git a/unstructured_ingest/v2/pipeline/steps/embed.py b/unstructured_ingest/v2/pipeline/steps/embed.py
index ba3431a89..0861a5c25 100644
--- a/unstructured_ingest/v2/pipeline/steps/embed.py
+++ b/unstructured_ingest/v2/pipeline/steps/embed.py
@@ -5,6 +5,8 @@
 from pathlib import Path
 from typing import Callable, Optional, TypedDict
 
+import ndjson
+
 from unstructured_ingest.v2.interfaces import FileData
 from unstructured_ingest.v2.interfaces.file_data import file_data_from_file
 from unstructured_ingest.v2.logger import logger
@@ -37,21 +39,26 @@ def should_embed(self, filepath: Path, file_data: FileData) -> bool:
             return True
         return not filepath.exists()
 
-    def get_output_filepath(self, filename: Path) -> Path:
-        hashed_output_file = f"{self.get_hash(extras=[filename.name])}.json"
+    def get_output_filepath(self, filename: Path, suffix: str = ".json") -> Path:
+        hashed_output_file = f"{self.get_hash(extras=[filename.name])}{suffix}"
         filepath = (self.cache_dir / hashed_output_file).resolve()
         filepath.parent.mkdir(parents=True, exist_ok=True)
         return filepath
 
-    def _save_output(self, output_filepath: str, embedded_content: list[dict]):
-        with open(str(output_filepath), "w") as f:
+    def _save_output(self, output_filepath: Path, embedded_content: list[dict]):
+        with output_filepath.open("w") as f:
             logger.debug(f"writing embedded output to: {output_filepath}")
-            json.dump(embedded_content, f, indent=2)
+            if output_filepath.suffix == ".json":
+                json.dump(embedded_content, f, indent=2)
+            elif output_filepath.suffix == ".ndjson":
+                ndjson.dump(embedded_content, f)
+            else:
+                raise ValueError(f"Unsupported input format: {output_filepath}")
 
     async def _run_async(self, fn: Callable, path: str, file_data_path: str) -> EmbedStepResponse:
         path = Path(path)
         file_data = file_data_from_file(path=file_data_path)
-        output_filepath = self.get_output_filepath(filename=path)
+        output_filepath = self.get_output_filepath(filename=path, suffix=Path(path).suffix)
         if not self.should_embed(filepath=output_filepath, file_data=file_data):
             logger.debug(f"skipping embedding, output already exists: {output_filepath}")
             return EmbedStepResponse(file_data_path=file_data_path, path=str(output_filepath))
@@ -65,7 +72,7 @@ async def _run_async(self, fn: Callable, path: str, file_data_path: str) -> Embe
         embed_content_raw = await fn(**fn_kwargs)
 
         self._save_output(
-            output_filepath=str(output_filepath),
+            output_filepath=output_filepath,
             embedded_content=embed_content_raw,
         )
         return EmbedStepResponse(file_data_path=file_data_path, path=str(output_filepath))
diff --git a/unstructured_ingest/v2/pipeline/steps/partition.py b/unstructured_ingest/v2/pipeline/steps/partition.py
index 4ffd549fe..371028052 100644
--- a/unstructured_ingest/v2/pipeline/steps/partition.py
+++ b/unstructured_ingest/v2/pipeline/steps/partition.py
@@ -1,10 +1,11 @@
 import asyncio
 import hashlib
-import json
 from dataclasses import dataclass
 from pathlib import Path
 from typing import Callable, Optional, TypedDict
 
+import ndjson
+
 from unstructured_ingest.v2.interfaces import FileData
 from unstructured_ingest.v2.interfaces.file_data import file_data_from_file
 from unstructured_ingest.v2.logger import logger
@@ -38,7 +39,7 @@ def should_partition(self, filepath: Path, file_data: FileData) -> bool:
         return not filepath.exists()
 
     def get_output_filepath(self, filename: Path) -> Path:
-        hashed_output_file = f"{self.get_hash(extras=[filename.name])}.json"
+        hashed_output_file = f"{self.get_hash(extras=[filename.name])}.ndjson"
        filepath = (self.cache_dir / hashed_output_file).resolve()
         filepath.parent.mkdir(parents=True, exist_ok=True)
         return filepath
@@ -46,7 +47,7 @@ def get_output_filepath(self, filename: Path) -> Path:
     def _save_output(self, output_filepath: str, partitioned_content: list[dict]):
         with open(str(output_filepath), "w") as f:
             logger.debug(f"writing partitioned output to: {output_filepath}")
-            json.dump(partitioned_content, f, indent=2)
+            ndjson.dump(partitioned_content, f)
 
     async def _run_async(
         self, fn: Callable, path: str, file_data_path: str
diff --git a/unstructured_ingest/v2/processes/chunker.py b/unstructured_ingest/v2/processes/chunker.py
index a209cca92..10bd19f75 100644
--- a/unstructured_ingest/v2/processes/chunker.py
+++ b/unstructured_ingest/v2/processes/chunker.py
@@ -1,8 +1,10 @@
+import json
 from abc import ABC
 from dataclasses import dataclass
 from pathlib import Path
 from typing import Any, Optional
 
+import ndjson
 from pydantic import BaseModel, Field, SecretStr
 
 from unstructured_ingest.utils.chunking import assign_and_map_hash_ids
@@ -89,12 +91,23 @@ class Chunker(BaseProcess, ABC):
     def is_async(self) -> bool:
         return self.config.chunk_by_api
 
+    def get_data(self, elements_filepath: Path) -> list[dict]:
+        if elements_filepath.suffix == ".json":
+            with elements_filepath.open() as f:
+                return json.load(f)
+        elif elements_filepath.suffix == ".ndjson":
+            with elements_filepath.open() as f:
+                return ndjson.load(f)
+        else:
+            raise ValueError(f"Unsupported input format: {elements_filepath}")
+
     @requires_dependencies(dependencies=["unstructured"])
     def run(self, elements_filepath: Path, **kwargs: Any) -> list[dict]:
         from unstructured.chunking import dispatch
-        from unstructured.staging.base import elements_from_json
+        from unstructured.staging.base import elements_from_dicts
 
-        elements = elements_from_json(filename=str(elements_filepath))
+        elements_dicts = self.get_data(elements_filepath=elements_filepath)
+        elements = elements_from_dicts(elements_dicts)
         if not elements:
             return [e.to_dict() for e in elements]
         local_chunking_strategies = ("basic", "by_title")
diff --git a/unstructured_ingest/v2/processes/connectors/databricks/volumes.py b/unstructured_ingest/v2/processes/connectors/databricks/volumes.py
index 3e0f8645e..222b0c866 100644
--- a/unstructured_ingest/v2/processes/connectors/databricks/volumes.py
+++ b/unstructured_ingest/v2/processes/connectors/databricks/volumes.py
@@ -187,9 +187,9 @@ class DatabricksVolumesUploader(Uploader, ABC):
     upload_config: DatabricksVolumesUploaderConfig
     connection_config: DatabricksVolumesConnectionConfig
 
-    def get_output_path(self, file_data: FileData) -> str:
+    def get_output_path(self, file_data: FileData, suffix: str = ".json") -> str:
         return os.path.join(
-            self.upload_config.path, f"{file_data.source_identifiers.filename}.json"
+            self.upload_config.path, f"{file_data.source_identifiers.filename}{suffix}"
         )
 
     def precheck(self) -> None:
@@ -199,7 +199,7 @@ def precheck(self) -> None:
             raise self.connection_config.wrap_error(e=e)
 
     def run(self, path: Path, file_data: FileData, **kwargs: Any) -> None:
-        output_path = self.get_output_path(file_data=file_data)
+        output_path = self.get_output_path(file_data=file_data, suffix=path.suffix)
         with open(path, "rb") as elements_file:
             try:
                 self.connection_config.get_client().files.upload(
diff --git a/unstructured_ingest/v2/processes/connectors/databricks/volumes_aws.py b/unstructured_ingest/v2/processes/connectors/databricks/volumes_aws.py
index f0b217a90..871c5d48d 100644
--- a/unstructured_ingest/v2/processes/connectors/databricks/volumes_aws.py
+++ b/unstructured_ingest/v2/processes/connectors/databricks/volumes_aws.py
@@ -17,6 +17,10 @@
     DatabricksVolumesUploader,
     DatabricksVolumesUploaderConfig,
 )
+from unstructured_ingest.v2.processes.utils.blob_storage import (
+    BlobStoreUploadStager,
+    BlobStoreUploadStagerConfig,
+)
 
 CONNECTOR_TYPE = "databricks_volumes_aws"
 
@@ -76,6 +80,8 @@ class DatabricksAWSVolumesUploader(DatabricksVolumesUploader):
     connection_config=DatabricksAWSVolumesConnectionConfig,
     uploader=DatabricksAWSVolumesUploader,
     uploader_config=DatabricksAWSVolumesUploaderConfig,
+    upload_stager=BlobStoreUploadStager,
+    upload_stager_config=BlobStoreUploadStagerConfig,
 )
 
 databricks_aws_volumes_source_entry = SourceRegistryEntry(
diff --git a/unstructured_ingest/v2/processes/connectors/databricks/volumes_azure.py b/unstructured_ingest/v2/processes/connectors/databricks/volumes_azure.py
index cd53c0de6..1b05c71b0 100644
--- a/unstructured_ingest/v2/processes/connectors/databricks/volumes_azure.py
+++ b/unstructured_ingest/v2/processes/connectors/databricks/volumes_azure.py
@@ -17,6 +17,10 @@
     DatabricksVolumesUploader,
     DatabricksVolumesUploaderConfig,
 )
+from unstructured_ingest.v2.processes.utils.blob_storage import (
+    BlobStoreUploadStager,
+    BlobStoreUploadStagerConfig,
+)
 
 CONNECTOR_TYPE = "databricks_volumes_azure"
 
@@ -91,6 +95,8 @@ class DatabricksAzureVolumesUploader(DatabricksVolumesUploader):
     connection_config=DatabricksAzureVolumesConnectionConfig,
     uploader=DatabricksAzureVolumesUploader,
     uploader_config=DatabricksAzureVolumesUploaderConfig,
+    upload_stager_config=BlobStoreUploadStagerConfig,
+    upload_stager=BlobStoreUploadStager,
 )
 
 databricks_azure_volumes_source_entry = SourceRegistryEntry(
diff --git a/unstructured_ingest/v2/processes/connectors/databricks/volumes_gcp.py b/unstructured_ingest/v2/processes/connectors/databricks/volumes_gcp.py
index 65f37da75..79a9eea5e 100644
--- a/unstructured_ingest/v2/processes/connectors/databricks/volumes_gcp.py
+++ b/unstructured_ingest/v2/processes/connectors/databricks/volumes_gcp.py
@@ -17,6 +17,10 @@
     DatabricksVolumesUploader,
     DatabricksVolumesUploaderConfig,
 )
+from unstructured_ingest.v2.processes.utils.blob_storage import (
+    BlobStoreUploadStager,
+    BlobStoreUploadStagerConfig,
+)
 
 CONNECTOR_TYPE = "databricks_volumes_gcp"
 
@@ -74,6 +78,8 @@ class DatabricksGoogleVolumesUploader(DatabricksVolumesUploader):
     connection_config=DatabricksGoogleVolumesConnectionConfig,
     uploader=DatabricksGoogleVolumesUploader,
     uploader_config=DatabricksGoogleVolumesUploaderConfig,
+    upload_stager=BlobStoreUploadStager,
+    upload_stager_config=BlobStoreUploadStagerConfig,
 )
 
 databricks_gcp_volumes_source_entry = SourceRegistryEntry(
diff --git a/unstructured_ingest/v2/processes/connectors/databricks/volumes_native.py b/unstructured_ingest/v2/processes/connectors/databricks/volumes_native.py
index 43223cf87..ab58a1f32 100644
--- a/unstructured_ingest/v2/processes/connectors/databricks/volumes_native.py
+++ b/unstructured_ingest/v2/processes/connectors/databricks/volumes_native.py
@@ -17,6 +17,10 @@
     DatabricksVolumesUploader,
     DatabricksVolumesUploaderConfig,
 )
+from unstructured_ingest.v2.processes.utils.blob_storage import (
+    BlobStoreUploadStager,
+    BlobStoreUploadStagerConfig,
+)
 
 CONNECTOR_TYPE = "databricks_volumes"
 
@@ -75,6 +79,8 @@ class DatabricksNativeVolumesUploader(DatabricksVolumesUploader):
     connection_config=DatabricksNativeVolumesConnectionConfig,
     uploader=DatabricksNativeVolumesUploader,
     uploader_config=DatabricksNativeVolumesUploaderConfig,
+    upload_stager=BlobStoreUploadStager,
+    upload_stager_config=BlobStoreUploadStagerConfig,
 )
 
 databricks_native_volumes_source_entry = SourceRegistryEntry(
diff --git a/unstructured_ingest/v2/processes/connectors/fsspec/azure.py b/unstructured_ingest/v2/processes/connectors/fsspec/azure.py
index a18c85354..456278b1a 100644
--- a/unstructured_ingest/v2/processes/connectors/fsspec/azure.py
+++ b/unstructured_ingest/v2/processes/connectors/fsspec/azure.py
@@ -26,6 +26,10 @@
     FsspecUploaderConfig,
 )
 from unstructured_ingest.v2.processes.connectors.fsspec.utils import json_serial, sterilize_dict
+from unstructured_ingest.v2.processes.utils.blob_storage import (
+    BlobStoreUploadStager,
+    BlobStoreUploadStagerConfig,
+)
 
 if TYPE_CHECKING:
     from adlfs import AzureBlobFileSystem
@@ -194,4 +198,6 @@ class AzureUploader(FsspecUploader):
     uploader=AzureUploader,
     uploader_config=AzureUploaderConfig,
     connection_config=AzureConnectionConfig,
+    upload_stager_config=BlobStoreUploadStagerConfig,
+    upload_stager=BlobStoreUploadStager,
 )
diff --git a/unstructured_ingest/v2/processes/connectors/fsspec/box.py b/unstructured_ingest/v2/processes/connectors/fsspec/box.py
index f72b2b809..bbb53c0b4 100644
--- a/unstructured_ingest/v2/processes/connectors/fsspec/box.py
+++ b/unstructured_ingest/v2/processes/connectors/fsspec/box.py
@@ -28,6 +28,10 @@
     FsspecUploaderConfig,
 )
 from unstructured_ingest.v2.processes.connectors.utils import conform_string_to_dict
+from unstructured_ingest.v2.processes.utils.blob_storage import (
+    BlobStoreUploadStager,
+    BlobStoreUploadStagerConfig,
+)
 
 if TYPE_CHECKING:
     from boxfs import BoxFileSystem
@@ -167,4 +171,6 @@ class BoxUploader(FsspecUploader):
     uploader=BoxUploader,
     uploader_config=BoxUploaderConfig,
     connection_config=BoxConnectionConfig,
+    upload_stager=BlobStoreUploadStager,
+    upload_stager_config=BlobStoreUploadStagerConfig,
 )
diff --git a/unstructured_ingest/v2/processes/connectors/fsspec/dropbox.py b/unstructured_ingest/v2/processes/connectors/fsspec/dropbox.py
index f6c5d1d29..32dfb30bb 100644
--- a/unstructured_ingest/v2/processes/connectors/fsspec/dropbox.py
+++ b/unstructured_ingest/v2/processes/connectors/fsspec/dropbox.py
@@ -32,6 +32,10 @@
     FsspecUploader,
     FsspecUploaderConfig,
 )
+from unstructured_ingest.v2.processes.utils.blob_storage import (
+    BlobStoreUploadStager,
+    BlobStoreUploadStagerConfig,
+)
 
 if TYPE_CHECKING:
     from dropboxdrivefs import DropboxDriveFileSystem
@@ -165,4 +169,6 @@ class DropboxUploader(FsspecUploader):
     uploader=DropboxUploader,
     uploader_config=DropboxUploaderConfig,
     connection_config=DropboxConnectionConfig,
+    upload_stager=BlobStoreUploadStager,
+    upload_stager_config=BlobStoreUploadStagerConfig,
 )
diff --git a/unstructured_ingest/v2/processes/connectors/fsspec/fsspec.py b/unstructured_ingest/v2/processes/connectors/fsspec/fsspec.py
index 464a9d786..f423c29e4 100644
--- a/unstructured_ingest/v2/processes/connectors/fsspec/fsspec.py
+++ b/unstructured_ingest/v2/processes/connectors/fsspec/fsspec.py
@@ -308,12 +308,12 @@ def precheck(self) -> None:
         except Exception as e:
             raise self.wrap_error(e=e)
 
-    def get_upload_path(self, file_data: FileData) -> Path:
+    def get_upload_path(self, file_data: FileData, suffix: str = ".json") -> Path:
         upload_path = (
             Path(self.upload_config.path_without_protocol)
             / file_data.source_identifiers.relative_path
         )
-        updated_upload_path = upload_path.parent / f"{upload_path.name}.json"
+        updated_upload_path = upload_path.parent / f"{upload_path.name}{suffix}"
         return updated_upload_path
 
     def run(self, path: Path, file_data: FileData, **kwargs: Any) -> None:
@@ -324,7 +324,7 @@ def run(self, path: Path, file_data: FileData, **kwargs: Any) -> None:
         client.upload(lpath=path_str, rpath=upload_path.as_posix())
 
     async def run_async(self, path: Path, file_data: FileData, **kwargs: Any) -> None:
-        upload_path = self.get_upload_path(file_data=file_data)
+        upload_path = self.get_upload_path(file_data=file_data, suffix=path.suffix)
         path_str = str(path.resolve())
         # Odd that fsspec doesn't run exists() as async even when client support async
         logger.debug(f"writing local file {path_str} to {upload_path}")
diff --git a/unstructured_ingest/v2/processes/connectors/fsspec/gcs.py b/unstructured_ingest/v2/processes/connectors/fsspec/gcs.py
index 7657a1ec0..ef1cf71de 100644
--- a/unstructured_ingest/v2/processes/connectors/fsspec/gcs.py
+++ b/unstructured_ingest/v2/processes/connectors/fsspec/gcs.py
@@ -28,6 +28,10 @@
     FsspecUploader,
     FsspecUploaderConfig,
 )
+from unstructured_ingest.v2.processes.utils.blob_storage import (
+    BlobStoreUploadStager,
+    BlobStoreUploadStagerConfig,
+)
 
 if TYPE_CHECKING:
     from gcsfs import GCSFileSystem
@@ -194,4 +198,6 @@ class GcsUploader(FsspecUploader):
     uploader=GcsUploader,
     uploader_config=GcsUploaderConfig,
     connection_config=GcsConnectionConfig,
+    upload_stager=BlobStoreUploadStager,
+    upload_stager_config=BlobStoreUploadStagerConfig,
 )
diff --git a/unstructured_ingest/v2/processes/connectors/fsspec/s3.py b/unstructured_ingest/v2/processes/connectors/fsspec/s3.py
index 0e08695f3..5c7f0ef2c 100644
--- a/unstructured_ingest/v2/processes/connectors/fsspec/s3.py
+++ b/unstructured_ingest/v2/processes/connectors/fsspec/s3.py
@@ -26,6 +26,10 @@
     FsspecUploader,
     FsspecUploaderConfig,
 )
+from unstructured_ingest.v2.processes.utils.blob_storage import (
+    BlobStoreUploadStager,
+    BlobStoreUploadStagerConfig,
+)
 
 CONNECTOR_TYPE = "s3"
 
@@ -182,4 +186,6 @@ class S3Uploader(FsspecUploader):
     uploader=S3Uploader,
     uploader_config=S3UploaderConfig,
     connection_config=S3ConnectionConfig,
+    upload_stager=BlobStoreUploadStager,
+    upload_stager_config=BlobStoreUploadStagerConfig,
 )
diff --git a/unstructured_ingest/v2/processes/connectors/fsspec/sftp.py b/unstructured_ingest/v2/processes/connectors/fsspec/sftp.py
index c770f3c6a..435747d01 100644
--- a/unstructured_ingest/v2/processes/connectors/fsspec/sftp.py
+++ b/unstructured_ingest/v2/processes/connectors/fsspec/sftp.py
@@ -26,6 +26,10 @@
     FsspecUploader,
     FsspecUploaderConfig,
 )
+from unstructured_ingest.v2.processes.utils.blob_storage import (
+    BlobStoreUploadStager,
+    BlobStoreUploadStagerConfig,
+)
 
 if TYPE_CHECKING:
     from fsspec.implementations.sftp import SFTPFileSystem
@@ -168,4 +172,6 @@ class SftpUploader(FsspecUploader):
     uploader=SftpUploader,
     uploader_config=SftpUploaderConfig,
     connection_config=SftpConnectionConfig,
+    upload_stager_config=BlobStoreUploadStagerConfig,
+    upload_stager=BlobStoreUploadStager,
 )
diff --git a/unstructured_ingest/v2/processes/connectors/local.py b/unstructured_ingest/v2/processes/connectors/local.py
index 82b6fd9cf..17b18e0d7 100644
--- a/unstructured_ingest/v2/processes/connectors/local.py
+++ b/unstructured_ingest/v2/processes/connectors/local.py
@@ -27,6 +27,10 @@
     DestinationRegistryEntry,
     SourceRegistryEntry,
 )
+from unstructured_ingest.v2.processes.utils.blob_storage import (
+    BlobStoreUploadStager,
+    BlobStoreUploadStagerConfig,
+)
 
 CONNECTOR_TYPE = "local"
 
@@ -176,7 +180,7 @@ class LocalUploader(Uploader):
     def is_async(self) -> bool:
         return False
 
-    def get_destination_path(self, file_data: FileData) -> Path:
+    def get_destination_path(self, file_data: FileData, suffix: str = ".json") -> Path:
         if source_identifiers := file_data.source_identifiers:
             rel_path = (
                 source_identifiers.relative_path[1:]
@@ -185,10 +189,10 @@ def get_destination_path(self, file_data: FileData) -> Path:
             )
             new_path = self.upload_config.output_path / Path(rel_path)
             final_path = str(new_path).replace(
-                source_identifiers.filename, f"{source_identifiers.filename}.json"
+                source_identifiers.filename, f"{source_identifiers.filename}{suffix}"
             )
         else:
-            final_path = self.upload_config.output_path / Path(f"{file_data.identifier}.json")
+            final_path = self.upload_config.output_path / Path(f"{file_data.identifier}{suffix}")
         final_path = Path(final_path)
         final_path.parent.mkdir(parents=True, exist_ok=True)
         return final_path
@@ -199,7 +203,7 @@ def run_data(self, data: list[dict], file_data: FileData, **kwargs: Any) -> None
             json.dump(data, f)
 
     def run(self, path: Path, file_data: FileData, **kwargs: Any) -> None:
-        final_path = self.get_destination_path(file_data=file_data)
+        final_path = self.get_destination_path(file_data=file_data, suffix=path.suffix)
         logger.debug(f"copying file from {path} to {final_path}")
         shutil.copy(src=str(path), dst=str(final_path))
 
@@ -213,5 +217,8 @@ def run(self, path: Path, file_data: FileData, **kwargs: Any) -> None:
 )
 
 local_destination_entry = DestinationRegistryEntry(
-    uploader=LocalUploader, uploader_config=LocalUploaderConfig
+    uploader=LocalUploader,
+    uploader_config=LocalUploaderConfig,
+    upload_stager=BlobStoreUploadStager,
+    upload_stager_config=BlobStoreUploadStagerConfig,
 )
diff --git a/unstructured_ingest/v2/processes/embedder.py b/unstructured_ingest/v2/processes/embedder.py
index 66ae9bbee..629db77d2 100644
--- a/unstructured_ingest/v2/processes/embedder.py
+++ b/unstructured_ingest/v2/processes/embedder.py
@@ -4,6 +4,7 @@
 from pathlib import Path
 from typing import TYPE_CHECKING, Any, Literal, Optional
 
+import ndjson
 from pydantic import BaseModel, Field, SecretStr
 
 from unstructured_ingest.v2.interfaces.process import BaseProcess
@@ -184,12 +185,20 @@ def get_embedder(self) -> "BaseEmbeddingEncoder":
 class Embedder(BaseProcess, ABC):
     config: EmbedderConfig
 
+    def get_data(self, elements_filepath: Path) -> list[dict]:
+        with elements_filepath.open() as f:
+            if elements_filepath.suffix == ".json":
+                return json.load(f)
+            elif elements_filepath.suffix == ".ndjson":
+                return ndjson.load(f)
+            else:
+                raise ValueError(f"Unsupported input format: {elements_filepath}")
+
     def run(self, elements_filepath: Path, **kwargs: Any) -> list[dict]:
         # TODO update base embedder classes to support async
         embedder = self.config.get_embedder()
-        with elements_filepath.open("r") as elements_file:
-            elements = json.load(elements_file)
+        elements = self.get_data(elements_filepath=elements_filepath)
         if not elements:
-            return [e.to_dict() for e in elements]
+            return []
         embedded_elements = embedder.embed_documents(elements=elements)
         return embedded_elements
diff --git a/unstructured_ingest/v2/processes/utils/__init__.py b/unstructured_ingest/v2/processes/utils/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/unstructured_ingest/v2/processes/utils/blob_storage.py b/unstructured_ingest/v2/processes/utils/blob_storage.py
new file mode 100644
index 000000000..37ace2048
--- /dev/null
+++ b/unstructured_ingest/v2/processes/utils/blob_storage.py
@@ -0,0 +1,30 @@
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import Any
+
+from unstructured_ingest.v2.interfaces import FileData, UploadStager, UploadStagerConfig
+
+
+class BlobStoreUploadStagerConfig(UploadStagerConfig):
+    pass
+
+
+@dataclass
+class BlobStoreUploadStager(UploadStager):
+    upload_stager_config: BlobStoreUploadStagerConfig = field(
+        default_factory=BlobStoreUploadStagerConfig
+    )
+
+    def run(
+        self,
+        elements_filepath: Path,
+        file_data: FileData,
+        output_dir: Path,
+        output_filename: str,
+        **kwargs: Any,
+    ) -> Path:
+        output_file = self.get_output_path(output_filename=output_filename, output_dir=output_dir)
+        # Always save as json
+        data = self.get_data(elements_filepath)
+        self.save_data(output_filepath=output_file.with_suffix(".json"), content=data)
+        return output_file.with_suffix(".json")