4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
## 1.2.2

* **Fix**: Prevent S3 path conflicts by using tempfile-based directory isolation

## 1.2.1

* **Fix**: Embeddings are properly assigned when embedding in batches
@@ -1,5 +1,5 @@
{
"directory_structure": [
"s3_keys": [
"wiki_movie_plots_small.csv"
]
}
@@ -1,5 +1,5 @@
{
"directory_structure": [
"s3_keys": [
"Why_is_the_sky_blue?.txt",
"[test]?*.txt"
]
@@ -1,5 +1,5 @@
{
"directory_structure": [
"s3_keys": [
"2023-Jan-economic-outlook.pdf",
"Silent-Giant-(1).pdf",
"page-with-formula.pdf",
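The fixture changes above replace the order- and path-sensitive `directory_structure` listing with an `s3_keys` listing. A minimal sketch of why the old format cannot survive the new download layout (the downloaded path below is hypothetical; only the random temp directory segment matters):

```python
from pathlib import Path

# Each download now lands in its own randomly named temp directory, so an
# exact-path comparison can never match, while a basename comparison still can.
old_expected = ["wiki_movie_plots_small.csv"]
downloaded = ["unstructured_k2j9x1/wiki_movie_plots_small.csv"]  # random temp dir segment

assert old_expected != downloaded                               # old strict check would fail
assert {Path(p).name for p in downloaded} == set(old_expected)  # basename check still passes
```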
23 changes: 18 additions & 5 deletions test/integration/connectors/utils/validation/source.py
@@ -167,11 +167,24 @@ def run_expected_download_files_validation(


def run_directory_structure_validation(expected_output_dir: Path, download_files: list[str]):
    directory_record = expected_output_dir / "directory_structure.json"
    with directory_record.open("r") as directory_file:
        directory_file_contents = json.load(directory_file)
        directory_structure = directory_file_contents["directory_structure"]
    assert directory_structure == download_files
    s3_keys_file = expected_output_dir / "expected_s3_keys.json"

    if s3_keys_file.exists():
        with s3_keys_file.open("r") as f:
            s3_keys = json.load(f)["s3_keys"]

        expected_filenames = {Path(s3_key).name for s3_key in s3_keys}
        actual_filenames = {Path(download_file).name for download_file in download_files}

        assert expected_filenames == actual_filenames, (
            f"Expected filenames: {sorted(expected_filenames)}, "
            f"Got filenames: {sorted(actual_filenames)}"
        )
    else:
        directory_record = expected_output_dir / "directory_structure.json"
        with directory_record.open("r") as f:
            directory_structure = json.load(f)["directory_structure"]
        assert directory_structure == download_files


def update_fixtures(
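The new branch keys off the presence of `expected_s3_keys.json` and compares sets of basenames instead of exact paths, which makes the check insensitive to the per-file temp directories introduced below. A hedged usage sketch, assuming the test helper is importable from the path shown in this diff and using fixture names that appear above:

```python
import json
import tempfile
from pathlib import Path

# Assumption: the validation helper can be imported this way in your test environment.
from test.integration.connectors.utils.validation.source import (
    run_directory_structure_validation,
)

with tempfile.TemporaryDirectory() as fixture_dir:
    expected_output_dir = Path(fixture_dir)
    (expected_output_dir / "expected_s3_keys.json").write_text(
        json.dumps({"s3_keys": ["Why_is_the_sky_blue?.txt", "[test]?*.txt"]})
    )
    # Downloaded paths include per-file temp directories (names are hypothetical);
    # only basenames are compared, so these still match.
    download_files = [
        "/tmp/downloads/unstructured_a1b2c3/Why_is_the_sky_blue?.txt",
        "/tmp/downloads/unstructured_d4e5f6/[test]?*.txt",
    ]
    run_directory_structure_validation(expected_output_dir, download_files)
```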
2 changes: 1 addition & 1 deletion unstructured_ingest/__version__.py
@@ -1 +1 @@
__version__ = "1.2.1" # pragma: no cover
__version__ = "1.2.2" # pragma: no cover
2 changes: 2 additions & 0 deletions unstructured_ingest/interfaces/downloader.py
@@ -36,9 +36,11 @@ class Downloader(BaseProcess, BaseConnector, ABC):
    def get_download_path(self, file_data: FileData) -> Optional[Path]:
        if not file_data.source_identifiers:
            return None

        rel_path = file_data.source_identifiers.relative_path
        if not rel_path:
            return None

        rel_path = rel_path[1:] if rel_path.startswith("/") else rel_path
        return self.download_dir / Path(rel_path)

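The base implementation above strips a leading slash from the relative path before joining it onto `download_dir`. A tiny illustrative sketch (values are hypothetical) of why that normalization matters:

```python
from pathlib import Path

# Joining an absolute path onto download_dir silently discards download_dir.
download_dir = Path("/tmp/downloads")
assert download_dir / Path("/bucket/file.csv") == Path("/bucket/file.csv")
assert download_dir / Path("bucket/file.csv") == Path("/tmp/downloads/bucket/file.csv")
```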
19 changes: 19 additions & 0 deletions unstructured_ingest/processes/connectors/fsspec/fsspec.py
@@ -264,12 +264,31 @@ class FsspecDownloaderConfig(DownloaderConfig):

@dataclass
class FsspecDownloader(Downloader):
    TEMP_DIR_PREFIX = "unstructured_"

    protocol: str
    connection_config: FsspecConnectionConfigT
    connector_type: str = CONNECTOR_TYPE
    download_config: Optional[FsspecDownloaderConfigT] = field(
        default_factory=lambda: FsspecDownloaderConfig()
    )

    def get_download_path(self, file_data: FileData) -> Optional[Path]:
        has_source_identifiers = file_data.source_identifiers is not None
        has_filename = has_source_identifiers and file_data.source_identifiers.filename

        if not (has_source_identifiers and has_filename):
            return None

        filename = file_data.source_identifiers.filename

        mkdir_concurrent_safe(self.download_dir)

        temp_dir = tempfile.mkdtemp(
            prefix=self.TEMP_DIR_PREFIX,
            dir=self.download_dir
        )
        return Path(temp_dir) / filename

    def is_async(self) -> bool:
        with self.connection_config.get_client(protocol=self.protocol) as client:
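The override above is the core of the fix: each download gets its own `tempfile.mkdtemp()` directory under `download_dir`, so two S3 keys that share a basename can no longer clobber each other. A sketch of the conflict and the isolation (keys and prefixes below are illustrative, not the connector's code):

```python
import tempfile
from pathlib import Path

# Two keys with the same basename collide in a flat download directory,
# but not when each download gets its own temp directory.
download_dir = Path(tempfile.mkdtemp(prefix="downloads_"))
keys = ["reports/2023/summary.pdf", "reports/2024/summary.pdf"]

flat_targets = {download_dir / Path(key).name for key in keys}
assert len(flat_targets) == 1  # same local path for both keys -> conflict

isolated_targets = [
    Path(tempfile.mkdtemp(prefix="unstructured_", dir=download_dir)) / Path(key).name
    for key in keys
]
assert len(set(isolated_targets)) == len(keys)  # each download is isolated
```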