Skip to content

Commit 112226c

Browse files
authored
Fix(be): Avoid decompression race condition (#718)
1 parent f990376 commit 112226c

File tree

1 file changed

+17
-1
lines changed

1 file changed

+17
-1
lines changed

src/litdata/streaming/config.py

Lines changed: 17 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -14,9 +14,10 @@
1414
import logging
1515
import os
1616
from collections import defaultdict
17+
from time import sleep, time
1718
from typing import Any, Optional
1819

19-
from litdata.constants import _INDEX_FILENAME
20+
from litdata.constants import _INDEX_FILENAME, _MAX_WAIT_TIME
2021
from litdata.debugger import ChromeTraceColors, _get_log_msg
2122
from litdata.streaming.compression import _COMPRESSORS, Compressor
2223
from litdata.streaming.downloader import get_downloader
@@ -183,6 +184,21 @@ def try_decompress(self, local_chunkpath: str) -> None:
183184
if os.path.exists(target_local_chunkpath):
184185
return
185186

187+
# Ensure that the compressed file exists and is fully downloaded
188+
start_time = time()
189+
assert self._chunks is not None
190+
191+
filename = os.path.basename(local_chunkpath)
192+
chunk_index = self._get_chunk_index_from_filename(filename)
193+
chunk_bytes = self._chunks[chunk_index]["chunk_size"]
194+
exists = os.path.exists(local_chunkpath) and os.stat(local_chunkpath).st_size >= chunk_bytes
195+
while not exists:
196+
sleep(0.1)
197+
exists = os.path.exists(local_chunkpath) and os.stat(local_chunkpath).st_size >= chunk_bytes
198+
199+
if (time() - start_time) > _MAX_WAIT_TIME:
200+
raise FileNotFoundError(f"The {local_chunkpath} hasn't been found.")
201+
186202
with open(local_chunkpath, "rb") as f:
187203
data = f.read()
188204

0 commit comments

Comments
 (0)