@@ -18,12 +18,12 @@
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static io.delta.kernel.internal.util.Preconditions.checkState;

import io.delta.kernel.CommitActions;
import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.DeltaLogActionUtils;
import io.delta.kernel.internal.TableChangesUtils;
import io.delta.kernel.internal.actions.AddFile;
import io.delta.kernel.internal.actions.RemoveFile;
import io.delta.kernel.internal.commitrange.CommitRangeImpl;
@@ -98,29 +98,29 @@ public static Optional<RemoveFile> getDataChangeRemove(ColumnarBatch batch, int
}

/**
* Gets actions from a commit range without requiring a snapshot at the exact start version.
* Gets commit-level actions from a commit range without requiring a snapshot at the exact start
* version.
*
* <p>This method is "unsafe" because it bypasses the standard {@code CommitRange.getActions()}
* API which requires a snapshot at the exact start version for protocol validation.
* <p>Returns an iterator over {@link CommitActions}, where each CommitActions represents a single
* commit.
*
* <p>This is necessary for streaming scenarios where the start version might not have a
* recreatable snapshot (e.g., after log cleanup) or where {@code startingVersion} is used.
* <p>This method is "unsafe" because it bypasses the standard {@code
* CommitRange.getCommitActions()} API which requires a snapshot at the exact start version for
* protocol validation.
*
* @param engine the Delta engine
* @param commitRange the commit range to read actions from
* @param tablePath the path to the Delta table
* @param actionSet the set of actions to read (e.g., ADD, REMOVE)
* @return an iterator over columnar batches containing the requested actions
* @return an iterator over {@link CommitActions}, one per commit version
*/
public static CloseableIterator<ColumnarBatch> getActionsFromRangeUnsafe(
public static CloseableIterator<CommitActions> getCommitActionsFromRangeUnsafe(
Engine engine,
CommitRangeImpl commitRange,
String tablePath,
Set<DeltaLogActionUtils.DeltaAction> actionSet) {
return TableChangesUtils.flattenCommitsAndAddMetadata(
engine,
DeltaLogActionUtils.getActionsFromCommitFilesWithProtocolValidation(
engine, tablePath, commitRange.getDeltaFiles(), actionSet));
return DeltaLogActionUtils.getActionsFromCommitFilesWithProtocolValidation(
engine, tablePath, commitRange.getDeltaFiles(), actionSet);
}

/** Private constructor to prevent instantiation of this utility class. */
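A minimal sketch of how a caller might consume the renamed helper is below. Only the static method signature above comes from this change; the enclosing utility class name (StreamingHelper) and the CommitActions accessors (getVersion, getActions) are assumptions for illustration.

```java
// Sketch only. StreamingHelper and the CommitActions accessors (getVersion, getActions) are
// assumptions for illustration; only the static method signature comes from the diff above.
import java.util.Set;

import io.delta.kernel.CommitActions;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.DeltaLogActionUtils;
import io.delta.kernel.internal.commitrange.CommitRangeImpl;
import io.delta.kernel.utils.CloseableIterator;

public final class CommitActionsUsageSketch {

  static void readAddsAndRemoves(Engine engine, CommitRangeImpl commitRange, String tablePath)
      throws Exception {
    Set<DeltaLogActionUtils.DeltaAction> actionSet =
        Set.of(DeltaLogActionUtils.DeltaAction.ADD, DeltaLogActionUtils.DeltaAction.REMOVE);

    // One CommitActions per commit version in the requested range.
    try (CloseableIterator<CommitActions> commits =
        StreamingHelper.getCommitActionsFromRangeUnsafe(
            engine, commitRange, tablePath, actionSet)) {
      while (commits.hasNext()) {
        CommitActions commit = commits.next();
        long version = commit.getVersion(); // hypothetical accessor
        try (CloseableIterator<ColumnarBatch> actions = commit.getActions()) { // hypothetical accessor
          while (actions.hasNext()) {
            ColumnarBatch batch = actions.next();
            // Extract ADD/REMOVE rows for this commit version, e.g. via getDataChangeRemove(...).
          }
        }
      }
    }
  }
}
```

The per-commit grouping replaces the previous TableChangesUtils.flattenCommitsAndAddMetadata step, so callers now see commit boundaries directly instead of a pre-flattened stream of batches.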
@@ -39,9 +39,8 @@ object DeltaSQLConfV2 extends DeltaSQLConfUtils {
buildConf("v2.enableMode")
.doc(
"Controls the Delta V2 connector enable mode. " +
"Valid values: NONE (disabled, default), STRICT (should ONLY be enabled for testing).")
"Valid values: NONE (disabled, default), STRICT (should ONLY be enabled for testing).")
.stringConf
.checkValues(Set("NONE", "STRICT"))
.createWithDefault("NONE")
}
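A minimal sketch of flipping this switch in a test session follows. The full key is assumed to be spark.databricks.delta.v2.enableMode, i.e. buildConf("v2.enableMode") prepending the usual Delta SQL conf prefix; verify the actual prefix in DeltaSQLConfUtils.

```java
// Sketch only: the "spark.databricks.delta." prefix is an assumption about what
// DeltaSQLConfUtils.buildConf("v2.enableMode") produces; verify against DeltaSQLConfUtils.
import org.apache.spark.sql.SparkSession;

public final class V2EnableModeSketch {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder()
            .master("local[2]")
            // STRICT should ONLY be enabled for testing; NONE (the default) keeps the V2 path off.
            .config("spark.databricks.delta.v2.enableMode", "STRICT")
            .getOrCreate();

    // Any value outside {NONE, STRICT} is rejected by checkValues when the conf is resolved.
    spark.stop();
  }
}
```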

261 changes: 224 additions & 37 deletions kernel-spark/src/test/java/io/delta/kernel/spark/Dsv2BasicTest.java
@@ -24,8 +24,8 @@
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.io.TempDir;

@@ -35,6 +35,13 @@ public class Dsv2BasicTest {
private SparkSession spark;
private String nameSpace;

private static final StructType TEST_SCHEMA =
DataTypes.createStructType(
Arrays.asList(
DataTypes.createStructField("id", DataTypes.IntegerType, false),
DataTypes.createStructField("name", DataTypes.StringType, false),
DataTypes.createStructField("value", DataTypes.DoubleType, false)));

@BeforeAll
public void setUp(@TempDir File tempDir) {
// Spark doesn't allow '-'
@@ -164,46 +171,146 @@ public void testColumnMappingRead(@TempDir File deltaTablePath) {
}

@Test
public void testStreamingRead(@TempDir File deltaTablePath) {
public void testStreamingReadMultipleVersions(@TempDir File deltaTablePath) throws Exception {
String tablePath = deltaTablePath.getAbsolutePath();
// Create test data using standard Delta Lake
Dataset<Row> testData =
spark.createDataFrame(
Arrays.asList(RowFactory.create(1, "Alice", 100.0), RowFactory.create(2, "Bob", 200.0)),
DataTypes.createStructType(
Arrays.asList(
DataTypes.createStructField("id", DataTypes.IntegerType, false),
DataTypes.createStructField("name", DataTypes.StringType, false),
DataTypes.createStructField("value", DataTypes.DoubleType, false))));
testData.write().format("delta").save(tablePath);

// Test streaming read using path-based table
Dataset<Row> streamingDF =
spark.readStream().table(String.format("dsv2.delta.`%s`", tablePath));
// Write version 0
writeInitialData(tablePath, Arrays.asList(RowFactory.create(1, "Alice", 100.0)));

// Write version 1
appendTestData(tablePath, Arrays.asList(RowFactory.create(2, "Bob", 200.0)));

// Write version 2
appendTestData(tablePath, Arrays.asList(RowFactory.create(3, "Charlie", 300.0)));

// Start streaming from version 0 - should read all three versions
Dataset<Row> streamingDF = createStreamingDF(tablePath, "0");
assertTrue(streamingDF.isStreaming(), "Dataset should be streaming");
StreamingQueryException exception =
assertThrows(
StreamingQueryException.class,
() -> {
StreamingQuery query =
streamingDF
.writeStream()
.format("memory")
.queryName("test_streaming_query")
.outputMode("append")
.start();
query.processAllAvailable();
query.stop();
});
Throwable rootCause = exception.getCause();
assertTrue(
rootCause instanceof UnsupportedOperationException,
"Root cause should be UnsupportedOperationException");
assertTrue(
rootCause.getMessage().contains("is not supported"),
"Root cause message should indicate that streaming operation is not supported: "
+ rootCause.getMessage());

// Process all batches - should have all data from versions 0, 1, and 2
List<Row> actualRows = processStreamingQuery(streamingDF, "test_multiple_versions");
List<Row> expectedRows =
Arrays.asList(
RowFactory.create(1, "Alice", 100.0),
RowFactory.create(2, "Bob", 200.0),
RowFactory.create(3, "Charlie", 300.0));

assertStreamingDataEquals(actualRows, expectedRows);
}

@Test
public void testStreamingReadWithStartingVersionLatest(@TempDir File deltaTablePath)
throws Exception {
String tablePath = deltaTablePath.getAbsolutePath();

// Write initial data (version 0)
List<Row> version0Rows =
Arrays.asList(RowFactory.create(1, "Alice", 100.0), RowFactory.create(2, "Bob", 200.0));
writeInitialData(tablePath, version0Rows);

// Start streaming from "latest" (should start reading from version 1 onwards)
Dataset<Row> streamingDF = createStreamingDF(tablePath, "latest");
assertTrue(streamingDF.isStreaming(), "Dataset should be streaming");

StreamingQuery query = null;
try {
query =
streamingDF
.writeStream()
.format("memory")
.queryName("test_latest_initial")
.outputMode("append")
.start();

query.processAllAvailable();

// Should have no data initially since we're starting after the current version
Dataset<Row> results = spark.sql("SELECT * FROM test_latest_initial");
List<Row> initialRows = results.collectAsList();
assertTrue(
initialRows.isEmpty(),
"Should have no data when starting from 'latest' before new data is added");

// Add more data (version 1)
List<Row> version1Rows =
Arrays.asList(
RowFactory.create(3, "Charlie", 300.0), RowFactory.create(4, "David", 400.0));
appendTestData(tablePath, version1Rows);

// Process the next batch
query.processAllAvailable();

// Now should only have the new data (version 1)
Dataset<Row> finalResults = spark.sql("SELECT * FROM test_latest_initial");
List<Row> finalRows = finalResults.collectAsList();

assertStreamingDataEquals(finalRows, version1Rows);
} finally {
if (query != null) {
query.stop();
}
}
}

@Test
public void testStreamingReadWithRateLimit(@TempDir File deltaTablePath) throws Exception {
String tablePath = deltaTablePath.getAbsolutePath();

// Write initial data with 3 rows (likely to be in a single file)
List<Row> initialRows =
Arrays.asList(
RowFactory.create(1, "Alice", 100.0),
RowFactory.create(2, "Bob", 200.0),
RowFactory.create(3, "Charlie", 300.0));
writeInitialData(tablePath, initialRows);

// Start streaming with maxFilesPerTrigger=1 to test rate limiting
Dataset<Row> streamingDF = createStreamingDF(tablePath, "0", "1");
assertTrue(streamingDF.isStreaming(), "Dataset should be streaming");

// Process with rate limiting - should still get all data, just potentially in multiple batches
List<Row> actualRows = processStreamingQuery(streamingDF, "test_rate_limit");
assertStreamingDataEquals(actualRows, initialRows);
}
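The maxFilesPerTrigger option caps how many files a single micro-batch admits. Below is a conceptual sketch of that admission control; it is not the SparkMicroBatchStream implementation (which tracks offsets rather than materializing batches up front), just an illustration of why the test above still sees every row, only spread over more triggers.

```java
// Conceptual sketch of per-trigger file admission, assuming a plain list of pending file paths.
// Not the actual SparkMicroBatchStream logic, which is offset-based.
import java.util.ArrayList;
import java.util.List;

final class MaxFilesPerTriggerSketch {

  /** Split pending files into micro-batches of at most maxFilesPerTrigger files each. */
  static List<List<String>> planBatches(List<String> pendingFiles, int maxFilesPerTrigger) {
    List<List<String>> batches = new ArrayList<>();
    for (int start = 0; start < pendingFiles.size(); start += maxFilesPerTrigger) {
      int end = Math.min(start + maxFilesPerTrigger, pendingFiles.size());
      batches.add(new ArrayList<>(pendingFiles.subList(start, end)));
    }
    return batches; // every file is admitted eventually, just capped per trigger
  }
}
```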

@Test
public void testStreamingReadWithoutStartingVersion(@TempDir File deltaTablePath)
throws Exception {
String tablePath = deltaTablePath.getAbsolutePath();

// Write initial data
List<Row> initialRows =
Arrays.asList(RowFactory.create(1, "Alice", 100.0), RowFactory.create(2, "Bob", 200.0));
writeInitialData(tablePath, initialRows);

// Create streaming DataFrame without startingVersion using DSv2 path
// Using dsv2.delta.`path` syntax to force DSv2 (SparkMicroBatchStream) instead of DSv1
String dsv2TableRef = String.format("dsv2.delta.`%s`", tablePath);

StreamingQuery query = null;
try {
query =
spark
.readStream()
.table(dsv2TableRef) // No startingVersion option
.writeStream()
.format("memory")
.queryName("test_no_starting_version")
.outputMode("append")
.start();

query.processAllAvailable();

// Should successfully read all data from initial snapshot
Dataset<Row> results = spark.sql("SELECT * FROM test_no_starting_version");
List<Row> actualRows = results.collectAsList();

assertStreamingDataEquals(actualRows, initialRows);
} finally {
if (query != null) {
query.stop();
}
}
}
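For readers unfamiliar with the dsv2.delta.`path` reference used throughout these tests: "dsv2" is a catalog name registered on the session, "delta" is a namespace, and the backticked path is the table identifier, which is what routes the read through SparkMicroBatchStream instead of the DSv1 source. A sketch of that registration is below; the catalog implementation class is a placeholder, since the real registration happens in this test's setUp() (not shown in this diff).

```java
// Sketch only: the catalog class below is a placeholder, not the actual kernel-spark catalog
// implementation; the real registration lives in Dsv2BasicTest.setUp(), outside this diff.
import org.apache.spark.sql.SparkSession;

final class Dsv2CatalogRegistrationSketch {
  static SparkSession withDsv2Catalog() {
    return SparkSession.builder()
        .master("local[2]")
        // Placeholder class name; substitute the real DSv2 catalog implementation.
        .config("spark.sql.catalog.dsv2", "com.example.PlaceholderDsv2Catalog")
        .getOrCreate();
  }
}
```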

//////////////////////
@@ -216,4 +323,84 @@ private void assertDatasetEquals(Dataset<Row> actual, List<Row> expectedRows) {
actualRows,
() -> "Datasets differ: expected=" + expectedRows + "\nactual=" + actualRows);
}

private void appendData(String tablePath, Dataset<Row> data) {
data.write().format("delta").mode("append").save(tablePath);
}

private List<Row> processStreamingQuery(Dataset<Row> streamingDF, String queryName)
throws Exception {
StreamingQuery query = null;
try {
query =
streamingDF
.writeStream()
.format("memory")
.queryName(queryName)
.outputMode("append")
.start();

query.processAllAvailable();

// Query the memory sink to get results
Dataset<Row> results = spark.sql("SELECT * FROM " + queryName);
return results.collectAsList();
} finally {
if (query != null) {
query.stop();
}
}
}

private void assertStreamingDataEquals(List<Row> actualRows, List<Row> expectedRows) {
assertEquals(
expectedRows.size(),
actualRows.size(),
() ->
"Row count differs: expected="
+ expectedRows.size()
+ " actual="
+ actualRows.size()
+ "\nExpected rows: "
+ expectedRows
+ "\nActual rows: "
+ actualRows);

// Compare rows (order-independent for robustness)
assertTrue(
actualRows.containsAll(expectedRows) && expectedRows.containsAll(actualRows),
() -> "Streaming data differs:\nExpected: " + expectedRows + "\nActual: " + actualRows);
}

private Dataset<Row> createTestData(List<Row> rows) {
return spark.createDataFrame(rows, TEST_SCHEMA);
}

private void writeInitialData(String tablePath, List<Row> rows) {
createTestData(rows).write().format("delta").save(tablePath);
}

private void appendTestData(String tablePath, List<Row> rows) {
appendData(tablePath, createTestData(rows));
}

private Dataset<Row> createStreamingDF(String tablePath, String startingVersion) {
// Use dsv2.delta.`path` syntax to force DSv2 (SparkMicroBatchStream) instead of DSv1
String dsv2TableRef = String.format("dsv2.delta.`%s`", tablePath);
return spark.readStream().option("startingVersion", startingVersion).table(dsv2TableRef);
}

private Dataset<Row> createStreamingDF(
String tablePath, String startingVersion, String maxFilesPerTrigger) {
// Use dsv2.delta.`path` syntax to force DSv2 (SparkMicroBatchStream) instead of DSv1
String dsv2TableRef = String.format("dsv2.delta.`%s`", tablePath);
if (maxFilesPerTrigger != null) {
return spark
.readStream()
.option("startingVersion", startingVersion)
.option("maxFilesPerTrigger", maxFilesPerTrigger)
.table(dsv2TableRef);
}
return createStreamingDF(tablePath, startingVersion);
}
}