
Implement lakeFS document loader #1


Merged: 6 commits, Mar 26, 2025
199 changes: 145 additions & 54 deletions langchain_lakefs/document_loaders.py
@@ -1,72 +1,163 @@
"""LakeFS document loader."""

from typing import Iterator

"""LakeFS document loader using the official lakeFS Python SDK."""
import tempfile
import os
from typing import List, Optional, Any
import lakefs.branch
from lakefs.client import Client
from langchain_core.document_loaders.base import BaseLoader
from langchain_core.documents import Document
from langchain_community.document_loaders.unstructured import UnstructuredBaseLoader
from unstructured.partition.auto import partition


class LakeFSLoader(BaseLoader):
    """
    LakeFS document loader integration using the official lakeFS Python SDK.

    Setup:
        Install ``langchain-lakefs`` and the ``lakefs`` SDK:

        .. code-block:: bash

            pip install -U langchain-lakefs lakefs

    Instantiate:
        .. code-block:: python

            from langchain_lakefs.document_loaders import LakeFSLoader

            loader = LakeFSLoader(
                lakefs_endpoint="https://example.my-lakefs.com",
                lakefs_access_key="your-access-key",
                lakefs_secret_key="your-secret-key",
                repo="my-repo",
                ref="main",
                path="path/to/files",
            )

    Lazy load:
        .. code-block:: python

            docs = []
            docs_lazy = loader.lazy_load()

            # async variant:
            # docs_lazy = await loader.alazy_load()

            for doc in docs_lazy:
                docs.append(doc)
            print(docs[0].page_content[:100])
            print(docs[0].metadata)

    Async load:
        .. code-block:: python

            docs = await loader.aload()
            print(docs[0].page_content[:100])
            print(docs[0].metadata)
    """  # noqa: E501

    def __init__(
        self,
        lakefs_endpoint: str,
        lakefs_access_key: str,
        lakefs_secret_key: str,
        repo: str = "",
        ref: str = "main",
        path: str = "",
    ) -> None:
        self.client = Client(
            host=lakefs_endpoint,
            username=lakefs_access_key,
            password=lakefs_secret_key,
        )
        self.repo = repo
        self.ref = ref
        self.path = path
        self.user_metadata = False

    def set_path(self, path: str) -> None:
        """Set the path to load documents from."""
        self.path = path

    def set_ref(self, ref: str) -> None:
        """Set the ref to load documents from."""
        self.ref = ref

    def set_repo(self, repo: str) -> None:
        """Set the repository to load documents from."""
        self.repo = repo

    def set_user_metadata(self, user_metadata: bool) -> None:
        """Set whether to load user metadata."""
        self.user_metadata = user_metadata

    def load(self) -> List[Document]:
        """Load all objects under ``path``, delegating each to UnstructuredLakeFSLoader."""
        self.__validate_instance()
        objects = (
            lakefs.repository(self.repo, client=self.client)
            .ref(self.ref)
            .objects(user_metadata=True, prefix=self.path)
        )
        return [
            doc
            for obj in objects  # iterate over ObjectInfo instances
            for doc in UnstructuredLakeFSLoader(
                obj.physical_address,
                self.repo,
                self.ref,
                obj.path,
                user_metadata=obj.metadata,
                client=self.client,
            ).load()
        ]

    def __validate_instance(self) -> None:
        if self.repo is None or self.repo == "":
            raise ValueError(
                "no repository was provided. use `set_repo` to specify a repository"
            )
        if self.ref is None or self.ref == "":
            raise ValueError("no ref was provided. use `set_ref` to specify a ref")
        if self.path is None:
            raise ValueError("no path was provided. use `set_path` to specify a path")


class UnstructuredLakeFSLoader(UnstructuredBaseLoader):
    """Load a single `lakeFS` object as unstructured data."""

    def __init__(
        self,
        url: str,
        repo: str,
        ref: str = "main",
        path: str = "",
        presign: bool = True,
        client: Optional[Client] = None,
        user_metadata: Optional[dict[str, str]] = None,
        **unstructured_kwargs: Any,
    ):
        """Initialize UnstructuredLakeFSLoader.

        Args:
            url: physical address of the object (a ``local://`` URL is read
                directly from the local filesystem).
            repo: lakeFS repository name.
            ref: branch, tag, or commit to read from.
            path: path of the object within the repository.
            presign: whether to use pre-signed URLs where the storage supports them.
            client: authenticated lakeFS ``Client``.
            user_metadata: user metadata to merge into the document metadata.
        """
        super().__init__(**unstructured_kwargs)
        self.user_metadata = user_metadata
        self.url = url
        self.repo = repo
        self.ref = ref
        self.path = path
        self.presign = presign
        self.client = client

    def _get_metadata(self) -> dict[str, Any]:
        metadata = {"repo": self.repo, "ref": self.ref, "path": self.path}
        if self.user_metadata:
            for key, value in self.user_metadata.items():
                if key not in metadata:
                    metadata[key] = value
        return metadata

    def _get_elements(self) -> List:
        local_prefix = "local://"
        if self.url.startswith(local_prefix):
            # local:// objects point at the local filesystem; partition in place.
            local_path = self.url[len(local_prefix):]
            return partition(filename=local_path)
        else:
            with tempfile.TemporaryDirectory() as temp_dir:
                file_path = f"{temp_dir}/{self.path.split('/')[-1]}"
Review comment: format alternative path under temporary directory

Suggested change
-                file_path = f"{temp_dir}/{self.path.split('/')[-1]}"
+                file_path = os.path.join(temp_dir, os.path.basename(self.path))
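
A quick illustration of why the suggested `os.path` form is preferable (a standalone sketch, not part of the PR):

```python
import os

path = "reports/2024/summary.pdf"

# Manual splitting hard-codes "/" as the separator:
print(path.split("/")[-1])     # summary.pdf

# os.path.basename does the same thing portably, and os.path.join
# assembles the destination without hand-formatted separators:
print(os.path.basename(path))  # summary.pdf
print(os.path.join("/tmp/d", os.path.basename(path)))  # /tmp/d/summary.pdf
```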

                os.makedirs(os.path.dirname(file_path), exist_ok=True)
                obj = (
                    lakefs.repository(self.repo, client=self.client)
                    .ref(self.ref)
                    .object(self.path)
                )
                with open(file_path, mode="wb") as file:
                    file.write(obj.reader().read())
                return partition(filename=file_path)
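
Taken together, a minimal end-to-end sketch of using the loader from this PR (the endpoint, credentials, repository, and path below are placeholders):

```python
from langchain_lakefs.document_loaders import LakeFSLoader

loader = LakeFSLoader(
    lakefs_endpoint="https://example.my-lakefs.com",  # placeholder endpoint
    lakefs_access_key="your-access-key",              # placeholder credentials
    lakefs_secret_key="your-secret-key",
)

# The target can also be set after construction via the setters:
loader.set_repo("my-repo")
loader.set_ref("main")
loader.set_path("path/to/files")

docs = loader.load()
print(docs[0].metadata)  # {'repo': 'my-repo', 'ref': 'main', 'path': ..., plus any user metadata}
```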
99 changes: 99 additions & 0 deletions tests/unit_tests/test_lakefs.py
@@ -0,0 +1,99 @@
import unittest
from typing import Any
from unittest.mock import patch

import pytest
from lakefs import ObjectReader, StoredObject, Reference
import lakefs_sdk
from langchain_lakefs.document_loaders import LakeFSLoader


@pytest.fixture
def mock_get_object() -> Any:
    with patch.object(ObjectReader, "read", return_value=b'pdf content'):
        yield


@pytest.fixture
def mock_get_storage_id() -> Any:
    with patch.object(StoredObject, "storage_id", return_value=''):
        yield


@pytest.fixture
def mock_get_reader() -> Any:
    with patch.object(
        StoredObject,
        "reader",
        return_value=ObjectReader(None, mode='r', pre_sign=True, client=None),
    ):
        yield


@pytest.fixture
def mock_unstructured_local() -> Any:
    with patch(
        "langchain_lakefs.document_loaders.UnstructuredLakeFSLoader"

Review comment: indent

    ) as mock_unstructured_lakefs:
        mock_unstructured_lakefs.return_value.load.return_value = [
            ("text content", "pdf content")
        ]
        yield mock_unstructured_lakefs.return_value


@pytest.fixture
def mock_list_objects() -> Any:
    fake_list = [
        lakefs_sdk.ObjectStats(
            path="fake_path_1.txt",
            path_type="object",
            physical_address="fake_address_1",
            checksum="checksum1",
            metadata={"key": "value", "key2": "value2"},
            size_bytes=123,
            mtime=1234567890,
        ),
        lakefs_sdk.ObjectStats(
            path="fake_path_2.txt",
            path_type="object",
            physical_address="fake_address_2",
            metadata={"key": "value", "key2": "value2"},
            checksum="checksum2",
            size_bytes=456,
            mtime=1234567891,
        ),
    ]
    with patch.object(Reference, "objects", return_value=fake_list):
        yield


class TestLakeFSLoader(unittest.TestCase):
    lakefs_access_key: str = "lakefs_access_key"
    lakefs_secret_key: str = "lakefs_secret_key"
    endpoint: str = "http://localhost:8000"
    repo: str = "repo"
    ref: str = "ref"
    path: str = "path"

    @pytest.mark.usefixtures("mock_unstructured_local", "mock_list_objects")
    def test_non_presigned_loading(self) -> None:
        loader = LakeFSLoader(
            lakefs_access_key=self.lakefs_access_key,
            lakefs_secret_key=self.lakefs_secret_key,
            lakefs_endpoint=self.endpoint,
        )
        loader.set_repo(self.repo)
        loader.set_ref(self.ref)
        loader.set_path(self.path)
        loader.load()

    @pytest.mark.usefixtures(
        "mock_list_objects", "mock_get_object", "mock_get_storage_id", "mock_get_reader"
    )
    def test_load(self) -> None:
        loader = LakeFSLoader(
            lakefs_access_key=self.lakefs_access_key,
            lakefs_secret_key=self.lakefs_secret_key,
            lakefs_endpoint=self.endpoint,
        )
        loader.set_repo(self.repo)
        loader.set_ref(self.ref)
        loader.set_path(self.path)
        documents = loader.load()
        self.assertEqual(len(documents), 2)
        self.assertEqual(len(documents[0].metadata), 5)
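
The final assertion expects five metadata keys. That count follows from `_get_metadata`, which merges the three loader-level keys with the two user-metadata keys stubbed in `mock_list_objects`; a standalone sketch of that merge (values are the test's fakes):

```python
# Sketch of the merge performed by UnstructuredLakeFSLoader._get_metadata:
loader_metadata = {"repo": "repo", "ref": "ref", "path": "fake_path_1.txt"}
user_metadata = {"key": "value", "key2": "value2"}  # as stubbed in mock_list_objects

merged = dict(loader_metadata)
for key, value in user_metadata.items():
    if key not in merged:  # loader-level keys win on collision
        merged[key] = value

assert len(merged) == 5  # matches the assertion in test_load
```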