-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdocument_loaders.py
163 lines (139 loc) · 5.31 KB
/
document_loaders.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
"""LakeFS document loader using the official lakeFS Python SDK."""
import tempfile
import os
from typing import List, Optional, Any
import lakefs.branch
from lakefs.client import Client
from langchain_core.document_loaders.base import BaseLoader
from langchain_core.documents import Document
from langchain_community.document_loaders.unstructured import UnstructuredBaseLoader
from unstructured.partition.auto import partition
class LakeFSLoader(BaseLoader):
    """
    LakeFS document loader integration using the official lakeFS Python SDK.

    Setup:
        Install ``langchain-lakefs`` and ``lakefs``:

        .. code-block:: bash

            pip install -U langchain-lakefs lakefs

    Instantiate:
        .. code-block:: python

            from langchain_lakefs.document_loaders import LakeFSLoader

            loader = LakeFSLoader(
                lakefs_endpoint="https://example.my-lakefs.com",
                lakefs_access_key="your-access-key",
                lakefs_secret_key="your-secret-key",
                repo="my-repo",
                ref="main",
                path="path/to/files"
            )

    Load:
        .. code-block:: python

            docs = loader.load()
    """

    def __init__(
        self,
        lakefs_endpoint: str,
        lakefs_access_key: str,
        lakefs_secret_key: str,
        repo: str = "",
        ref: str = "main",
        path: str = "",
    ):
        """Create a loader bound to a lakeFS server.

        Args:
            lakefs_endpoint: Base URL of the lakeFS server.
            lakefs_access_key: Access key, passed to the SDK client as ``username``.
            lakefs_secret_key: Secret key, passed to the SDK client as ``password``.
            repo: Repository to read from; must be non-empty before ``load``.
            ref: Reference (branch, tag or commit) to read from; must be
                non-empty before ``load``.
            path: Object key prefix to list; ``""`` lists every object in ``ref``.
        """
        self.client = Client(
            host=lakefs_endpoint,
            username=lakefs_access_key,
            password=lakefs_secret_key,
        )
        self.repo = repo
        self.ref = ref
        self.path = path
        # Whether per-object user metadata is requested from lakeFS and merged
        # into each document's metadata. Defaults to True, which matches what
        # load() has always done; set_user_metadata(False) turns it off.
        self.user_metadata = True

    def set_path(self, path: str) -> None:
        """Set the path to load documents from."""
        self.path = path

    def set_ref(self, ref: str) -> None:
        """Set the ref to load documents from."""
        self.ref = ref

    def set_repo(self, repo: str) -> None:
        """Set the repository to load documents from."""
        self.repo = repo

    def set_user_metadata(self, user_metadata: bool) -> None:
        """Set whether to load user metadata."""
        self.user_metadata = user_metadata

    def load(self) -> List[Document]:
        """Load every object under ``path`` at ``repo``/``ref`` as documents.

        Each listed object is parsed by ``UnstructuredLakeFSLoader``. The
        object's user metadata is requested from lakeFS and attached to the
        resulting documents only while ``user_metadata`` is enabled.

        Returns:
            The documents produced for all listed objects, in listing order.

        Raises:
            ValueError: If ``repo`` or ``ref`` is empty, or ``path`` is None.
        """
        self.__validate_instance()
        objects = (
            lakefs.repository(self.repo, client=self.client)
            .ref(self.ref)
            .objects(user_metadata=self.user_metadata, prefix=self.path)
        )
        documents: List[Document] = []
        for obj in objects:
            object_loader = UnstructuredLakeFSLoader(
                obj.physical_address,
                self.repo,
                self.ref,
                obj.path,
                # Only forward metadata the caller asked for.
                user_metadata=obj.metadata if self.user_metadata else None,
                client=self.client,
            )
            documents.extend(object_loader.load())
        return documents

    def __validate_instance(self) -> None:
        """Raise ``ValueError`` if repo/ref are unset or path is None."""
        if not self.repo:
            raise ValueError(
                "no repository was provided. use `set_repo` to specify a repository"
            )
        if not self.ref:
            raise ValueError("no ref was provided. use `set_ref` to specify a ref")
        # An empty path is valid (it lists the whole ref); only None is rejected.
        if self.path is None:
            raise ValueError("no path was provided. use `set_path` to specify a path")
class UnstructuredLakeFSLoader(UnstructuredBaseLoader):
    """Load a single `lakeFS` object as unstructured data.

    A ``local://`` physical address is read directly from the local
    filesystem. Any other address is downloaded through the lakeFS SDK into
    a temporary file. Either way the file is parsed with
    ``unstructured.partition.auto.partition``.
    """

    def __init__(
        self,
        url: str,
        repo: str,
        ref: str = "main",
        path: str = "",
        presign: bool = True,
        client: Optional[Client] = None,
        user_metadata: Optional[dict[str, str]] = None,
        **unstructured_kwargs: Any,
    ):
        """Initialize UnstructuredLakeFSLoader.

        Args:
            url: Physical address of the object. A ``local://`` address is
                read from the local filesystem; anything else is fetched via
                the lakeFS SDK using ``repo``/``ref``/``path``.
            repo: Repository containing the object.
            ref: Reference (branch, tag or commit) the object is read from.
            path: Object key inside the repository.
            presign: Stored on the instance; not currently used when the
                object is downloaded.
            client: lakeFS SDK client used for downloads. ``None`` presumably
                makes the SDK fall back to its default client; confirm.
            user_metadata: Extra key/value pairs added to each document's
                metadata. They never override ``repo``, ``ref`` or ``path``.
            **unstructured_kwargs: Passed to ``UnstructuredBaseLoader``; the
                keyword arguments it keeps are forwarded to ``partition``.
        """
        super().__init__(**unstructured_kwargs)
        self.user_metadata = user_metadata
        self.url = url
        self.repo = repo
        self.ref = ref
        self.path = path
        self.presign = presign
        self.client = client

    def _get_metadata(self) -> dict[str, Any]:
        """Return the object's lakeFS location, extended with user metadata.

        User metadata keys that collide with ``repo``, ``ref`` or ``path``
        are ignored, so the object's location is always reported as-is.
        """
        metadata: dict[str, Any] = {
            "repo": self.repo,
            "ref": self.ref,
            "path": self.path,
        }
        for key, value in (self.user_metadata or {}).items():
            # setdefault keeps existing location keys and preserves key order.
            metadata.setdefault(key, value)
        return metadata

    def _local_filename(self) -> str:
        """Return a safe file name for the downloaded copy of the object.

        The object's basename is kept because ``partition`` may rely on the
        extension to detect the file type. Names that cannot be used as a
        file (empty, e.g. a key ending in ``/``, or ``.``/``..``) fall back
        to a fixed placeholder.
        """
        # os.path.basename also strips OS-specific separators (e.g. "\\" on
        # Windows) so the download cannot escape the temporary directory.
        name = os.path.basename(self.path.split("/")[-1])
        if name in ("", ".", ".."):
            return "lakefs_object"
        return name

    def _get_elements(self) -> List:
        """Partition the object's content into unstructured elements."""
        local_prefix = "local://"
        if self.url.startswith(local_prefix):
            local_path = self.url[len(local_prefix):]
            return partition(filename=local_path, **self.unstructured_kwargs)

        with tempfile.TemporaryDirectory() as temp_dir:
            file_path = os.path.join(temp_dir, self._local_filename())
            obj = (
                lakefs.repository(self.repo, client=self.client)
                .ref(self.ref)
                .object(self.path)
            )
            # Both the SDK reader and the local file are closed deterministically.
            with obj.reader() as reader, open(file_path, mode="wb") as file:
                file.write(reader.read())
            return partition(filename=file_path, **self.unstructured_kwargs)