feat/support ndjson in pipeline #327

Open · wants to merge 3 commits into main
18 changes: 15 additions & 3 deletions test_e2e/python/test-ingest-couchbase-output.py
@@ -2,8 +2,10 @@
import math
import time
from datetime import timedelta
from pathlib import Path

import click
import ndjson
from couchbase import search
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
@@ -23,6 +25,16 @@ def get_client(username, password, connection_string) -> Cluster:
return cluster


def get_data(filepath: Path) -> list[dict]:
with filepath.open() as f:
if filepath.suffix == ".json":
return json.load(f)
elif filepath.suffix == ".ndjson":
return ndjson.load(f)
else:
raise ValueError(f"Unsupported input format: {filepath}")


@click.group(name="couchbase-ingest")
@click.option("--username", type=str)
@click.option("--password", type=str)
@@ -87,10 +99,10 @@ def check(ctx, expected_docs):


@cli.command()
@click.option("--output-json", type=click.File())
@click.option("--output-json", type=click.Path(path_type=Path))
@click.pass_context
def check_vector(ctx, output_json):
json_content = json.load(output_json)
def check_vector(ctx, output_json: Path):
json_content = get_data(filepath=output_json)
key_0 = next(iter(json_content[0])) # Get the first key
exact_embedding = json_content[0][key_0]["embedding"]
exact_text = json_content[0][key_0]["text"]
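Note: NDJSON (newline-delimited JSON) stores one JSON object per line instead of a single top-level array, which is why the test helper now dispatches on the file suffix. A minimal sketch of reading both formats (file names and records are illustrative):

import json

import ndjson

# output.json   -> [{"id": 1}, {"id": 2}]
# output.ndjson -> {"id": 1}\n{"id": 2}\n
with open("output.json") as f:
    records_json = json.load(f)        # whole file is one JSON document
with open("output.ndjson") as f:
    records_ndjson = ndjson.load(f)    # one JSON document per line
assert records_json == records_ndjson  # same list[dict] either way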
2 changes: 1 addition & 1 deletion unstructured_ingest/v2/interfaces/file_data.py
@@ -102,7 +102,7 @@ def file_data_from_file(path: str) -> FileData:
try:
return BatchFileData.from_file(path=path)
except ValidationError:
logger.debug(f"{path} not valid for batch file data")
logger.debug(f"{path} not detected as batch file data")

return FileData.from_file(path=path)

9 changes: 2 additions & 7 deletions unstructured_ingest/v2/interfaces/upload_stager.py
@@ -39,7 +39,6 @@ def get_output_path(self, output_filename: str, output_dir: Path) -> Path:
output_path = Path(output_filename)
output_filename = f"{Path(output_filename).stem}{output_path.suffix}"
output_path = Path(output_dir) / Path(f"{output_filename}")
output_path.parent.mkdir(parents=True, exist_ok=True)
return output_path

def stream_update(self, input_file: Path, output_file: Path, file_data: FileData) -> None:
@@ -53,16 +52,12 @@ def stream_update(self, input_file: Path, output_file: Path, file_data: FileData) -> None:
writer.f.flush()

def process_whole(self, input_file: Path, output_file: Path, file_data: FileData) -> None:
with input_file.open() as in_f:
elements_contents = json.load(in_f)

elements_contents = self.get_data(elements_filepath=input_file)
conformed_elements = [
self.conform_dict(element_dict=element, file_data=file_data)
for element in elements_contents
]

with open(output_file, "w") as out_f:
json.dump(conformed_elements, out_f, indent=2)
self.save_data(output_filepath=output_file, content=conformed_elements)

def run(
self,
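For context, process_whole now delegates (de)serialization to suffix-aware helpers on the stager instead of calling json.load/json.dump directly. A rough sketch of what get_data and save_data are assumed to look like (the real helpers live elsewhere on UploadStager and may differ):

import json
from pathlib import Path

import ndjson

def get_data(elements_filepath: Path) -> list[dict]:
    # assumed: choose the parser based on the staged file's suffix
    with elements_filepath.open() as f:
        if elements_filepath.suffix == ".json":
            return json.load(f)
        if elements_filepath.suffix == ".ndjson":
            return ndjson.load(f)
    raise ValueError(f"Unsupported input format: {elements_filepath}")

def save_data(output_filepath: Path, content: list[dict]) -> None:
    # assumed: mirror the suffix on the way back out
    with output_filepath.open("w") as f:
        if output_filepath.suffix == ".ndjson":
            ndjson.dump(content, f)
        else:
            json.dump(content, f, indent=2)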
7 changes: 7 additions & 0 deletions unstructured_ingest/v2/pipeline/pipeline.py
@@ -111,6 +111,13 @@ def check_destination_connector(self):
uploader_connector_type = self.uploader_step.process.connector_type
registry_entry = destination_registry[uploader_connector_type]
if registry_entry.upload_stager and self.stager_step is None:
try:
self.stager_step = UploadStageStep(
process=registry_entry.upload_stager(), context=self.context
)
return
except Exception as e:
logger.debug(f"failed to instantiate required stager on user's behalf: {e}")
raise ValueError(
f"pipeline with uploader type {self.uploader_step.process.__class__.__name__} "
f"expects a stager of type {registry_entry.upload_stager.__name__} "
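A note on the new fallback in check_destination_connector: when the destination's registry entry declares an upload_stager and none was configured, the pipeline now tries to construct one itself, and only falls through to the original ValueError if that construction fails. In practice this assumes the stager is constructible with no arguments; a small hedged illustration with hypothetical classes:

# Hypothetical stagers, only to illustrate why the fallback depends on a
# no-argument constructor succeeding.
class DefaultableStager:
    def __init__(self, config=None):   # every argument has a default
        self.config = config

class NeedsConfigStager:
    def __init__(self, config):        # no default -> fallback would fail
        self.config = config

DefaultableStager()        # ok: the pipeline could add this step silently
try:
    NeedsConfigStager()    # TypeError: the pipeline would log it and raise ValueError
except TypeError:
    pass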
7 changes: 4 additions & 3 deletions unstructured_ingest/v2/pipeline/steps/chunk.py
@@ -1,10 +1,11 @@
import asyncio
import hashlib
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Optional, TypedDict

import ndjson

from unstructured_ingest.v2.interfaces import FileData
from unstructured_ingest.v2.interfaces.file_data import file_data_from_file
from unstructured_ingest.v2.logger import logger
@@ -38,15 +39,15 @@ def should_chunk(self, filepath: Path, file_data: FileData) -> bool:
return not filepath.exists()

def get_output_filepath(self, filename: Path) -> Path:
hashed_output_file = f"{self.get_hash(extras=[filename.name])}.json"
hashed_output_file = f"{self.get_hash(extras=[filename.name])}.ndjson"
filepath = (self.cache_dir / hashed_output_file).resolve()
filepath.parent.mkdir(parents=True, exist_ok=True)
return filepath

def _save_output(self, output_filepath: str, chunked_content: list[dict]):
with open(str(output_filepath), "w") as f:
logger.debug(f"writing chunker output to: {output_filepath}")
json.dump(chunked_content, f, indent=2)
ndjson.dump(chunked_content, f)

async def _run_async(
self, fn: Callable, path: str, file_data_path: str, **kwargs
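ndjson.dump writes each dict on its own line with no wrapping array, so the chunker's cache files can be consumed (or appended to) record by record downstream. A small sketch of the output shape (file name and chunks are illustrative):

import ndjson

chunks = [
    {"type": "CompositeElement", "text": "chunk one"},
    {"type": "CompositeElement", "text": "chunk two"},
]
with open("abc123.ndjson", "w") as f:
    ndjson.dump(chunks, f)
# abc123.ndjson now contains:
# {"type": "CompositeElement", "text": "chunk one"}
# {"type": "CompositeElement", "text": "chunk two"}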
21 changes: 14 additions & 7 deletions unstructured_ingest/v2/pipeline/steps/embed.py
@@ -5,6 +5,8 @@
from pathlib import Path
from typing import Callable, Optional, TypedDict

import ndjson

from unstructured_ingest.v2.interfaces import FileData
from unstructured_ingest.v2.interfaces.file_data import file_data_from_file
from unstructured_ingest.v2.logger import logger
@@ -37,21 +39,26 @@ def should_embed(self, filepath: Path, file_data: FileData) -> bool:
return True
return not filepath.exists()

def get_output_filepath(self, filename: Path) -> Path:
hashed_output_file = f"{self.get_hash(extras=[filename.name])}.json"
def get_output_filepath(self, filename: Path, suffix: str = ".json") -> Path:
hashed_output_file = f"{self.get_hash(extras=[filename.name])}{suffix}"
filepath = (self.cache_dir / hashed_output_file).resolve()
filepath.parent.mkdir(parents=True, exist_ok=True)
return filepath

def _save_output(self, output_filepath: str, embedded_content: list[dict]):
with open(str(output_filepath), "w") as f:
def _save_output(self, output_filepath: Path, embedded_content: list[dict]):
with output_filepath.open("w") as f:
logger.debug(f"writing embedded output to: {output_filepath}")
json.dump(embedded_content, f, indent=2)
if output_filepath.suffix == ".json":
json.dump(embedded_content, f, indent=2)
elif output_filepath.suffix == ".ndjson":
ndjson.dump(embedded_content, f)
else:
raise ValueError(f"Unsupported input format: {output_filepath}")

async def _run_async(self, fn: Callable, path: str, file_data_path: str) -> EmbedStepResponse:
path = Path(path)
file_data = file_data_from_file(path=file_data_path)
output_filepath = self.get_output_filepath(filename=path)
output_filepath = self.get_output_filepath(filename=path, suffix=Path(path).suffix)
if not self.should_embed(filepath=output_filepath, file_data=file_data):
logger.debug(f"skipping embedding, output already exists: {output_filepath}")
return EmbedStepResponse(file_data_path=file_data_path, path=str(output_filepath))
@@ -65,7 +72,7 @@ async def _run_async(self, fn: Callable, path: str, file_data_path: str) -> EmbedStepResponse:
embed_content_raw = await fn(**fn_kwargs)

self._save_output(
output_filepath=str(output_filepath),
output_filepath=output_filepath,
embedded_content=embed_content_raw,
)
return EmbedStepResponse(file_data_path=file_data_path, path=str(output_filepath))
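Unlike the chunk and partition steps, which now always write .ndjson, the embed step keeps whatever suffix its input already carries, so a .json input produces a .json cache file and an .ndjson input produces .ndjson. Roughly (paths and hash are illustrative):

from pathlib import Path

input_path = Path("/cache/chunk_step/abc123.ndjson")  # illustrative upstream output
hashed = "def456"                                      # stands in for get_hash(...)
output_name = f"{hashed}{input_path.suffix}"           # -> "def456.ndjson"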
7 changes: 4 additions & 3 deletions unstructured_ingest/v2/pipeline/steps/partition.py
@@ -1,10 +1,11 @@
import asyncio
import hashlib
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Optional, TypedDict

import ndjson

from unstructured_ingest.v2.interfaces import FileData
from unstructured_ingest.v2.interfaces.file_data import file_data_from_file
from unstructured_ingest.v2.logger import logger
@@ -38,15 +39,15 @@ def should_partition(self, filepath: Path, file_data: FileData) -> bool:
return not filepath.exists()

def get_output_filepath(self, filename: Path) -> Path:
hashed_output_file = f"{self.get_hash(extras=[filename.name])}.json"
hashed_output_file = f"{self.get_hash(extras=[filename.name])}.ndjson"
filepath = (self.cache_dir / hashed_output_file).resolve()
filepath.parent.mkdir(parents=True, exist_ok=True)
return filepath

def _save_output(self, output_filepath: str, partitioned_content: list[dict]):
with open(str(output_filepath), "w") as f:
logger.debug(f"writing partitioned output to: {output_filepath}")
json.dump(partitioned_content, f, indent=2)
ndjson.dump(partitioned_content, f)

async def _run_async(
self, fn: Callable, path: str, file_data_path: str
17 changes: 15 additions & 2 deletions unstructured_ingest/v2/processes/chunker.py
@@ -1,8 +1,10 @@
import json
from abc import ABC
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Optional

import ndjson
from pydantic import BaseModel, Field, SecretStr

from unstructured_ingest.utils.chunking import assign_and_map_hash_ids
@@ -89,12 +91,23 @@ class Chunker(BaseProcess, ABC):
def is_async(self) -> bool:
return self.config.chunk_by_api

def get_data(self, elements_filepath: Path) -> list[dict]:
if elements_filepath.suffix == ".json":
with elements_filepath.open() as f:
return json.load(f)
elif elements_filepath.suffix == ".ndjson":
with elements_filepath.open() as f:
return ndjson.load(f)
else:
raise ValueError(f"Unsupported input format: {elements_filepath}")

@requires_dependencies(dependencies=["unstructured"])
def run(self, elements_filepath: Path, **kwargs: Any) -> list[dict]:
from unstructured.chunking import dispatch
from unstructured.staging.base import elements_from_json
from unstructured.staging.base import elements_from_dicts

elements = elements_from_json(filename=str(elements_filepath))
elements_dicts = self.get_data(elements_filepath=elements_filepath)
elements = elements_from_dicts(elements_dicts)
if not elements:
return [e.to_dict() for e in elements]
local_chunking_strategies = ("basic", "by_title")
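The chunker no longer assumes a .json file on disk: it loads raw dicts itself (json or ndjson, by suffix) and rebuilds Element objects via elements_from_dicts rather than elements_from_json. A hedged round-trip sketch, assuming the unstructured package is installed (the element dict is illustrative):

from unstructured.staging.base import elements_from_dicts

element_dicts = [
    {"type": "NarrativeText", "text": "hello world", "element_id": "1"},
]
elements = elements_from_dicts(element_dicts)
print([e.to_dict()["text"] for e in elements])  # ['hello world']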
@@ -187,9 +187,9 @@ class DatabricksVolumesUploader(Uploader, ABC):
upload_config: DatabricksVolumesUploaderConfig
connection_config: DatabricksVolumesConnectionConfig

def get_output_path(self, file_data: FileData) -> str:
def get_output_path(self, file_data: FileData, suffix: str = ".json") -> str:
return os.path.join(
self.upload_config.path, f"{file_data.source_identifiers.filename}.json"
self.upload_config.path, f"{file_data.source_identifiers.filename}{suffix}"
)

def precheck(self) -> None:
@@ -199,7 +199,7 @@ def precheck(self) -> None:
raise self.connection_config.wrap_error(e=e)

def run(self, path: Path, file_data: FileData, **kwargs: Any) -> None:
output_path = self.get_output_path(file_data=file_data)
output_path = self.get_output_path(file_data=file_data, suffix=path.suffix)
with open(path, "rb") as elements_file:
try:
self.connection_config.get_client().files.upload(
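Since staged files may now end in .ndjson, the Databricks volumes uploader derives the object name from the local file's suffix instead of hard-coding .json. Roughly (the volume path and filename are illustrative):

import os
from pathlib import Path

staged = Path("/tmp/stage/report.pdf.ndjson")  # illustrative staged output
volume_path = "/Volumes/catalog/schema/vol"    # illustrative upload_config.path
output_path = os.path.join(volume_path, f"report.pdf{staged.suffix}")
# -> /Volumes/catalog/schema/vol/report.pdf.ndjson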
@@ -17,6 +17,10 @@
DatabricksVolumesUploader,
DatabricksVolumesUploaderConfig,
)
from unstructured_ingest.v2.processes.utils.blob_storage import (
BlobStoreUploadStager,
BlobStoreUploadStagerConfig,
)

CONNECTOR_TYPE = "databricks_volumes_aws"

@@ -76,6 +80,8 @@ class DatabricksAWSVolumesUploader(DatabricksVolumesUploader):
connection_config=DatabricksAWSVolumesConnectionConfig,
uploader=DatabricksAWSVolumesUploader,
uploader_config=DatabricksAWSVolumesUploaderConfig,
upload_stager=BlobStoreUploadStager,
upload_stager_config=BlobStoreUploadStagerConfig,
)

databricks_aws_volumes_source_entry = SourceRegistryEntry(
@@ -17,6 +17,10 @@
DatabricksVolumesUploader,
DatabricksVolumesUploaderConfig,
)
from unstructured_ingest.v2.processes.utils.blob_storage import (
BlobStoreUploadStager,
BlobStoreUploadStagerConfig,
)

CONNECTOR_TYPE = "databricks_volumes_azure"

@@ -91,6 +95,8 @@ class DatabricksAzureVolumesUploader(DatabricksVolumesUploader):
connection_config=DatabricksAzureVolumesConnectionConfig,
uploader=DatabricksAzureVolumesUploader,
uploader_config=DatabricksAzureVolumesUploaderConfig,
upload_stager_config=BlobStoreUploadStagerConfig,
upload_stager=BlobStoreUploadStager,
)

databricks_azure_volumes_source_entry = SourceRegistryEntry(
@@ -17,6 +17,10 @@
DatabricksVolumesUploader,
DatabricksVolumesUploaderConfig,
)
from unstructured_ingest.v2.processes.utils.blob_storage import (
BlobStoreUploadStager,
BlobStoreUploadStagerConfig,
)

CONNECTOR_TYPE = "databricks_volumes_gcp"

@@ -74,6 +78,8 @@ class DatabricksGoogleVolumesUploader(DatabricksVolumesUploader):
connection_config=DatabricksGoogleVolumesConnectionConfig,
uploader=DatabricksGoogleVolumesUploader,
uploader_config=DatabricksGoogleVolumesUploaderConfig,
upload_stager=BlobStoreUploadStager,
upload_stager_config=BlobStoreUploadStagerConfig,
)

databricks_gcp_volumes_source_entry = SourceRegistryEntry(
@@ -17,6 +17,10 @@
DatabricksVolumesUploader,
DatabricksVolumesUploaderConfig,
)
from unstructured_ingest.v2.processes.utils.blob_storage import (
BlobStoreUploadStager,
BlobStoreUploadStagerConfig,
)

CONNECTOR_TYPE = "databricks_volumes"

@@ -75,6 +79,8 @@ class DatabricksNativeVolumesUploader(DatabricksVolumesUploader):
connection_config=DatabricksNativeVolumesConnectionConfig,
uploader=DatabricksNativeVolumesUploader,
uploader_config=DatabricksNativeVolumesUploaderConfig,
upload_stager=BlobStoreUploadStager,
upload_stager_config=BlobStoreUploadStagerConfig,
)

databricks_native_volumes_source_entry = SourceRegistryEntry(
6 changes: 6 additions & 0 deletions unstructured_ingest/v2/processes/connectors/fsspec/azure.py
@@ -26,6 +26,10 @@
FsspecUploaderConfig,
)
from unstructured_ingest.v2.processes.connectors.fsspec.utils import json_serial, sterilize_dict
from unstructured_ingest.v2.processes.utils.blob_storage import (
BlobStoreUploadStager,
BlobStoreUploadStagerConfig,
)

if TYPE_CHECKING:
from adlfs import AzureBlobFileSystem
@@ -194,4 +198,6 @@ class AzureUploader(FsspecUploader):
uploader=AzureUploader,
uploader_config=AzureUploaderConfig,
connection_config=AzureConnectionConfig,
upload_stager_config=BlobStoreUploadStagerConfig,
upload_stager=BlobStoreUploadStager,
)
6 changes: 6 additions & 0 deletions unstructured_ingest/v2/processes/connectors/fsspec/box.py
@@ -28,6 +28,10 @@
FsspecUploaderConfig,
)
from unstructured_ingest.v2.processes.connectors.utils import conform_string_to_dict
from unstructured_ingest.v2.processes.utils.blob_storage import (
BlobStoreUploadStager,
BlobStoreUploadStagerConfig,
)

if TYPE_CHECKING:
from boxfs import BoxFileSystem
@@ -167,4 +171,6 @@ class BoxUploader(FsspecUploader):
uploader=BoxUploader,
uploader_config=BoxUploaderConfig,
connection_config=BoxConnectionConfig,
upload_stager=BlobStoreUploadStager,
upload_stager_config=BlobStoreUploadStagerConfig,
)
6 changes: 6 additions & 0 deletions unstructured_ingest/v2/processes/connectors/fsspec/dropbox.py
@@ -32,6 +32,10 @@
FsspecUploader,
FsspecUploaderConfig,
)
from unstructured_ingest.v2.processes.utils.blob_storage import (
BlobStoreUploadStager,
BlobStoreUploadStagerConfig,
)

if TYPE_CHECKING:
from dropboxdrivefs import DropboxDriveFileSystem
@@ -165,4 +169,6 @@ class DropboxUploader(FsspecUploader):
uploader=DropboxUploader,
uploader_config=DropboxUploaderConfig,
connection_config=DropboxConnectionConfig,
upload_stager=BlobStoreUploadStager,
upload_stager_config=BlobStoreUploadStagerConfig,
)