From 90e1d9ba4b26d039bfa1b870e693e73204201750 Mon Sep 17 00:00:00 2001 From: Zikang Han Date: Fri, 5 Dec 2025 00:59:34 +0000 Subject: [PATCH] initial offset: squashed commit --- .../spark/read/SparkMicroBatchStream.java | 63 ++++++++-- .../spark/read/SparkMicroBatchStreamTest.java | 113 ++++++++++++++---- 2 files changed, 143 insertions(+), 33 deletions(-) diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java index 7cb882a6ff6..cb77a90487a 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java @@ -64,6 +64,16 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio private final boolean shouldValidateOffsets; private final SparkSession spark; + /** + * Tracks whether this is the first batch for this stream (no checkpointed offset). + * + *
<p>
- First batch: initialOffset() -> latestOffset(Offset, ReadLimit) - Set `isFirstBatch` to + * true in initialOffset() - in latestOffset(Offset, ReadLimit), use `isFirstBatch` to determine + * whether to return null vs previousOffset (when no data is available) - set `isFirstBatch` to + * false - Subsequent batches: latestOffset(Offset, ReadLimit) + */ + private boolean isFirstBatch = false; + public SparkMicroBatchStream( DeltaSnapshotManager snapshotManager, Snapshot snapshotAtSourceInit, @@ -98,10 +108,28 @@ public SparkMicroBatchStream( // offset // //////////// + /** + * Returns the initial offset for a streaming query to start reading from (if there's no + * checkpointed offset). + */ @Override public Offset initialOffset() { - // TODO(#5318): Implement initialOffset - throw new UnsupportedOperationException("initialOffset is not supported"); + Optional startingVersionOpt = getStartingVersion(); + long version; + boolean isInitialSnapshot; + isFirstBatch = true; + + if (startingVersionOpt.isPresent()) { + version = startingVersionOpt.get(); + isInitialSnapshot = false; + } else { + // TODO(#5318): Support initial snapshot case (isInitialSnapshot == true) + throw new UnsupportedOperationException( + "initialOffset with initial snapshot is not supported yet"); + } + + return DeltaSourceOffset.apply( + tableId, version, DeltaSourceOffset.BASE_INDEX(), isInitialSnapshot); } @Override @@ -113,30 +141,29 @@ public Offset latestOffset() { /** * Get the latest offset with rate limiting (SupportsAdmissionControl). * - * @param startOffset The starting offset (can be null if initialOffset() returned null) + * @param startOffset The starting offset * @param limit The read limit for rate limiting * @return The latest offset, or null if no data is available to read. */ @Override public Offset latestOffset(Offset startOffset, ReadLimit limit) { - // For the first batch, initialOffset() should be called before latestOffset(). - // if startOffset is null: no data is available to read. - if (startOffset == null) { - return null; - } + Objects.requireNonNull(startOffset, "startOffset should not be null for MicroBatchStream"); + Objects.requireNonNull(limit, "limit should not be null for MicroBatchStream"); + // TODO(#5318): init trigger available now support DeltaSourceOffset deltaStartOffset = DeltaSourceOffset.apply(tableId, startOffset); Optional limits = ScalaUtils.toJavaOptional(DeltaSource.AdmissionLimits$.MODULE$.apply(options, limit)); Optional endOffset = - getNextOffsetFromPreviousOffset(deltaStartOffset, limits); + getNextOffsetFromPreviousOffset(deltaStartOffset, limits, isFirstBatch); + isFirstBatch = false; if (shouldValidateOffsets && endOffset.isPresent()) { DeltaSourceOffset.validateOffsets(deltaStartOffset, endOffset.get()); } - // endOffset is null: no data is available to read for this batch. + // Return null when no data is available for this batch. 
return endOffset.orElse(null); } @@ -156,10 +183,15 @@ public ReadLimit getDefaultReadLimit() { * * @param previousOffset The previous offset * @param limits Rate limits for this batch (Optional.empty() for no limits) - * @return The next offset, or the previous offset if no new data is available + * @param isFirstBatch Whether this is the first batch for this stream + * @return The next offset, or the previous offset if no new data is available (except on the + * initial batch where we return empty to match DSv1's + * getStartingOffsetFromSpecificDeltaVersion behavior) */ private Optional getNextOffsetFromPreviousOffset( - DeltaSourceOffset previousOffset, Optional limits) { + DeltaSourceOffset previousOffset, + Optional limits, + boolean isFirstBatch) { // TODO(#5319): Special handling for schema tracking. CloseableIterator changes = @@ -172,6 +204,11 @@ private Optional getNextOffsetFromPreviousOffset( Optional lastFileChange = Utils.iteratorLast(changes); if (!lastFileChange.isPresent()) { + // For the first batch, return empty to match DSv1's + // getStartingOffsetFromSpecificDeltaVersion + if (isFirstBatch) { + return Optional.empty(); + } return Optional.of(previousOffset); } // TODO(#5318): Check read-incompatible schema changes during stream start @@ -221,6 +258,8 @@ public void stop() { * Extracts whether users provided the option to time travel a relation. If a query restarts from * a checkpoint and the checkpoint has recorded the offset, this method should never be called. * + *
<p>
Returns Optional.empty() if no starting version is provided. + * *
<p>
This is the DSv2 Kernel-based implementation of DeltaSource.getStartingVersion. */ Optional getStartingVersion() { diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java b/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java index f48f5109974..70fce317e90 100644 --- a/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java +++ b/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java @@ -32,10 +32,12 @@ import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.spark.sql.catalyst.expressions.Expression; import org.apache.spark.sql.connector.read.streaming.Offset; import org.apache.spark.sql.connector.read.streaming.ReadLimit; import org.apache.spark.sql.delta.DeltaLog; import org.apache.spark.sql.delta.DeltaOptions; +import org.apache.spark.sql.delta.Snapshot; import org.apache.spark.sql.delta.sources.DeltaSource; import org.apache.spark.sql.delta.sources.DeltaSourceOffset; import org.apache.spark.sql.delta.sources.ReadMaxBytes; @@ -46,7 +48,9 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import scala.Option; +import scala.collection.JavaConverters; import scala.collection.immutable.Map$; +import scala.collection.immutable.Seq; public class SparkMicroBatchStreamTest extends SparkDsv2TestBase { @@ -101,7 +105,8 @@ public void testInitialOffset_throwsUnsupportedOperationException(@TempDir File SparkMicroBatchStream microBatchStream = createTestStream(tempDir); UnsupportedOperationException exception = assertThrows(UnsupportedOperationException.class, () -> microBatchStream.initialOffset()); - assertEquals("initialOffset is not supported", exception.getMessage()); + assertEquals( + "initialOffset with initial snapshot is not supported yet", exception.getMessage()); } @Test @@ -130,6 +135,73 @@ public void testStop_throwsUnsupportedOperationException(@TempDir File tempDir) assertEquals("stop is not supported", exception.getMessage()); } + // ================================================================================================ + // Tests for initialOffset parity between DSv1 and DSv2 + // ================================================================================================ + + @ParameterizedTest + @MethodSource("initialOffsetParameters") + public void testInitialOffset_FirstBatchParity( + String startingVersion, + ReadLimitConfig limitConfig, + String testDescription, + @TempDir File tempDir) + throws Exception { + String testTablePath = tempDir.getAbsolutePath(); + String testTableName = "test_initial_" + System.nanoTime(); + createEmptyTestTable(testTablePath, testTableName); + insertVersions( + testTableName, + /* numVersions= */ 5, + /* rowsPerVersion= */ 10, + /* includeEmptyVersion= */ false); + + DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath)); + ReadLimit readLimit = limitConfig.toReadLimit(); + DeltaOptions options; + if (startingVersion == null) { + options = new DeltaOptions(Map$.MODULE$.empty(), spark.sessionState().conf()); + } else { + scala.collection.immutable.Map scalaMap = + Map$.MODULE$.empty().updated("startingVersion", startingVersion); + options = new DeltaOptions(scalaMap, spark.sessionState().conf()); + } + + // DSv1 + DeltaSource deltaSource = createDeltaSource(deltaLog, testTablePath, options); + // DSv1 sources don't have an initialOffset() method. 
+ // Batch 0 is called with startOffset=null. + Offset dsv1Offset = deltaSource.latestOffset(/* startOffset= */ null, readLimit); + + // DSv2 + Configuration hadoopConf = new Configuration(); + PathBasedSnapshotManager snapshotManager = + new PathBasedSnapshotManager(testTablePath, hadoopConf); + io.delta.kernel.Snapshot snapshotAtSourceInit = snapshotManager.loadLatestSnapshot(); + SparkMicroBatchStream stream = + new SparkMicroBatchStream( + snapshotManager, snapshotAtSourceInit, hadoopConf, spark, options); + Offset initialOffset = stream.initialOffset(); + Offset dsv2Offset = stream.latestOffset(initialOffset, readLimit); + + compareOffsets(dsv1Offset, dsv2Offset, testDescription); + } + + /** Provides test parameters for the initialOffset parity test. */ + private static Stream initialOffsetParameters() { + return Stream.of( + Arguments.of("0", ReadLimitConfig.noLimit(), "NoLimit1"), + Arguments.of("1", ReadLimitConfig.noLimit(), "NoLimit2"), + Arguments.of("3", ReadLimitConfig.noLimit(), "NoLimit3"), + Arguments.of("latest", ReadLimitConfig.noLimit(), "LatestNoLimit"), + Arguments.of("latest", ReadLimitConfig.maxFiles(1000), "LatestMaxFiles"), + Arguments.of("latest", ReadLimitConfig.maxBytes(1000), "LatestMaxBytes"), + Arguments.of("0", ReadLimitConfig.maxFiles(5), "MaxFiles1"), + Arguments.of("1", ReadLimitConfig.maxFiles(10), "MaxFiles2"), + Arguments.of("0", ReadLimitConfig.maxBytes(1000), "MaxBytes1"), + Arguments.of("1", ReadLimitConfig.maxBytes(2000), "MaxBytes2")); + } + // ================================================================================================ // Tests for getFileChanges parity between DSv1 and DSv2 // ================================================================================================ @@ -1166,26 +1238,6 @@ private Optional createAdmissionLimits( return Optional.of(new DeltaSource.AdmissionLimits(options, scalaMaxFiles, scalaMaxBytes)); } - /** Helper method to create a DeltaSource instance for testing. */ - private DeltaSource createDeltaSource(DeltaLog deltaLog, String tablePath) { - DeltaOptions options = new DeltaOptions(Map$.MODULE$.empty(), spark.sessionState().conf()); - scala.collection.immutable.Seq emptySeq = - scala.collection.JavaConverters.asScalaBuffer( - new java.util.ArrayList()) - .toList(); - org.apache.spark.sql.delta.Snapshot snapshot = - deltaLog.update(false, scala.Option.empty(), scala.Option.empty()); - return new DeltaSource( - spark, - deltaLog, - /* catalogTableOpt= */ scala.Option.empty(), - options, - /* snapshotAtSourceInit= */ snapshot, - /* metadataPath= */ tablePath + "/_checkpoint", - /* metadataTrackingLog= */ scala.Option.empty(), - /* filters= */ emptySeq); - } - /** Helper method to format a DSv1 IndexedFile for debugging. 
*/ private String formatIndexedFile(org.apache.spark.sql.delta.sources.IndexedFile file) { return String.format( @@ -1244,4 +1296,23 @@ private void compareOffsetSequence( String.format("%s (iteration %d)", testDescription, i)); } } + + private DeltaSource createDeltaSource(DeltaLog deltaLog, String tablePath) { + DeltaOptions options = new DeltaOptions(Map$.MODULE$.empty(), spark.sessionState().conf()); + return createDeltaSource(deltaLog, tablePath, options); + } + + private DeltaSource createDeltaSource(DeltaLog deltaLog, String tablePath, DeltaOptions options) { + Seq emptySeq = JavaConverters.asScalaBuffer(new ArrayList()).toList(); + Snapshot snapshot = deltaLog.update(false, Option.empty(), Option.empty()); + return new DeltaSource( + spark, + deltaLog, + /* catalogTableOpt= */ Option.empty(), + options, + /* snapshotAtSourceInit= */ snapshot, + /* metadataPath= */ tablePath + "/_checkpoint", + /* metadataTrackingLog= */ Option.empty(), + /* filters= */ emptySeq); + } }
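Note on the first-batch flow introduced above: with the new `isFirstBatch` handling, Spark's micro-batch engine is expected to call initialOffset() once (when there is no checkpointed offset) and then latestOffset(startOffset, limit) for batch 0, which may legitimately return null when the requested starting version has no file changes yet. The sketch below illustrates that call sequence. It is not part of the patch; the FirstBatchSketch and runFirstBatch names are made up for illustration, and the stream is assumed to be constructed as in SparkMicroBatchStreamTest (snapshotManager, snapshotAtSourceInit, hadoopConf, spark, options).

// Sketch only, not part of the patch: how a micro-batch driver would exercise the
// first-batch handshake that the new `isFirstBatch` flag supports.
import io.delta.kernel.spark.read.SparkMicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;

final class FirstBatchSketch {
  static void runFirstBatch(SparkMicroBatchStream stream) {
    // Batch 0: there is no checkpointed offset, so the engine asks for the initial offset first.
    Offset start = stream.initialOffset(); // also marks the stream as being on its first batch
    Offset end = stream.latestOffset(start, stream.getDefaultReadLimit());
    if (end == null) {
      // No data is available for the first batch; nothing to plan on this trigger.
      return;
    }
    // Otherwise plan and run the batch between `start` and `end`, then checkpoint `end`.
    // Later triggers call stream.latestOffset(previousEnd, limit) directly, without
    // going through initialOffset() again.
  }
}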