@@ -93,6 +93,10 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissionControl
*/
private boolean isFirstBatch = false;

// Cached starting version to ensure idempotent behavior for "latest" starting version.
// getStartingVersion() must return the same value across multiple calls.
private volatile Optional<Long> cachedStartingVersion = null;

public SparkMicroBatchStream(
DeltaSnapshotManager snapshotManager,
Snapshot snapshotAtSourceInit,
@@ -334,15 +338,20 @@ public void stop() {
*
* <p>This is the DSv2 Kernel-based implementation of DeltaSource.getStartingVersion.
*/
Optional<Long> getStartingVersion() {
synchronized Optional<Long> getStartingVersion() {
if (cachedStartingVersion != null) {
return cachedStartingVersion;
}

// TODO(#5319): DeltaSource.scala uses `allowOutOfRange` parameter from
// DeltaSQLConf.DELTA_CDF_ALLOW_OUT_OF_RANGE_TIMESTAMP.
if (options.startingVersion().isDefined()) {
DeltaStartingVersion startingVersion = options.startingVersion().get();
if (startingVersion instanceof StartingVersionLatest$) {
Snapshot latestSnapshot = snapshotManager.loadLatestSnapshot();
// "latest": start reading from the next commit
return Optional.of(latestSnapshot.getVersion() + 1);
cachedStartingVersion = Optional.of(latestSnapshot.getVersion() + 1);
return cachedStartingVersion;
} else if (startingVersion instanceof StartingVersion) {
long version = ((StartingVersion) startingVersion).version();
if (!validateProtocolAt(spark, snapshotManager, engine, version)) {
@@ -354,11 +363,13 @@ Optional<Long> getStartingVersion() {
snapshotManager.checkVersionExists(
version, /* mustBeRecreatable= */ false, /* allowOutOfRange= */ false);
}
return Optional.of(version);
cachedStartingVersion = Optional.of(version);
return cachedStartingVersion;
}
}
// TODO(#5319): Implement startingTimestamp support
return Optional.empty();
cachedStartingVersion = Optional.empty();
return cachedStartingVersion;
}
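
The caching above makes getStartingVersion idempotent: with startingVersion = "latest", the first call resolves the latest snapshot and every later call must return the same value even if new commits land in between. A minimal, self-contained sketch of that idiom follows; the class and method names here are illustrative and not part of the PR.

import java.util.Optional;

// Illustrative only: the same volatile-field + synchronized memoization that
// getStartingVersion() uses above, with a stand-in for loadLatestSnapshot().
class LatestStartResolver {
  private volatile Optional<Long> cached = null;

  synchronized Optional<Long> startingVersion() {
    if (cached != null) {
      return cached; // later calls reuse the value resolved by the first call
    }
    // "latest": start reading from the commit after the currently latest version.
    cached = Optional.of(loadLatestVersion() + 1);
    return cached;
  }

  private long loadLatestVersion() {
    // Stand-in for snapshotManager.loadLatestSnapshot().getVersion(); its result can
    // change between calls, which is exactly why the first result is cached.
    return 4L;
  }
}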

/**
@@ -490,6 +501,20 @@ private CloseableIterator<IndexedFile> filterDeltaLogs(
Optional<Long> endVersionOpt =
endOffset.isPresent() ? Optional.of(endOffset.get().reservoirVersion()) : Optional.empty();

// Cap endVersion to the latest available version. The Kernel's getTableChanges requires
// endVersion to be an actual existing version or empty.
if (endVersionOpt.isPresent()) {
long latestVersion = snapshotAtSourceInit.getVersion();
if (endVersionOpt.get() > latestVersion) {
// This could happen because:
// 1. data could be added after snapshotAtSourceInit was captured.
// 2. buildOffsetFromIndexedFile bumps the version up by one when we hit the END_INDEX.
// TODO(#5318): consider caching the latest version to avoid loading a new snapshot.
// TODO(#5318): kernel should ideally relax this constraint.
endVersionOpt = Optional.of(snapshotManager.loadLatestSnapshot().getVersion());
}
}
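
The capping step above can be read as a small pure function: keep the requested end version unless it exceeds the version captured at source init, in which case fall back to a freshly loaded latest version. A sketch under those assumptions (names are illustrative, not the PR's code):

import java.util.Optional;
import java.util.function.LongSupplier;

// Illustrative restatement of the capping step above, not the PR's code.
final class EndVersionCap {
  static Optional<Long> cap(
      Optional<Long> requestedEnd, long versionAtSourceInit, LongSupplier loadLatestVersion) {
    if (requestedEnd.isPresent() && requestedEnd.get() > versionAtSourceInit) {
      // The requested end points past what was known at source init (new commits, or the
      // END_INDEX bump), so clamp it to a version that actually exists.
      return Optional.of(loadLatestVersion.getAsLong());
    }
    return requestedEnd;
  }
}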

CommitRange commitRange;
try {
commitRange = snapshotManager.getTableChanges(engine, startVersion, endVersionOpt);
233 changes: 211 additions & 22 deletions kernel-spark/src/test/java/io/delta/kernel/spark/Dsv2BasicTest.java
@@ -24,8 +24,8 @@
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.io.TempDir;

Expand All @@ -35,6 +35,13 @@ public class Dsv2BasicTest {
private SparkSession spark;
private String nameSpace;

private static final StructType TEST_SCHEMA =
DataTypes.createStructType(
Arrays.asList(
DataTypes.createStructField("id", DataTypes.IntegerType, false),
DataTypes.createStructField("name", DataTypes.StringType, false),
DataTypes.createStructField("value", DataTypes.DoubleType, false)));

@BeforeAll
public void setUp(@TempDir File tempDir) {
// Spark doesn't allow '-'
@@ -164,46 +171,148 @@ public void testColumnMappingRead(@TempDir File deltaTablePath) {
}

@Test
public void testStreamingRead(@TempDir File deltaTablePath) {
public void testStreamingReadMultipleVersions(@TempDir File deltaTablePath) throws Exception {
Contributor:
@huan233usc what is the scope of the Dsv2BasicTest class? Are all operations supposed to be added there? I want some structure and scope to each test suite, so that we don't end up with randomization in what test goes where. Why not make a separate test suite for streaming?

Collaborator:
Yes, I think we should separate the test suites for streaming, and probably for write and DML in the future. However, for the tests under kernel-spark/src/test/java/io/delta/kernel/spark/* I was thinking about consolidating them with the existing v1 tests (which can now run with both the v1 and v2 connectors) in the end. Dsv2BasicTest was added before we made the module change that enables running the v1 and v2 connectors with DeltaCatalog. It has a setup where we write using the v1 connector but read with the v2 connector (via a special test catalog impl, io.delta.kernel.spark.catalog.TestCatalog), so we could cover some simple e2e scenarios from the very beginning.

String tablePath = deltaTablePath.getAbsolutePath();

// Write version 0
writeInitialData(tablePath, Arrays.asList(RowFactory.create(1, "Alice", 100.0)));

// Write version 1
appendTestData(tablePath, Arrays.asList(RowFactory.create(2, "Bob", 200.0)));

// Write version 2
appendTestData(tablePath, Arrays.asList(RowFactory.create(3, "Charlie", 300.0)));

// Start streaming from version 0 - should read all three versions
Dataset<Row> streamingDF = createStreamingDF(tablePath, "0");
assertTrue(streamingDF.isStreaming(), "Dataset should be streaming");

// Process all batches - should have all data from versions 0, 1, and 2
List<Row> actualRows = processStreamingQuery(streamingDF, "test_multiple_versions");
List<Row> expectedRows =
Arrays.asList(
RowFactory.create(1, "Alice", 100.0),
RowFactory.create(2, "Bob", 200.0),
RowFactory.create(3, "Charlie", 300.0));

assertStreamingDataEquals(actualRows, expectedRows);
}

@Test
public void testStreamingReadWithStartingVersionLatest(@TempDir File deltaTablePath)
throws Exception {
String tablePath = deltaTablePath.getAbsolutePath();

// Write initial data (version 0)
List<Row> version0Rows =
Arrays.asList(RowFactory.create(1, "Alice", 100.0), RowFactory.create(2, "Bob", 200.0));
writeInitialData(tablePath, version0Rows);

// Start streaming from "latest" (should start reading from version 1 onwards)
Dataset<Row> streamingDF = createStreamingDF(tablePath, "latest");
assertTrue(streamingDF.isStreaming(), "Dataset should be streaming");

StreamingQuery query = null;
try {
query =
streamingDF
.writeStream()
.format("memory")
.queryName("test_latest_initial")
.outputMode("append")
.start();

query.processAllAvailable();

// Should have no data initially since we're starting after the current version
Dataset<Row> results = spark.sql("SELECT * FROM test_latest_initial");
List<Row> initialRows = results.collectAsList();
assertTrue(
initialRows.isEmpty(),
"Should have no data when starting from 'latest' before new data is added");

// Add more data (version 1)
List<Row> version1Rows =
Arrays.asList(
RowFactory.create(3, "Charlie", 300.0), RowFactory.create(4, "David", 400.0));
appendTestData(tablePath, version1Rows);

// Process the next batch
query.processAllAvailable();

// Now should only have the new data (version 1)
Dataset<Row> finalResults = spark.sql("SELECT * FROM test_latest_initial");
List<Row> finalRows = finalResults.collectAsList();

assertStreamingDataEquals(finalRows, version1Rows);
} finally {
if (query != null) {
query.stop();
}
}
}

@Test
public void testStreamingReadWithRateLimit(@TempDir File deltaTablePath) throws Exception {
String tablePath = deltaTablePath.getAbsolutePath();
// Create test data using standard Delta Lake
Dataset<Row> testData =
spark.createDataFrame(
Arrays.asList(RowFactory.create(1, "Alice", 100.0), RowFactory.create(2, "Bob", 200.0)),
DataTypes.createStructType(
Arrays.asList(
DataTypes.createStructField("id", DataTypes.IntegerType, false),
DataTypes.createStructField("name", DataTypes.StringType, false),
DataTypes.createStructField("value", DataTypes.DoubleType, false))));
testData.write().format("delta").save(tablePath);

// Test streaming read using path-based table
Dataset<Row> streamingDF =
spark.readStream().table(String.format("dsv2.delta.`%s`", tablePath));
// Write initial data with 3 rows (likely to be in a single file)
List<Row> initialRows =
Arrays.asList(
RowFactory.create(1, "Alice", 100.0),
RowFactory.create(2, "Bob", 200.0),
RowFactory.create(3, "Charlie", 300.0));
writeInitialData(tablePath, initialRows);

// Start streaming with maxFilesPerTrigger=1 to test rate limiting
Dataset<Row> streamingDF = createStreamingDF(tablePath, "0", "1");
assertTrue(streamingDF.isStreaming(), "Dataset should be streaming");
StreamingQueryException exception =

// Process with rate limiting - should still get all data, just potentially in multiple batches
List<Row> actualRows = processStreamingQuery(streamingDF, "test_rate_limit");
assertStreamingDataEquals(actualRows, initialRows);
}

@Test
public void testStreamingReadWithoutStartingVersionThrowsException(@TempDir File deltaTablePath) {
String tablePath = deltaTablePath.getAbsolutePath();

// Write initial data
List<Row> initialRows =
Arrays.asList(RowFactory.create(1, "Alice", 100.0), RowFactory.create(2, "Bob", 200.0));
writeInitialData(tablePath, initialRows);

// Try to create streaming DataFrame without startingVersion using DSv2 path
// Using dsv2.delta.`path` syntax to force DSv2 (SparkMicroBatchStream) instead of DSv1
String dsv2TableRef = String.format("dsv2.delta.`%s`", tablePath);

// Should throw UnsupportedOperationException when trying to process
org.apache.spark.sql.streaming.StreamingQueryException exception =
assertThrows(
StreamingQueryException.class,
org.apache.spark.sql.streaming.StreamingQueryException.class,
() -> {
StreamingQuery query =
streamingDF
spark
.readStream()
.table(dsv2TableRef)
.writeStream()
.format("memory")
.queryName("test_streaming_query")
.queryName("test_no_starting_version")
.outputMode("append")
.start();
query.processAllAvailable();
query.stop();
});

// Verify the root cause is UnsupportedOperationException
Throwable rootCause = exception.getCause();
assertTrue(
rootCause instanceof UnsupportedOperationException,
"Root cause should be UnsupportedOperationException");
"Root cause should be UnsupportedOperationException, but was: "
+ (rootCause != null ? rootCause.getClass().getName() : "null"));
assertTrue(
rootCause.getMessage().contains("is not supported"),
"Root cause message should indicate that streaming operation is not supported: "
+ rootCause.getMessage());
"Exception message should indicate operation is not supported: " + rootCause.getMessage());
}

//////////////////////
@@ -216,4 +325,84 @@ private void assertDatasetEquals(Dataset<Row> actual, List<Row> expectedRows) {
actualRows,
() -> "Datasets differ: expected=" + expectedRows + "\nactual=" + actualRows);
}

private void appendData(String tablePath, Dataset<Row> data) {
data.write().format("delta").mode("append").save(tablePath);
}

private List<Row> processStreamingQuery(Dataset<Row> streamingDF, String queryName)
throws Exception {
StreamingQuery query = null;
try {
query =
streamingDF
.writeStream()
.format("memory")
.queryName(queryName)
.outputMode("append")
.start();

query.processAllAvailable();

// Query the memory sink to get results
Dataset<Row> results = spark.sql("SELECT * FROM " + queryName);
return results.collectAsList();
} finally {
if (query != null) {
query.stop();
}
}
}

private void assertStreamingDataEquals(List<Row> actualRows, List<Row> expectedRows) {
assertEquals(
expectedRows.size(),
actualRows.size(),
() ->
"Row count differs: expected="
+ expectedRows.size()
+ " actual="
+ actualRows.size()
+ "\nExpected rows: "
+ expectedRows
+ "\nActual rows: "
+ actualRows);

// Compare rows (order-independent for robustness)
assertTrue(
actualRows.containsAll(expectedRows) && expectedRows.containsAll(actualRows),
() -> "Streaming data differs:\nExpected: " + expectedRows + "\nActual: " + actualRows);
}

private Dataset<Row> createTestData(List<Row> rows) {
return spark.createDataFrame(rows, TEST_SCHEMA);
}

private void writeInitialData(String tablePath, List<Row> rows) {
createTestData(rows).write().format("delta").save(tablePath);
}

private void appendTestData(String tablePath, List<Row> rows) {
appendData(tablePath, createTestData(rows));
}

private Dataset<Row> createStreamingDF(String tablePath, String startingVersion) {
// Use dsv2.delta.`path` syntax to force DSv2 (SparkMicroBatchStream) instead of DSv1
String dsv2TableRef = String.format("dsv2.delta.`%s`", tablePath);
return spark.readStream().option("startingVersion", startingVersion).table(dsv2TableRef);
}

private Dataset<Row> createStreamingDF(
String tablePath, String startingVersion, String maxFilesPerTrigger) {
// Use dsv2.delta.`path` syntax to force DSv2 (SparkMicroBatchStream) instead of DSv1
String dsv2TableRef = String.format("dsv2.delta.`%s`", tablePath);
if (maxFilesPerTrigger != null) {
return spark
.readStream()
.option("startingVersion", startingVersion)
.option("maxFilesPerTrigger", maxFilesPerTrigger)
.table(dsv2TableRef);
}
return createStreamingDF(tablePath, startingVersion);
}
}
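
The helpers above assume a catalog named dsv2 is already registered so that dsv2.delta.`path` resolves through the Kernel-based DSv2 connector (SparkMicroBatchStream) rather than DSv1. That wiring is not part of this diff; below is a minimal sketch, assuming Spark's standard spark.sql.catalog.<name> registration and the io.delta.kernel.spark.catalog.TestCatalog implementation mentioned in the review discussion. The real setup in Dsv2BasicTest.setUp may differ.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Illustrative setup only; the actual registration lives in Dsv2BasicTest.setUp and may differ.
public class Dsv2CatalogSketch {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder()
            .master("local[*]")
            // Register a DSv2 catalog named "dsv2" backed by the test catalog implementation
            // referenced in the review discussion above.
            .config("spark.sql.catalog.dsv2", "io.delta.kernel.spark.catalog.TestCatalog")
            .getOrCreate();

    // Table references of the form dsv2.delta.`/abs/path` now route through the DSv2 connector,
    // which is what forces SparkMicroBatchStream instead of the DSv1 DeltaSource.
    Dataset<Row> streamingDF =
        spark.readStream().option("startingVersion", "0").table("dsv2.delta.`/tmp/delta-table`");
    System.out.println("isStreaming = " + streamingDF.isStreaming());
  }
}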