
Commit f515063

Add support for archive files
1 parent f38a8d2

File tree: 4 files changed, +173 -2 lines

Lines changed: 16 additions & 0 deletions
@@ -0,0 +1,16 @@
name: Archive
slug: archive
description: Read data from an archive
pipeline:
  source:
    slug: archive
    provider_slug: promptly
  transformations:
    - slug: splitter
      provider_slug: unstructured
  embedding:
    slug: embeddings-generator
    provider_slug: promptly
  destination:
    slug: vector-store
    provider_slug: promptly
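
The template wires the promptly archive source through the unstructured splitter, embeds the resulting chunks with the promptly embeddings generator, and writes them to the promptly vector store.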

llmstack/data/apis.py

Lines changed: 12 additions & 1 deletion
@@ -75,7 +75,12 @@ def templates(self, request):
         return DRFResponse(templates_data)
 
     def sources(self, request):
-        from llmstack.data.sources import FileSchema, TextSchema, URLSchema
+        from llmstack.data.sources import (
+            ArchiveFileSchema,
+            FileSchema,
+            TextSchema,
+            URLSchema,
+        )
 
         return DRFResponse(
             [
@@ -97,6 +102,12 @@ def sources(self, request):
                     "schema": URLSchema.get_schema(),
                     "ui_schema": URLSchema.get_ui_schema(),
                 },
+                {
+                    "slug": ArchiveFileSchema.slug(),
+                    "provider_slug": ArchiveFileSchema.provider_slug(),
+                    "schema": ArchiveFileSchema.get_schema(),
+                    "ui_schema": ArchiveFileSchema.get_ui_schema(),
+                },
             ]
         )

llmstack/data/sources/__init__.py

Lines changed: 2 additions & 1 deletion
@@ -1,5 +1,6 @@
 from functools import cache
 
+from llmstack.data.sources.files.archive import ArchiveFileSchema
 from llmstack.data.sources.files.csv import CSVFileSchema
 from llmstack.data.sources.files.file import FileSchema
 from llmstack.data.sources.files.pdf import PdfSchema
@@ -11,7 +12,7 @@
 
 @cache
 def get_source_cls(slug, provider_slug):
-    for cls in [CSVFileSchema, FileSchema, PdfSchema, URLSchema, TextSchema]:
+    for cls in [CSVFileSchema, FileSchema, PdfSchema, URLSchema, TextSchema, ArchiveFileSchema]:
         if cls.slug() == slug and cls.provider_slug() == provider_slug:
             return cls
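
A quick sketch of how the cached registry lookup now resolves the archive source, assuming the llmstack package is importable:

from llmstack.data.sources import ArchiveFileSchema, get_source_cls

cls = get_source_cls("archive", "promptly")
assert cls is ArchiveFileSchema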

llmstack/data/sources/files/archive.py

Lines changed: 143 additions & 0 deletions

@@ -0,0 +1,143 @@
import base64
import io
import logging
import mimetypes
import tarfile
import uuid
import zipfile
from typing import List

from pydantic import Field

from llmstack.common.utils.text_extract import extract_text_elements
from llmstack.common.utils.utils import validate_parse_data_uri
from llmstack.data.sources.base import BaseSource, DataDocument
from llmstack.data.sources.utils import (
    create_source_document_asset,
    get_source_document_asset_by_objref,
)

logger = logging.getLogger(__name__)


def extract_archive_files(mime_type, file_name, file_data):
    """Unpack a base64-encoded archive and return one data URI per member file."""
    extracted_files = []
    if mime_type == "application/zip":
        with zipfile.ZipFile(io.BytesIO(base64.b64decode(file_data))) as archive:
            for file_info in archive.infolist():
                # Skip directories, empty entries, and macOS resource-fork metadata.
                if file_info.is_dir() or file_info.file_size == 0 or file_info.filename.startswith("__MACOSX"):
                    continue
                with archive.open(file_info) as file:
                    file_mime_type = mimetypes.guess_type(file_info.filename)[0]
                    data_uri = f"data:{file_mime_type};name={file_info.filename};base64,{base64.b64encode(file.read()).decode()}"
                    extracted_files.append(data_uri)
    elif mime_type in ["application/x-tar", "application/gzip", "application/x-bzip2"]:
        # file_data is base64-encoded, as in the zip branch; mode "r:*" lets
        # tarfile autodetect the compression scheme.
        with tarfile.open(fileobj=io.BytesIO(base64.b64decode(file_data)), mode="r:*") as archive:
            for member in archive.getmembers():
                if member.isfile():
                    file = archive.extractfile(member)
                    file_mime_type = mimetypes.guess_type(member.name)[0]
                    data_uri = (
                        f"data:{file_mime_type};name={member.name};base64,{base64.b64encode(file.read()).decode()}"
                    )
                    extracted_files.append(data_uri)
    else:
        logger.warning(f"Unsupported archive mime type: {mime_type}")
    return extracted_files


class ArchiveFileSchema(BaseSource):
    file: str = Field(
        description="File to be processed",
        json_schema_extra={
            "advanced_parameter": False,
            "widget": "file",
            "maxSize": 25000000,
            "maxFiles": 1,
            "accepts": {
                "application/zip": [],
            },
        },
    )
    split_files: bool = Field(
        default=False,
        description="Split the archive into individual files",
        json_schema_extra={"advanced_parameter": True},
    )

    @classmethod
    def slug(cls):
        return "archive"

    @classmethod
    def provider_slug(cls):
        return "promptly"

    def get_data_documents(self, **kwargs) -> List[DataDocument]:
        # With split_files, each archive member becomes its own document;
        # otherwise the archive is stored as a single document.
        if self.split_files:
            files = extract_archive_files(*validate_parse_data_uri(self.file))
        else:
            files = [self.file]

        documents = []
        for file in files:
            file_id = str(uuid.uuid4())
            mime_type, file_name, file_data = validate_parse_data_uri(file)
            file_objref = create_source_document_asset(
                file, datasource_uuid=kwargs["datasource_uuid"], document_id=file_id
            )
            documents.append(
                DataDocument(
                    id_=file_id,
                    name=file_name,
                    content=file_objref,
                    mimetype=mime_type,
                    metadata={
                        "file_name": file_name,
                        "mime_type": mime_type,
                        "source": file_name,
                        "datasource_uuid": kwargs["datasource_uuid"],
                    },
                    datasource_uuid=kwargs["datasource_uuid"],
                    extra_info={"extra_data": self.get_extra_data()},
                )
            )
        return documents

    @classmethod
    def process_document(cls, document: DataDocument) -> DataDocument:
        data_uri = get_source_document_asset_by_objref(document.content)
        mime_type, file_name, file_data = validate_parse_data_uri(data_uri)

        if mime_type == "application/zip":
            # Unsplit archives are unpacked here; the text of every member is
            # concatenated, each section headed by the member's file name.
            extracted_files = extract_archive_files(mime_type, file_name, file_data)
            elements = []
            text_content = ""
            for extracted_file in extracted_files:
                mime_type, file_name, extracted_file_data = validate_parse_data_uri(extracted_file)
                text_content += f"File: {file_name}\n"
                decoded_file_data = base64.b64decode(extracted_file_data)
                file_elements = extract_text_elements(
                    mime_type=mime_type,
                    data=decoded_file_data,
                    file_name=file_name,
                    extra_params=None,
                )
                elements += file_elements
                text_content += "".join([element.text for element in file_elements])
                text_content += "\n\n"
        else:
            decoded_file_data = base64.b64decode(file_data)
            elements = extract_text_elements(
                mime_type=mime_type, data=decoded_file_data, file_name=file_name, extra_params=None
            )
            text_content = "".join([element.text for element in elements])

        text_data_uri = (
            f"data:text/plain;name={document.id_}_text.txt;base64,{base64.b64encode(text_content.encode()).decode()}"
        )
        text_file_objref = create_source_document_asset(
            text_data_uri,
            datasource_uuid=document.metadata["datasource_uuid"],
            document_id=str(uuid.uuid4()),
        )
        return document.model_copy(update={"text": text_content, "text_objref": text_file_objref})
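
A self-contained sketch of the extraction path, using only the standard library plus the new module; the sample archive and file names are hypothetical:

import base64
import io
import zipfile

from llmstack.data.sources.files.archive import extract_archive_files

# Build a small zip in memory and base64-encode it, mirroring the body of the
# data URI the file widget would produce.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("notes.txt", "hello from inside the archive")
payload = base64.b64encode(buffer.getvalue()).decode()

# extract_archive_files expects the base64 body, not the full data URI.
extracted = extract_archive_files("application/zip", "sample.zip", payload)
for uri in extracted:
    print(uri.split(";base64,")[0])  # data:text/plain;name=notes.txt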
