diff --git a/langchain_lakefs/document_loaders.py b/langchain_lakefs/document_loaders.py
index 9e20c72..744015e 100644
--- a/langchain_lakefs/document_loaders.py
+++ b/langchain_lakefs/document_loaders.py
@@ -1,72 +1,163 @@
-"""LakeFS document loader."""
-
-from typing import Iterator
-
+"""LakeFS document loader using the official lakeFS Python SDK."""
+import tempfile
+import os
+from typing import List, Optional, Any
+import lakefs
+from lakefs.client import Client
 from langchain_core.document_loaders.base import BaseLoader
 from langchain_core.documents import Document
+from langchain_community.document_loaders.unstructured import UnstructuredBaseLoader
+from unstructured.partition.auto import partition
 
 
 class LakeFSLoader(BaseLoader):
-    # TODO: Replace all TODOs in docstring. See example docstring:
-    # https://github.com/langchain-ai/langchain/blob/869523ad728e6b76d77f170cce13925b4ebc3c1e/libs/community/langchain_community/document_loaders/recursive_url_loader.py#L54
     """
-    LakeFS document loader integration
+    LakeFS document loader integration using the official lakeFS Python SDK.
 
-    # TODO: Replace with relevant packages, env vars.
     Setup:
-        Install ``langchain-lakefs`` and set environment variable ``LAKEFS_API_KEY``.
+        Install ``langchain-lakefs`` and ``lakefs``:
 
         .. code-block:: bash
 
-            pip install -U langchain-lakefs
-            export LAKEFS_API_KEY="your-api-key"
+            pip install -U langchain-lakefs lakefs
 
-    # TODO: Replace with relevant init params.
     Instantiate:
         .. code-block:: python
 
-            from langchain_community.document_loaders import LakeFSLoader
+            from langchain_lakefs.document_loaders import LakeFSLoader
 
             loader = LakeFSLoader(
-                # required params = ...
-                # other params = ...
+                lakefs_endpoint="https://example.my-lakefs.com",
+                lakefs_access_key="your-access-key",
+                lakefs_secret_key="your-secret-key",
+                repo="my-repo",
+                ref="main",
+                path="path/to/files",
             )
+    """
 
-    Lazy load:
-        .. code-block:: python
-
-            docs = []
-            docs_lazy = loader.lazy_load()
-
-            # async variant:
-            # docs_lazy = await loader.alazy_load()
-
-            for doc in docs_lazy:
-                docs.append(doc)
-            print(docs[0].page_content[:100])
-            print(docs[0].metadata)
-
-        .. code-block:: python
-
-            TODO: Example output
-
-    # TODO: Delete if async load is not implemented
-    Async load:
-        .. code-block:: python
-
-            docs = await loader.aload()
-            print(docs[0].page_content[:100])
-            print(docs[0].metadata)
-
-        .. code-block:: python
-
-            TODO: Example output
-    """  # noqa: E501
-
-    # TODO: This method must be implemented to load documents.
-    # Do not implement load(), a default implementation is already available.
-    def lazy_load(self) -> Iterator[Document]:
-        raise NotImplementedError()
-
-    # TODO: Implement if you would like to change default BaseLoader implementation
-    # async def alazy_load(self) -> AsyncIterator[Document]:
+    def __init__(
+        self,
+        lakefs_endpoint: str,
+        lakefs_access_key: str,
+        lakefs_secret_key: str,
+        repo: str = "",
+        ref: str = "main",
+        path: str = "",
+    ):
+        self.client = Client(
+            host=lakefs_endpoint,
+            username=lakefs_access_key,
+            password=lakefs_secret_key,
+        )
+        self.repo = repo
+        self.ref = ref
+        self.path = path
+        self.user_metadata = False
+
+    def set_path(self, path: str) -> None:
+        """Set the path to load documents from."""
+        self.path = path
+
+    def set_ref(self, ref: str) -> None:
+        """Set the ref to load documents from."""
+        self.ref = ref
+
+    def set_repo(self, repo: str) -> None:
+        """Set the repository to load documents from."""
+        self.repo = repo
+
+    def set_user_metadata(self, user_metadata: bool) -> None:
+        """Set whether to load user metadata."""
+        self.user_metadata = user_metadata
+
+    def load(self) -> List[Document]:
+        """Load all documents under ``path`` from the configured repo and ref."""
+        self.__validate_instance()
+        objects = (
+            lakefs.repository(self.repo, client=self.client)
+            .ref(self.ref)
+            .objects(user_metadata=True, prefix=self.path)
+        )
+        documents = [
+            doc
+            for obj in objects  # iterate over ObjectInfo instances
+            for doc in UnstructuredLakeFSLoader(
+                obj.physical_address,
+                self.repo,
+                self.ref,
+                obj.path,
+                user_metadata=obj.metadata,
+                client=self.client,
+            ).load()
+        ]
+        return documents
+
+    def __validate_instance(self) -> None:
+        if self.repo is None or self.repo == "":
+            raise ValueError(
+                "no repository was provided. use `set_repo` to specify a repository"
+            )
+        if self.ref is None or self.ref == "":
+            raise ValueError("no ref was provided. use `set_ref` to specify a ref")
+        if self.path is None:
+            raise ValueError("no path was provided. use `set_path` to specify a path")
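Reviewer note: a minimal end-to-end sketch of the API this class exposes, as it stands in this diff. The endpoint, credentials, repo, and path values below are placeholders, not anything from the change itself:

```python
from langchain_lakefs.document_loaders import LakeFSLoader

# Placeholder connection details -- substitute your deployment's values.
loader = LakeFSLoader(
    lakefs_endpoint="https://example.my-lakefs.com",
    lakefs_access_key="your-access-key",
    lakefs_secret_key="your-secret-key",
)

# repo/ref/path may also be supplied after construction via the setters,
# which is the pattern the unit tests below rely on.
loader.set_repo("my-repo")
loader.set_ref("main")
loader.set_path("docs/")

docs = loader.load()
for doc in docs:
    # Every Document carries repo/ref/path metadata, plus any user
    # metadata stored on the lakeFS object.
    print(doc.metadata["path"], len(doc.page_content))
```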
+
+
+class UnstructuredLakeFSLoader(UnstructuredBaseLoader):
+    """Load from `lakeFS` as unstructured data."""
+
+    def __init__(
+        self,
+        url: str,
+        repo: str,
+        ref: str = "main",
+        path: str = "",
+        presign: bool = True,
+        client: Optional[Client] = None,
+        user_metadata: Optional[dict[str, str]] = None,
+        **unstructured_kwargs: Any,
+    ):
+        """Initialize UnstructuredLakeFSLoader.
+
+        Args:
+            url: The object's URL, e.g. its physical address or a
+                ``local://`` address for objects on local storage.
+            repo: The repository to load from.
+            ref: The branch, tag, or commit ID to load from.
+            path: The object's path within the repository.
+            presign: Whether to use pre-signed URLs.
+            client: An authenticated lakeFS ``Client``.
+            user_metadata: User metadata to merge into each document's metadata.
+        """
+        super().__init__(**unstructured_kwargs)
+        self.user_metadata = user_metadata
+        self.url = url
+        self.repo = repo
+        self.ref = ref
+        self.path = path
+        self.presign = presign
+        self.client = client
+
+    def _get_metadata(self) -> dict[str, Any]:
+        metadata = {"repo": self.repo, "ref": self.ref, "path": self.path}
+        if self.user_metadata:
+            for key, value in self.user_metadata.items():
+                if key not in metadata:
+                    metadata[key] = value
+        return metadata
+
+    def _get_elements(self) -> List:
+        local_prefix = "local://"
+        if self.url.startswith(local_prefix):
+            # Objects on local storage can be partitioned in place.
+            local_path = self.url[len(local_prefix):]
+            return partition(filename=local_path)
+        else:
+            # Otherwise download the object to a temp file and partition that.
+            with tempfile.TemporaryDirectory() as temp_dir:
+                file_path = f"{temp_dir}/{self.path.split('/')[-1]}"
+                os.makedirs(os.path.dirname(file_path), exist_ok=True)
+                obj = (
+                    lakefs.repository(self.repo, client=self.client)
+                    .ref(self.ref)
+                    .object(self.path)
+                )
+                with open(file_path, mode="wb") as file:
+                    file.write(obj.reader().read())
+                return partition(filename=file_path)
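One behavior worth flagging in `_get_metadata`: the reserved keys take precedence, so object-level user metadata can never overwrite `repo`, `ref`, or `path`. A standalone illustration of the same merge logic, with hypothetical values:

```python
# Hypothetical user metadata attached to a lakeFS object.
user_metadata = {"author": "jane", "path": "would-shadow-a-reserved-key"}

# Same merge logic as _get_metadata above.
metadata = {"repo": "my-repo", "ref": "main", "path": "docs/intro.md"}
for key, value in user_metadata.items():
    if key not in metadata:  # reserved keys win
        metadata[key] = value

assert metadata["path"] == "docs/intro.md"  # reserved key preserved
assert metadata["author"] == "jane"         # user metadata merged in
```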
diff --git a/tests/unit_tests/test_lakefs.py b/tests/unit_tests/test_lakefs.py
new file mode 100644
index 0000000..2aef34e
--- /dev/null
+++ b/tests/unit_tests/test_lakefs.py
@@ -0,0 +1,99 @@
+import unittest
+from typing import Any
+from unittest.mock import patch
+
+import pytest
+import lakefs_sdk
+from lakefs import ObjectReader, Reference, StoredObject
+
+from langchain_lakefs.document_loaders import LakeFSLoader
+
+
+@pytest.fixture
+def mock_get_object() -> Any:
+    with patch.object(ObjectReader, "read", return_value=b"pdf content"):
+        yield
+
+
+@pytest.fixture
+def mock_get_storage_id() -> Any:
+    with patch.object(StoredObject, "storage_id", return_value=""):
+        yield
+
+
+@pytest.fixture
+def mock_get_reader() -> Any:
+    with patch.object(
+        StoredObject,
+        "reader",
+        return_value=ObjectReader(None, mode="r", pre_sign=True, client=None),
+    ):
+        yield
+
+
+@pytest.fixture
+def mock_unstructured_local() -> Any:
+    with patch(
+        "langchain_lakefs.document_loaders.UnstructuredLakeFSLoader"
+    ) as mock_unstructured_lakefs:
+        mock_unstructured_lakefs.return_value.load.return_value = [
+            ("text content", "pdf content")
+        ]
+        yield mock_unstructured_lakefs.return_value
+
+
+@pytest.fixture
+def mock_list_objects() -> Any:
+    fake_list = [
+        lakefs_sdk.ObjectStats(
+            path="fake_path_1.txt",
+            path_type="object",
+            physical_address="fake_address_1",
+            checksum="checksum1",
+            metadata={"key": "value", "key2": "value2"},
+            size_bytes=123,
+            mtime=1234567890,
+        ),
+        lakefs_sdk.ObjectStats(
+            path="fake_path_2.txt",
+            path_type="object",
+            physical_address="fake_address_2",
+            metadata={"key": "value", "key2": "value2"},
+            checksum="checksum2",
+            size_bytes=456,
+            mtime=1234567891,
+        ),
+    ]
+    with patch.object(Reference, "objects", return_value=fake_list):
+        yield
+
+
+class TestLakeFSLoader(unittest.TestCase):
+    lakefs_access_key: str = "lakefs_access_key"
+    lakefs_secret_key: str = "lakefs_secret_key"
+    endpoint: str = "http://localhost:8000"
+    repo: str = "repo"
+    ref: str = "ref"
+    path: str = "path"
+
+    @pytest.mark.usefixtures("mock_unstructured_local", "mock_list_objects")
+    def test_non_presigned_loading(self) -> None:
+        loader = LakeFSLoader(
+            lakefs_access_key=self.lakefs_access_key,
+            lakefs_secret_key=self.lakefs_secret_key,
+            lakefs_endpoint=self.endpoint,
+        )
+        loader.set_repo(self.repo)
+        loader.set_ref(self.ref)
+        loader.set_path(self.path)
+        loader.load()
+
+    @pytest.mark.usefixtures(
+        "mock_list_objects", "mock_get_object", "mock_get_storage_id", "mock_get_reader"
+    )
+    def test_load(self) -> None:
+        loader = LakeFSLoader(
+            lakefs_access_key=self.lakefs_access_key,
+            lakefs_secret_key=self.lakefs_secret_key,
+            lakefs_endpoint=self.endpoint,
+        )
+        loader.set_repo(self.repo)
+        loader.set_ref(self.ref)
+        loader.set_path(self.path)
+        documents = loader.load()
+        self.assertEqual(len(documents), 2)
+        # repo, ref, and path plus the two user-metadata keys from the fixture
+        self.assertEqual(len(documents[0].metadata), 5)
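For reviewers tracing `test_load`: `mock_list_objects` feeds two fake `ObjectStats` through `LakeFSLoader.load`, each spawning an `UnstructuredLakeFSLoader` whose download path hits the patched `StoredObject.reader` and `ObjectReader.read`, so `partition` ends up parsing a temp file containing `b"pdf content"`. A condensed, hypothetical sketch of what `_get_elements` effectively does under those patches, mirroring the fixture values:

```python
import tempfile

from unstructured.partition.auto import partition

# Under the test's patches, the SDK read returns this fixed payload.
payload = b"pdf content"

with tempfile.TemporaryDirectory() as temp_dir:
    # Same naming scheme as _get_elements: temp dir + last path segment.
    file_path = f"{temp_dir}/fake_path_1.txt"
    with open(file_path, mode="wb") as f:
        f.write(payload)  # stands in for obj.reader().read()
    elements = partition(filename=file_path)  # .txt, so parsed as plain text

print(len(elements))
```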