@@ -64,6 +64,16 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissionControl
private final boolean shouldValidateOffsets;
private final SparkSession spark;

/**
* Tracks whether this is the first batch for this stream (no checkpointed offset).
*
* <p>First batch: initialOffset() -> latestOffset(Offset, ReadLimit). initialOffset() sets
* `isFirstBatch` to true; latestOffset(Offset, ReadLimit) uses `isFirstBatch` to decide whether to
* return null or the previous offset when no data is available, and then resets it to false.
* Subsequent batches only call latestOffset(Offset, ReadLimit).
*/
private boolean isFirstBatch = false;
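
// Sketch of the calling sequences, for orientation (illustrative comment; `previousOffset` below
// means the checkpointed offset of the prior batch):
//   DSv1:  latestOffset(null, limit)                                  // batch 0 (no initialOffset())
//          latestOffset(previousOffset, limit)                        // later batches
//   DSv2:  initialOffset(), then latestOffset(initialOffset, limit)   // batch 0
//          latestOffset(previousOffset, limit)                        // later batches
// `isFirstBatch` lets batch 0 still report "no data available" by returning null, as DSv1 does.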

public SparkMicroBatchStream(
DeltaSnapshotManager snapshotManager,
Snapshot snapshotAtSourceInit,
@@ -98,10 +108,28 @@ public SparkMicroBatchStream(
// offset //
////////////

/**
* Returns the initial offset for a streaming query to start reading from (if there's no
* checkpointed offset).
*/
@Override
public Offset initialOffset() {
- // TODO(#5318): Implement initialOffset
- throw new UnsupportedOperationException("initialOffset is not supported");
Optional<Long> startingVersionOpt = getStartingVersion();
long version;
boolean isInitialSnapshot;
isFirstBatch = true;

if (startingVersionOpt.isPresent()) {
version = startingVersionOpt.get();
isInitialSnapshot = false;
} else {
// TODO(#5318): Support initial snapshot case (isInitialSnapshot == true)
throw new UnsupportedOperationException(
"initialOffset with initial snapshot is not supported yet");
}

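// The offset uses BASE_INDEX, i.e. it points before any file in `version`, so the first
// latestOffset(initialOffset, limit) call starts admitting changes from `version` onward.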
return DeltaSourceOffset.apply(
tableId, version, DeltaSourceOffset.BASE_INDEX(), isInitialSnapshot);
}

@Override
@@ -113,30 +141,29 @@ public Offset latestOffset() {
/**
* Get the latest offset with rate limiting (SupportsAdmissionControl).
*
- * @param startOffset The starting offset (can be null if initialOffset() returned null)
* @param startOffset The starting offset
* @param limit The read limit for rate limiting
* @return The latest offset, or null if no data is available to read.
*/
@Override
public Offset latestOffset(Offset startOffset, ReadLimit limit) {
- // For the first batch, initialOffset() should be called before latestOffset().
- // if startOffset is null: no data is available to read.
- if (startOffset == null) {
- return null;
- }
Objects.requireNonNull(startOffset, "startOffset should not be null for MicroBatchStream");
Objects.requireNonNull(limit, "limit should not be null for MicroBatchStream");

// TODO(#5318): Support Trigger.AvailableNow at stream initialization

DeltaSourceOffset deltaStartOffset = DeltaSourceOffset.apply(tableId, startOffset);
Optional<DeltaSource.AdmissionLimits> limits =
ScalaUtils.toJavaOptional(DeltaSource.AdmissionLimits$.MODULE$.apply(options, limit));
Optional<DeltaSourceOffset> endOffset =
- getNextOffsetFromPreviousOffset(deltaStartOffset, limits);
getNextOffsetFromPreviousOffset(deltaStartOffset, limits, isFirstBatch);
isFirstBatch = false;

if (shouldValidateOffsets && endOffset.isPresent()) {
DeltaSourceOffset.validateOffsets(deltaStartOffset, endOffset.get());
}

- // endOffset is null: no data is available to read for this batch.
// Return null when no data is available for this batch.
return endOffset.orElse(null);
}

@@ -156,10 +183,15 @@ public ReadLimit getDefaultReadLimit() {
*
* @param previousOffset The previous offset
* @param limits Rate limits for this batch (Optional.empty() for no limits)
- * @return The next offset, or the previous offset if no new data is available
* @param isFirstBatch Whether this is the first batch for this stream
* @return The next offset, or the previous offset if no new data is available (except on the
* initial batch where we return empty to match DSv1's
* getStartingOffsetFromSpecificDeltaVersion behavior)
*/
private Optional<DeltaSourceOffset> getNextOffsetFromPreviousOffset(
- DeltaSourceOffset previousOffset, Optional<DeltaSource.AdmissionLimits> limits) {
DeltaSourceOffset previousOffset,
Optional<DeltaSource.AdmissionLimits> limits,
boolean isFirstBatch) {
// TODO(#5319): Special handling for schema tracking.

CloseableIterator<IndexedFile> changes =
@@ -172,6 +204,11 @@ private Optional<DeltaSourceOffset> getNextOffsetFromPreviousOffset(
Optional<IndexedFile> lastFileChange = Utils.iteratorLast(changes);

if (!lastFileChange.isPresent()) {
// For the first batch, return empty to match DSv1's
// getStartingOffsetFromSpecificDeltaVersion
if (isFirstBatch) {
return Optional.empty();
}
return Optional.of(previousOffset);
}
// TODO(#5318): Check read-incompatible schema changes during stream start
@@ -221,6 +258,8 @@ public void stop() {
* Extracts whether users provided the option to time travel a relation. If a query restarts from
* a checkpoint and the checkpoint has recorded the offset, this method should never be called.
*
* <p>Returns Optional.empty() if no starting version is provided.
*
* <p>This is the DSv2 Kernel-based implementation of DeltaSource.getStartingVersion.
*/
Optional<Long> getStartingVersion() {
@@ -32,10 +32,12 @@
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.connector.read.streaming.ReadLimit;
import org.apache.spark.sql.delta.DeltaLog;
import org.apache.spark.sql.delta.DeltaOptions;
import org.apache.spark.sql.delta.Snapshot;
import org.apache.spark.sql.delta.sources.DeltaSource;
import org.apache.spark.sql.delta.sources.DeltaSourceOffset;
import org.apache.spark.sql.delta.sources.ReadMaxBytes;
@@ -46,7 +48,9 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.immutable.Map$;
import scala.collection.immutable.Seq;

public class SparkMicroBatchStreamTest extends SparkDsv2TestBase {

@@ -101,7 +105,8 @@ public void testInitialOffset_throwsUnsupportedOperationException(@TempDir File
SparkMicroBatchStream microBatchStream = createTestStream(tempDir);
UnsupportedOperationException exception =
assertThrows(UnsupportedOperationException.class, () -> microBatchStream.initialOffset());
assertEquals("initialOffset is not supported", exception.getMessage());
assertEquals(
"initialOffset with initial snapshot is not supported yet", exception.getMessage());
}

@Test
@@ -130,6 +135,73 @@ public void testStop_throwsUnsupportedOperationException(@TempDir File tempDir)
assertEquals("stop is not supported", exception.getMessage());
}

// ================================================================================================
// Tests for initialOffset parity between DSv1 and DSv2
// ================================================================================================

@ParameterizedTest
@MethodSource("initialOffsetParameters")
public void testInitialOffset_FirstBatchParity(
Contributor: Is this the right way to test this stuff?

Contributor: Basically, there are other existing tests that already provide coverage for this, right? Is it that we are unable to run those tests because the streaming source v2 is incomplete?

Collaborator (author) @zikangh, Dec 5, 2025: Yes, we have a lot of end-to-end tests that we can enable once we have all the requisite pieces. This is a future PR that enables some of these tests: https://github.com/delta-io/delta/pull/5572/files/154897c75c21697300bd31e851b04147339ce466..f6980981137c5943fc590f0b46c70557adb4d161#diff-5b8b5b3f181cbc43ecdeffe4e814641b3e78801fb46d6aaf74a6b2928ba64791

String startingVersion,
ReadLimitConfig limitConfig,
String testDescription,
@TempDir File tempDir)
throws Exception {
String testTablePath = tempDir.getAbsolutePath();
String testTableName = "test_initial_" + System.nanoTime();
createEmptyTestTable(testTablePath, testTableName);
insertVersions(
testTableName,
/* numVersions= */ 5,
/* rowsPerVersion= */ 10,
/* includeEmptyVersion= */ false);

DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath));
ReadLimit readLimit = limitConfig.toReadLimit();
DeltaOptions options;
if (startingVersion == null) {
options = new DeltaOptions(Map$.MODULE$.empty(), spark.sessionState().conf());
} else {
scala.collection.immutable.Map<String, String> scalaMap =
Map$.MODULE$.<String, String>empty().updated("startingVersion", startingVersion);
options = new DeltaOptions(scalaMap, spark.sessionState().conf());
}

// DSv1
DeltaSource deltaSource = createDeltaSource(deltaLog, testTablePath, options);
// DSv1 sources don't have an initialOffset() method.
// Batch 0 is called with startOffset=null.
Offset dsv1Offset = deltaSource.latestOffset(/* startOffset= */ null, readLimit);

// DSv2
Configuration hadoopConf = new Configuration();
PathBasedSnapshotManager snapshotManager =
new PathBasedSnapshotManager(testTablePath, hadoopConf);
io.delta.kernel.Snapshot snapshotAtSourceInit = snapshotManager.loadLatestSnapshot();
SparkMicroBatchStream stream =
new SparkMicroBatchStream(
snapshotManager, snapshotAtSourceInit, hadoopConf, spark, options);
Offset initialOffset = stream.initialOffset();
Offset dsv2Offset = stream.latestOffset(initialOffset, readLimit);

compareOffsets(dsv1Offset, dsv2Offset, testDescription);
}

/** Provides test parameters for the initialOffset parity test. */
private static Stream<Arguments> initialOffsetParameters() {
return Stream.of(
Arguments.of("0", ReadLimitConfig.noLimit(), "NoLimit1"),
Arguments.of("1", ReadLimitConfig.noLimit(), "NoLimit2"),
Arguments.of("3", ReadLimitConfig.noLimit(), "NoLimit3"),
Arguments.of("latest", ReadLimitConfig.noLimit(), "LatestNoLimit"),
Arguments.of("latest", ReadLimitConfig.maxFiles(1000), "LatestMaxFiles"),
Arguments.of("latest", ReadLimitConfig.maxBytes(1000), "LatestMaxBytes"),
Arguments.of("0", ReadLimitConfig.maxFiles(5), "MaxFiles1"),
Arguments.of("1", ReadLimitConfig.maxFiles(10), "MaxFiles2"),
Arguments.of("0", ReadLimitConfig.maxBytes(1000), "MaxBytes1"),
Arguments.of("1", ReadLimitConfig.maxBytes(2000), "MaxBytes2"));
}
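
/**
 * Sketch only (not exercised by this suite): the user-facing streaming read that the source-level
 * parity cases above approximate. `tablePath` is a placeholder; "startingVersion" is the same
 * option that getStartingVersion() resolves on the DSv2 side.
 */
private org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> exampleStreamingRead(
    String tablePath) {
  return spark.readStream().format("delta").option("startingVersion", "1").load(tablePath);
}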

// ================================================================================================
// Tests for getFileChanges parity between DSv1 and DSv2
// ================================================================================================
@@ -1166,26 +1238,6 @@ private Optional<DeltaSource.AdmissionLimits> createAdmissionLimits(
return Optional.of(new DeltaSource.AdmissionLimits(options, scalaMaxFiles, scalaMaxBytes));
}

- /** Helper method to create a DeltaSource instance for testing. */
- private DeltaSource createDeltaSource(DeltaLog deltaLog, String tablePath) {
- DeltaOptions options = new DeltaOptions(Map$.MODULE$.empty(), spark.sessionState().conf());
- scala.collection.immutable.Seq<org.apache.spark.sql.catalyst.expressions.Expression> emptySeq =
- scala.collection.JavaConverters.asScalaBuffer(
- new java.util.ArrayList<org.apache.spark.sql.catalyst.expressions.Expression>())
- .toList();
- org.apache.spark.sql.delta.Snapshot snapshot =
- deltaLog.update(false, scala.Option.empty(), scala.Option.empty());
- return new DeltaSource(
- spark,
- deltaLog,
- /* catalogTableOpt= */ scala.Option.empty(),
- options,
- /* snapshotAtSourceInit= */ snapshot,
- /* metadataPath= */ tablePath + "/_checkpoint",
- /* metadataTrackingLog= */ scala.Option.empty(),
- /* filters= */ emptySeq);
- }

/** Helper method to format a DSv1 IndexedFile for debugging. */
private String formatIndexedFile(org.apache.spark.sql.delta.sources.IndexedFile file) {
return String.format(
@@ -1244,4 +1296,23 @@ private void compareOffsetSequence(
String.format("%s (iteration %d)", testDescription, i));
}
}

private DeltaSource createDeltaSource(DeltaLog deltaLog, String tablePath) {
DeltaOptions options = new DeltaOptions(Map$.MODULE$.empty(), spark.sessionState().conf());
return createDeltaSource(deltaLog, tablePath, options);
}

private DeltaSource createDeltaSource(DeltaLog deltaLog, String tablePath, DeltaOptions options) {
Seq<Expression> emptySeq = JavaConverters.asScalaBuffer(new ArrayList<Expression>()).toList();
Snapshot snapshot = deltaLog.update(false, Option.empty(), Option.empty());
return new DeltaSource(
spark,
deltaLog,
/* catalogTableOpt= */ Option.empty(),
options,
/* snapshotAtSourceInit= */ snapshot,
/* metadataPath= */ tablePath + "/_checkpoint",
/* metadataTrackingLog= */ Option.empty(),
/* filters= */ emptySeq);
}
}