Commit bf0efa8

initial snapshot
1 parent 45cb033

4 files changed: +260 -56 lines

kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java

Lines changed: 80 additions & 5 deletions
@@ -17,8 +17,11 @@
 
 import io.delta.kernel.CommitActions;
 import io.delta.kernel.CommitRange;
+import io.delta.kernel.Scan;
 import io.delta.kernel.Snapshot;
 import io.delta.kernel.data.ColumnarBatch;
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
 import io.delta.kernel.defaults.engine.DefaultEngine;
 import io.delta.kernel.engine.Engine;
 import io.delta.kernel.exceptions.UnsupportedTableFeatureException;
@@ -35,6 +38,7 @@
 import java.io.IOException;
 import java.time.ZoneId;
 import java.util.*;
+import java.util.Comparator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.read.InputPartition;
@@ -154,9 +158,10 @@ public Offset initialOffset() {
       version = startingVersionOpt.get();
       isInitialSnapshot = false;
     } else {
-      // TODO(#5318): Support initial snapshot case (isInitialSnapshot == true)
-      throw new UnsupportedOperationException(
-          "initialOffset with initial snapshot is not supported yet");
+      // No starting version - create initial snapshot at latest version
+      Snapshot latestSnapshot = snapshotManager.loadLatestSnapshot();
+      version = latestSnapshot.getVersion();
+      isInitialSnapshot = true;
    }
 
     return DeltaSourceOffset.apply(
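
Note on the new behavior: when no startingVersion is supplied, the first offset now pins the latest snapshot version with isInitialSnapshot = true, so the first batch replays the whole snapshot before tailing the log. As a rough, hypothetical model of the state that offset must carry (field names loosely patterned on DSv1's DeltaSourceOffset, not taken from this codebase):

// Hypothetical, simplified model of the initial-snapshot offset state;
// the real type is DeltaSourceOffset, whose exact fields are not shown here.
public class InitialOffsetSketch {
  record OffsetState(long reservoirVersion, long index, boolean isInitialSnapshot) {}

  public static void main(String[] args) {
    // With no startingVersion, the offset points at the latest snapshot
    // version (assume 5 here) and flags the stream as an initial snapshot.
    OffsetState initial = new OffsetState(5L, -1L, true);
    System.out.println(initial);
  }
}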
@@ -468,8 +473,16 @@ CloseableIterator<IndexedFile> getFileChanges(
     CloseableIterator<IndexedFile> result;
 
     if (isInitialSnapshot) {
-      // TODO(#5318): Implement initial snapshot
-      throw new UnsupportedOperationException("initial snapshot is not supported yet");
+      CloseableIterator<IndexedFile> snapshotFiles = getSnapshotFiles(fromVersion);
+      long latestVersion = snapshotAtSourceInit.getVersion();
+      if (latestVersion > fromVersion) {
+        // Start reading delta logs from fromVersion + 1 to avoid duplicating snapshot files
+        CloseableIterator<IndexedFile> deltaChanges = filterDeltaLogs(fromVersion + 1, endOffset);
+        // Lazily combine snapshot files and delta changes
+        result = snapshotFiles.combine(deltaChanges);
+      } else {
+        result = snapshotFiles;
+      }
     } else {
       result = filterDeltaLogs(fromVersion, endOffset);
     }
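
The correctness of this branch rests on two details: the snapshot already covers fromVersion itself, so the log replay must start at fromVersion + 1, and combine must not touch the second iterator until the first is drained. A minimal sketch of that chaining semantics, using plain java.util.Iterator rather than Kernel's CloseableIterator (ChainedIterator is an illustrative name, not Kernel's implementation):

import java.util.Iterator;
import java.util.List;

final class ChainedIterator<T> implements Iterator<T> {
  private final Iterator<T> first;
  private final Iterator<T> second;

  ChainedIterator(Iterator<T> first, Iterator<T> second) {
    this.first = first;
    this.second = second;
  }

  @Override
  public boolean hasNext() {
    // The second iterator is only consulted once the first is drained,
    // which is what keeps the combination lazy.
    return first.hasNext() || second.hasNext();
  }

  @Override
  public T next() {
    return first.hasNext() ? first.next() : second.next();
  }

  public static void main(String[] args) {
    // Snapshot files for version 5, then log changes from version 6 onward.
    Iterator<String> snapshotFiles = List.of("v5/part-a", "v5/part-b").iterator();
    Iterator<String> deltaChanges = List.of("v6/part-c").iterator();
    new ChainedIterator<>(snapshotFiles, deltaChanges)
        .forEachRemaining(System.out::println);
  }
}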
@@ -651,4 +664,66 @@ private long extractIndexedFilesFromBatch(
 
     return index;
   }
+
+  /**
+   * Get all files from a snapshot at the specified version, sorted by modificationTime and path,
+   * with indices assigned sequentially, and wrapped with BEGIN/END sentinels.
+   *
+   * <p>Mimics DeltaSourceSnapshot in DSv1.
+   *
+   * @param version The snapshot version to read
+   * @return An iterator of IndexedFile representing the snapshot files
+   */
+  private CloseableIterator<IndexedFile> getSnapshotFiles(long version) {
+    // Load snapshot at the specified version
+    Snapshot snapshot = snapshotManager.loadSnapshotAt(version);
+
+    // Build scan to get all files
+    Scan scan = snapshot.getScanBuilder().build();
+
+    // Collect all AddFile actions
+    List<AddFile> addFiles = new ArrayList<>();
+    try (CloseableIterator<FilteredColumnarBatch> filesIter = scan.getScanFiles(engine)) {
+      while (filesIter.hasNext()) {
+        FilteredColumnarBatch batch = filesIter.next();
+        try (CloseableIterator<Row> rowIter = batch.getRows()) {
+          while (rowIter.hasNext()) {
+            Row fileRow = rowIter.next();
+            // Extract AddFile from the "add" column (index 0)
+            if (!fileRow.isNullAt(0)) {
+              Row addFileRow = fileRow.getStruct(0);
+              AddFile addFile = new AddFile(addFileRow);
+              // Only include files with dataChange=true
+              if (addFile.getDataChange()) {
+                addFiles.add(addFile);
+              }
+            }
+          }
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(
+          String.format("Failed to read snapshot files at version %d", version), e);
+    }
+
+    // CRITICAL: Sort by modificationTime, then path for deterministic ordering
+    addFiles.sort(
+        Comparator.comparing(AddFile::getModificationTime).thenComparing(AddFile::getPath));
+
+    // Build IndexedFile list with sentinels
+    List<IndexedFile> indexedFiles = new ArrayList<>();
+
+    // Add BEGIN sentinel
+    indexedFiles.add(new IndexedFile(version, DeltaSourceOffset.BASE_INDEX(), null));
+
+    // Add data files with sequential indices starting from 0
+    for (int i = 0; i < addFiles.size(); i++) {
+      indexedFiles.add(new IndexedFile(version, i, addFiles.get(i)));
+    }
+
+    // Add END sentinel
+    indexedFiles.add(new IndexedFile(version, DeltaSourceOffset.END_INDEX(), null));
+
+    return Utils.toCloseableIterator(indexedFiles.iterator());
+  }
 }
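
For intuition, the sort plus sentinel wrapping means a snapshot at version 5 with three files is exposed as the sequence (5, BASE_INDEX, BEGIN), (5, 0, f0), (5, 1, f1), (5, 2, f2), (5, END_INDEX, END), and re-planning the same snapshot always reproduces that sequence. A self-contained sketch of the ordering rule (FileEntry and the sentinel values are illustrative; the real constants come from DeltaSourceOffset):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SnapshotOrderingSketch {
  // Illustrative sentinel values only; the real ones live in DeltaSourceOffset.
  static final long BASE_INDEX = -1L;
  static final long END_INDEX = Long.MAX_VALUE;

  record FileEntry(String path, long modificationTime) {}

  public static void main(String[] args) {
    List<FileEntry> files = new ArrayList<>(List.of(
        new FileEntry("part-b.parquet", 2000L),
        new FileEntry("part-a.parquet", 2000L),
        new FileEntry("part-c.parquet", 1000L)));

    // Same rule as getSnapshotFiles: modificationTime first, then path,
    // so indices assigned below are deterministic across restarts.
    files.sort(Comparator.comparing(FileEntry::modificationTime)
        .thenComparing(FileEntry::path));

    long version = 5L;
    System.out.printf("(%d, %d, BEGIN)%n", version, BASE_INDEX);
    for (int i = 0; i < files.size(); i++) {
      System.out.printf("(%d, %d, %s)%n", version, i, files.get(i).path());
    }
    System.out.printf("(%d, %d, END)%n", version, END_INDEX);
  }
}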

kernel-spark/src/test/java/io/delta/kernel/spark/Dsv2BasicTest.java

Lines changed: 27 additions & 29 deletions
@@ -247,45 +247,43 @@ public void testStreamingReadWithRateLimit(@TempDir File deltaTablePath) throws
   }
 
   @Test
-  public void testStreamingReadWithoutStartingVersionThrowsException(@TempDir File deltaTablePath) {
+  public void testStreamingReadWithoutStartingVersion(@TempDir File deltaTablePath)
+      throws Exception {
     String tablePath = deltaTablePath.getAbsolutePath();
 
     // Write initial data
     List<Row> initialRows =
         Arrays.asList(RowFactory.create(1, "Alice", 100.0), RowFactory.create(2, "Bob", 200.0));
     writeInitialData(tablePath, initialRows);
 
-    // Try to create streaming DataFrame without startingVersion using DSv2 path
+    // Create streaming DataFrame without startingVersion using DSv2 path
     // Using dsv2.delta.`path` syntax to force DSv2 (SparkMicroBatchStream) instead of DSv1
     String dsv2TableRef = String.format("dsv2.delta.`%s`", tablePath);
 
-    // Should throw UnsupportedOperationException when trying to process
-    org.apache.spark.sql.streaming.StreamingQueryException exception =
-        assertThrows(
-            org.apache.spark.sql.streaming.StreamingQueryException.class,
-            () -> {
-              StreamingQuery query =
-                  spark
-                      .readStream()
-                      .table(dsv2TableRef)
-                      .writeStream()
-                      .format("memory")
-                      .queryName("test_no_starting_version")
-                      .outputMode("append")
-                      .start();
-              query.processAllAvailable();
-              query.stop();
-            });
-
-    // Verify the root cause is UnsupportedOperationException
-    Throwable rootCause = exception.getCause();
-    assertTrue(
-        rootCause instanceof UnsupportedOperationException,
-        "Root cause should be UnsupportedOperationException, but was: "
-            + (rootCause != null ? rootCause.getClass().getName() : "null"));
-    assertTrue(
-        rootCause.getMessage().contains("is not supported"),
-        "Exception message should indicate operation is not supported: " + rootCause.getMessage());
+    StreamingQuery query = null;
+    try {
+      query =
+          spark
+              .readStream()
+              .table(dsv2TableRef) // No startingVersion option
+              .writeStream()
+              .format("memory")
+              .queryName("test_no_starting_version")
+              .outputMode("append")
+              .start();
+
+      query.processAllAvailable();
+
+      // Should successfully read all data from initial snapshot
+      Dataset<Row> results = spark.sql("SELECT * FROM test_no_starting_version");
+      List<Row> actualRows = results.collectAsList();
+
+      assertStreamingDataEquals(actualRows, initialRows);
+    } finally {
+      if (query != null) {
+        query.stop();
+      }
+    }
   }
 
   //////////////////////
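
The rewritten test doubles as a usage example: the same read that previously failed now succeeds without a startingVersion. For contrast, a hedged sketch of the two ways to start such a stream (the dsv2.delta catalog prefix mirrors the test's setup, and /tmp/delta-table is a placeholder path):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class StartingVersionContrast {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[2]")
        .appName("starting-version-contrast")
        .getOrCreate();

    String table = "dsv2.delta.`/tmp/delta-table`"; // placeholder table reference

    // Explicit starting version: replay log changes from version 0 onward.
    Dataset<Row> fromVersionZero =
        spark.readStream().option("startingVersion", "0").table(table);

    // No startingVersion (what this commit enables): take an initial snapshot
    // at the latest version, then stream subsequent commits.
    Dataset<Row> fromLatestSnapshot = spark.readStream().table(table);

    fromVersionZero.printSchema();
    fromLatestSnapshot.printSchema();
    spark.stop();
  }
}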
