diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java index 4c3a9048b44..5c5a51665e1 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java @@ -93,6 +93,10 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio */ private boolean isFirstBatch = false; + // Cached starting version to ensure idempotent behavior for "latest" starting version. + // getStartingVersion() must return the same value across multiple calls. + private volatile Optional cachedStartingVersion = null; + public SparkMicroBatchStream( DeltaSnapshotManager snapshotManager, Snapshot snapshotAtSourceInit, @@ -334,7 +338,11 @@ public void stop() { * *
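   * <p>As a minimal illustration of the idempotency requirement (a sketch, not the actual call
   * path; {@code latestVersionLoader} is a hypothetical supplier of the table's latest version),
   * the memoization added below behaves like:
   *
   * <pre>{@code
   * private volatile Optional<Long> cached = null;
   *
   * synchronized Optional<Long> resolveLatestOnce(Supplier<Long> latestVersionLoader) {
   *   if (cached == null) {
   *     // "latest" means: start from the commit after the version observed on the first call,
   *     // and keep returning that same value on every later call.
   *     cached = Optional.of(latestVersionLoader.get() + 1);
   *   }
   *   return cached;
   * }
   * }</pre>
   *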
<p>
This is the DSv2 Kernel-based implementation of DeltaSource.getStartingVersion. */ - Optional getStartingVersion() { + synchronized Optional getStartingVersion() { + if (cachedStartingVersion != null) { + return cachedStartingVersion; + } + // TODO(#5319): DeltaSource.scala uses `allowOutOfRange` parameter from // DeltaSQLConf.DELTA_CDF_ALLOW_OUT_OF_RANGE_TIMESTAMP. if (options.startingVersion().isDefined()) { @@ -342,7 +350,8 @@ Optional getStartingVersion() { if (startingVersion instanceof StartingVersionLatest$) { Snapshot latestSnapshot = snapshotManager.loadLatestSnapshot(); // "latest": start reading from the next commit - return Optional.of(latestSnapshot.getVersion() + 1); + cachedStartingVersion = Optional.of(latestSnapshot.getVersion() + 1); + return cachedStartingVersion; } else if (startingVersion instanceof StartingVersion) { long version = ((StartingVersion) startingVersion).version(); if (!validateProtocolAt(spark, snapshotManager, engine, version)) { @@ -354,11 +363,13 @@ Optional getStartingVersion() { snapshotManager.checkVersionExists( version, /* mustBeRecreatable= */ false, /* allowOutOfRange= */ false); } - return Optional.of(version); + cachedStartingVersion = Optional.of(version); + return cachedStartingVersion; } } // TODO(#5319): Implement startingTimestamp support - return Optional.empty(); + cachedStartingVersion = Optional.empty(); + return cachedStartingVersion; } /** @@ -490,6 +501,20 @@ private CloseableIterator filterDeltaLogs( Optional endVersionOpt = endOffset.isPresent() ? Optional.of(endOffset.get().reservoirVersion()) : Optional.empty(); + // Cap endVersion to the latest available version. The Kernel's getTableChanges requires + // endVersion to be an actual existing version or empty. + if (endVersionOpt.isPresent()) { + long latestVersion = snapshotAtSourceInit.getVersion(); + if (endVersionOpt.get() > latestVersion) { + // This could happen because: + // 1. data could be added after snapshotAtSourceInit was captured. + // 2. buildOffsetFromIndexedFile bumps the version up by one when we hit the END_INDEX. + // TODO(#5318): consider caching the latest version to avoid loading a new snapshot. + // TODO(#5318): kernel should ideally relax this constraint. + endVersionOpt = Optional.of(snapshotManager.loadLatestSnapshot().getVersion()); + } + } + CommitRange commitRange; try { commitRange = snapshotManager.getTableChanges(engine, startVersion, endVersionOpt); diff --git a/kernel-spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConfV2.scala b/kernel-spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConfV2.scala index f42f1a44048..e2f8aac7459 100644 --- a/kernel-spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConfV2.scala +++ b/kernel-spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConfV2.scala @@ -39,9 +39,8 @@ object DeltaSQLConfV2 extends DeltaSQLConfUtils { buildConf("v2.enableMode") .doc( "Controls the Delta V2 connector enable mode. 
" + - "Valid values: NONE (disabled, default), STRICT (should ONLY be enabled for testing).") + "Valid values: NONE (disabled, default), STRICT (should ONLY be enabled for testing).") .stringConf .checkValues(Set("NONE", "STRICT")) .createWithDefault("NONE") } - diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/Dsv2BasicTest.java b/kernel-spark/src/test/java/io/delta/kernel/spark/Dsv2BasicTest.java index 70c73df663f..4285d0217ff 100644 --- a/kernel-spark/src/test/java/io/delta/kernel/spark/Dsv2BasicTest.java +++ b/kernel-spark/src/test/java/io/delta/kernel/spark/Dsv2BasicTest.java @@ -24,8 +24,8 @@ import org.apache.spark.SparkConf; import org.apache.spark.sql.*; import org.apache.spark.sql.streaming.StreamingQuery; -import org.apache.spark.sql.streaming.StreamingQueryException; import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructType; import org.junit.jupiter.api.*; import org.junit.jupiter.api.io.TempDir; @@ -35,6 +35,13 @@ public class Dsv2BasicTest { private SparkSession spark; private String nameSpace; + private static final StructType TEST_SCHEMA = + DataTypes.createStructType( + Arrays.asList( + DataTypes.createStructField("id", DataTypes.IntegerType, false), + DataTypes.createStructField("name", DataTypes.StringType, false), + DataTypes.createStructField("value", DataTypes.DoubleType, false))); + @BeforeAll public void setUp(@TempDir File tempDir) { // Spark doesn't allow '-' @@ -164,46 +171,148 @@ public void testColumnMappingRead(@TempDir File deltaTablePath) { } @Test - public void testStreamingRead(@TempDir File deltaTablePath) { + public void testStreamingReadMultipleVersions(@TempDir File deltaTablePath) throws Exception { + String tablePath = deltaTablePath.getAbsolutePath(); + + // Write version 0 + writeInitialData(tablePath, Arrays.asList(RowFactory.create(1, "Alice", 100.0))); + + // Write version 1 + appendTestData(tablePath, Arrays.asList(RowFactory.create(2, "Bob", 200.0))); + + // Write version 2 + appendTestData(tablePath, Arrays.asList(RowFactory.create(3, "Charlie", 300.0))); + + // Start streaming from version 0 - should read all three versions + Dataset streamingDF = createStreamingDF(tablePath, "0"); + assertTrue(streamingDF.isStreaming(), "Dataset should be streaming"); + + // Process all batches - should have all data from versions 0, 1, and 2 + List actualRows = processStreamingQuery(streamingDF, "test_multiple_versions"); + List expectedRows = + Arrays.asList( + RowFactory.create(1, "Alice", 100.0), + RowFactory.create(2, "Bob", 200.0), + RowFactory.create(3, "Charlie", 300.0)); + + assertStreamingDataEquals(actualRows, expectedRows); + } + + @Test + public void testStreamingReadWithStartingVersionLatest(@TempDir File deltaTablePath) + throws Exception { + String tablePath = deltaTablePath.getAbsolutePath(); + + // Write initial data (version 0) + List version0Rows = + Arrays.asList(RowFactory.create(1, "Alice", 100.0), RowFactory.create(2, "Bob", 200.0)); + writeInitialData(tablePath, version0Rows); + + // Start streaming from "latest" (should start reading from version 1 onwards) + Dataset streamingDF = createStreamingDF(tablePath, "latest"); + assertTrue(streamingDF.isStreaming(), "Dataset should be streaming"); + + StreamingQuery query = null; + try { + query = + streamingDF + .writeStream() + .format("memory") + .queryName("test_latest_initial") + .outputMode("append") + .start(); + + query.processAllAvailable(); + + // Should have no data initially since we're starting after the current version + Dataset 
results = spark.sql("SELECT * FROM test_latest_initial"); + List initialRows = results.collectAsList(); + assertTrue( + initialRows.isEmpty(), + "Should have no data when starting from 'latest' before new data is added"); + + // Add more data (version 1) + List version1Rows = + Arrays.asList( + RowFactory.create(3, "Charlie", 300.0), RowFactory.create(4, "David", 400.0)); + appendTestData(tablePath, version1Rows); + + // Process the next batch + query.processAllAvailable(); + + // Now should only have the new data (version 1) + Dataset finalResults = spark.sql("SELECT * FROM test_latest_initial"); + List finalRows = finalResults.collectAsList(); + + assertStreamingDataEquals(finalRows, version1Rows); + } finally { + if (query != null) { + query.stop(); + } + } + } + + @Test + public void testStreamingReadWithRateLimit(@TempDir File deltaTablePath) throws Exception { String tablePath = deltaTablePath.getAbsolutePath(); - // Create test data using standard Delta Lake - Dataset testData = - spark.createDataFrame( - Arrays.asList(RowFactory.create(1, "Alice", 100.0), RowFactory.create(2, "Bob", 200.0)), - DataTypes.createStructType( - Arrays.asList( - DataTypes.createStructField("id", DataTypes.IntegerType, false), - DataTypes.createStructField("name", DataTypes.StringType, false), - DataTypes.createStructField("value", DataTypes.DoubleType, false)))); - testData.write().format("delta").save(tablePath); - // Test streaming read using path-based table - Dataset streamingDF = - spark.readStream().table(String.format("dsv2.delta.`%s`", tablePath)); + // Write initial data with 3 rows (likely to be in a single file) + List initialRows = + Arrays.asList( + RowFactory.create(1, "Alice", 100.0), + RowFactory.create(2, "Bob", 200.0), + RowFactory.create(3, "Charlie", 300.0)); + writeInitialData(tablePath, initialRows); + // Start streaming with maxFilesPerTrigger=1 to test rate limiting + Dataset streamingDF = createStreamingDF(tablePath, "0", "1"); assertTrue(streamingDF.isStreaming(), "Dataset should be streaming"); - StreamingQueryException exception = + + // Process with rate limiting - should still get all data, just potentially in multiple batches + List actualRows = processStreamingQuery(streamingDF, "test_rate_limit"); + assertStreamingDataEquals(actualRows, initialRows); + } + + @Test + public void testStreamingReadWithoutStartingVersionThrowsException(@TempDir File deltaTablePath) { + String tablePath = deltaTablePath.getAbsolutePath(); + + // Write initial data + List initialRows = + Arrays.asList(RowFactory.create(1, "Alice", 100.0), RowFactory.create(2, "Bob", 200.0)); + writeInitialData(tablePath, initialRows); + + // Try to create streaming DataFrame without startingVersion using DSv2 path + // Using dsv2.delta.`path` syntax to force DSv2 (SparkMicroBatchStream) instead of DSv1 + String dsv2TableRef = String.format("dsv2.delta.`%s`", tablePath); + + // Should throw UnsupportedOperationException when trying to process + org.apache.spark.sql.streaming.StreamingQueryException exception = assertThrows( - StreamingQueryException.class, + org.apache.spark.sql.streaming.StreamingQueryException.class, () -> { StreamingQuery query = - streamingDF + spark + .readStream() + .table(dsv2TableRef) .writeStream() .format("memory") - .queryName("test_streaming_query") + .queryName("test_no_starting_version") .outputMode("append") .start(); query.processAllAvailable(); query.stop(); }); + + // Verify the root cause is UnsupportedOperationException Throwable rootCause = exception.getCause(); assertTrue( 
rootCause instanceof UnsupportedOperationException, - "Root cause should be UnsupportedOperationException"); + "Root cause should be UnsupportedOperationException, but was: " + + (rootCause != null ? rootCause.getClass().getName() : "null")); assertTrue( rootCause.getMessage().contains("is not supported"), - "Root cause message should indicate that streaming operation is not supported: " - + rootCause.getMessage()); + "Exception message should indicate operation is not supported: " + rootCause.getMessage()); } ////////////////////// @@ -216,4 +325,84 @@ private void assertDatasetEquals(Dataset actual, List expectedRows) { actualRows, () -> "Datasets differ: expected=" + expectedRows + "\nactual=" + actualRows); } + + private void appendData(String tablePath, Dataset data) { + data.write().format("delta").mode("append").save(tablePath); + } + + private List processStreamingQuery(Dataset streamingDF, String queryName) + throws Exception { + StreamingQuery query = null; + try { + query = + streamingDF + .writeStream() + .format("memory") + .queryName(queryName) + .outputMode("append") + .start(); + + query.processAllAvailable(); + + // Query the memory sink to get results + Dataset results = spark.sql("SELECT * FROM " + queryName); + return results.collectAsList(); + } finally { + if (query != null) { + query.stop(); + } + } + } + + private void assertStreamingDataEquals(List actualRows, List expectedRows) { + assertEquals( + expectedRows.size(), + actualRows.size(), + () -> + "Row count differs: expected=" + + expectedRows.size() + + " actual=" + + actualRows.size() + + "\nExpected rows: " + + expectedRows + + "\nActual rows: " + + actualRows); + + // Compare rows (order-independent for robustness) + assertTrue( + actualRows.containsAll(expectedRows) && expectedRows.containsAll(actualRows), + () -> "Streaming data differs:\nExpected: " + expectedRows + "\nActual: " + actualRows); + } + + private Dataset createTestData(List rows) { + return spark.createDataFrame(rows, TEST_SCHEMA); + } + + private void writeInitialData(String tablePath, List rows) { + createTestData(rows).write().format("delta").save(tablePath); + } + + private void appendTestData(String tablePath, List rows) { + appendData(tablePath, createTestData(rows)); + } + + private Dataset createStreamingDF(String tablePath, String startingVersion) { + // Use dsv2.delta.`path` syntax to force DSv2 (SparkMicroBatchStream) instead of DSv1 + String dsv2TableRef = String.format("dsv2.delta.`%s`", tablePath); + return spark.readStream().option("startingVersion", startingVersion).table(dsv2TableRef); + } + + private Dataset createStreamingDF( + String tablePath, String startingVersion, String maxFilesPerTrigger) { + // Use dsv2.delta.`path` syntax to force DSv2 (SparkMicroBatchStream) instead of DSv1 + String dsv2TableRef = String.format("dsv2.delta.`%s`", tablePath); + if (maxFilesPerTrigger != null) { + return spark + .readStream() + .option("startingVersion", startingVersion) + .option("maxFilesPerTrigger", maxFilesPerTrigger) + .table(dsv2TableRef); + } + return createStreamingDF(tablePath, startingVersion); + } } diff --git a/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCUtilsSuite.scala b/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCUtilsSuite.scala index fa29ca500ff..5f8ec4288b9 100644 --- a/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCUtilsSuite.scala +++ b/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCUtilsSuite.scala 
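For reference, a minimal sketch of how the Dsv2BasicTest helpers above compose (assumed to run inside that test class, where spark, the helpers, and the Arrays/RowFactory imports are in scope; the query name is arbitrary):

    // Stream the table through the DSv2 path (dsv2.delta.`<path>`), drain it into a memory
    // sink, and compare against the expected rows order-independently.
    Dataset<Row> stream = createStreamingDF(tablePath, /* startingVersion= */ "0");
    List<Row> actual = processStreamingQuery(stream, "sketch_query");
    assertStreamingDataEquals(actual, Arrays.asList(RowFactory.create(1, "Alice", 100.0)));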
@@ -208,7 +208,9 @@ class UCUtilsSuite extends SparkFunSuite with SharedSparkSession { assert( info.getUcUri == ucUriBeta, s"Should use catalogBeta's URI, got: ${info.getUcUri}") - assert(info.getUcToken == ucTokenBeta, s"Should use catalogBeta's token, got: ${info.getUcToken}") + assert( + info.getUcToken == ucTokenBeta, + s"Should use catalogBeta's token, got: ${info.getUcToken}") assert(info.getTableId == tableIdBeta, s"Should extract tableIdBeta, got: ${info.getTableId}") assert( info.getTablePath == tablePathBeta, diff --git a/spark-unified/src/test/scala/org/apache/spark/sql/delta/DataFrameWriterV2WithV2ConnectorSuite.scala b/spark-unified/src/test/scala/org/apache/spark/sql/delta/DataFrameWriterV2WithV2ConnectorSuite.scala index 20376f7b530..f345947d983 100644 --- a/spark-unified/src/test/scala/org/apache/spark/sql/delta/DataFrameWriterV2WithV2ConnectorSuite.scala +++ b/spark-unified/src/test/scala/org/apache/spark/sql/delta/DataFrameWriterV2WithV2ConnectorSuite.scala @@ -27,13 +27,13 @@ class DataFrameWriterV2WithV2ConnectorSuite with V2ForceTest { /** - * Skip tests that require write operations after initial table creation. + * Tests that we expect to fail because they require write operations after initial table creation. * * Kernel's SparkTable (V2 connector) only implements SupportsRead, not SupportsWrite. - * Tests that perform append/replace operations after table creation are skipped. + * Tests that perform append/replace operations after table creation are expected to fail. */ - override protected def shouldSkipTest(testName: String): Boolean = { - val skippedTests = Set( + override protected def shouldFail(testName: String): Boolean = { + val expectedToFailTests = Set( // Append operations - require SupportsWrite "Append: basic append", "Append: by name not position", @@ -65,6 +65,6 @@ class DataFrameWriterV2WithV2ConnectorSuite "CreateOrReplace: table exists" ) - skippedTests.contains(testName) + expectedToFailTests.contains(testName) } } diff --git a/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaSourceDSv2Suite.scala b/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaSourceDSv2Suite.scala new file mode 100644 index 00000000000..41fc58d18dd --- /dev/null +++ b/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/DeltaSourceDSv2Suite.scala @@ -0,0 +1,150 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.test + +import org.apache.spark.sql.delta.DeltaLog +import org.apache.spark.sql.delta.DeltaConfigs +import org.apache.spark.sql.delta.DeltaOperations +import org.apache.spark.sql.delta.DeltaSourceSuite + +/** + * Test suite that runs DeltaSourceSuite using the V2 connector (V2_ENABLE_MODE=STRICT). + */ +class DeltaSourceDSv2Suite extends DeltaSourceSuite with V2ForceTest { + + override protected def useDsv2: Boolean = true + + /** + * Override disableLogCleanup to use DeltaLog API instead of SQL ALTER TABLE. 
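   * The DSv1 base suite's SQL form is roughly
   * {{{
   * sql(s"alter table delta.`$tablePath` set tblproperties (${DeltaConfigs.ENABLE_EXPIRED_LOG_CLEANUP.key} = false)")
   * }}}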
+ * Path-based ALTER TABLE doesn't work properly with V2_ENABLE_MODE=STRICT. + */ + override protected def disableLogCleanup(tablePath: String): Unit = { + val deltaLog = DeltaLog.forTable(spark, tablePath) + val metadata = deltaLog.snapshot.metadata + val newConfiguration = metadata.configuration ++ Map( + DeltaConfigs.ENABLE_EXPIRED_LOG_CLEANUP.key -> "false" + ) + deltaLog.startTransaction().commit( + metadata.copy(configuration = newConfiguration) :: Nil, + DeltaOperations.SetTableProperties( + Map(DeltaConfigs.ENABLE_EXPIRED_LOG_CLEANUP.key -> "false")) + ) + } + + private lazy val shouldPassTests = Set( + "startingVersion", + "startingVersion latest", + "startingVersion latest defined before started", + "startingVersion latest works on defined but empty table", + "new commits arrive after stream initialization - with explicit startingVersion" + ) + + private lazy val shouldFailTests = Set( + // === Schema Evolution === + "allow to change schema before starting a streaming query", + "restarting a query should pick up latest table schema and recover", + "handling nullability schema changes", + "allow user specified schema if consistent: v1 source", + "disallow user specified schema", + "createSource should create source with empty or matching table schema provided", + + // === Null Type Column Handling === + "streaming delta source should not drop null columns", + "streaming delta source should drop null columns without feature flag", + "DeltaLog.createDataFrame should drop null columns with feature flag", + "DeltaLog.createDataFrame should not drop null columns without feature flag", + + // === read options === + "skip change commits", + "excludeRegex works and doesn't mess up offsets across restarts - parquet version", + "startingVersion: user defined start works with mergeSchema", + "startingVersion latest calls update when starting", + "startingVersion should be ignored when restarting from a checkpoint, withRowTracking = true", + "startingVersion should be ignored when restarting from a checkpoint, withRowTracking = false", + "startingTimestamp", + "startingVersion and startingTimestamp are both set", + + // === Other tests that bypass V2 by not using loadStreamWithOptions === + "disallow to change schema after starting a streaming query", + "maxFilesPerTrigger: invalid parameter", + "maxBytesPerTrigger: invalid parameter", + "recreate the reservoir should fail the query", + "excludeRegex throws good error on bad regex pattern", + "SC-46515: deltaSourceIgnoreChangesError contains removeFile, version, tablePath", + "SC-46515: deltaSourceIgnoreDeleteError contains removeFile, version, tablePath", + "Delta sources should verify the protocol reader version", + "can delete old files of a snapshot without update", + "Delta source advances with non-data inserts and generates empty dataframe for non-data operations", + + // === Data Loss Detection === + "fail on data loss - starting from missing files", + "fail on data loss - gaps of files", + "fail on data loss - starting from missing files with option off", + "fail on data loss - gaps of files with option off", + + // === Rate Limiting / Trigger Options === + "maxFilesPerTrigger", + "maxFilesPerTrigger: metadata checkpoint", + "maxFilesPerTrigger: change and restart", + "maxFilesPerTrigger: ignored when using Trigger.Once", + "maxFilesPerTrigger: Trigger.AvailableNow respects read limits", + "maxBytesPerTrigger: process at least one file", + "maxBytesPerTrigger: metadata checkpoint", + "maxBytesPerTrigger: change and restart", + 
"maxBytesPerTrigger: Trigger.AvailableNow respects read limits", + "maxBytesPerTrigger: max bytes and max files together", + "Trigger.AvailableNow with an empty table", + "startingVersion should work with rate time", + "Rate limited Delta source advances with non-data inserts", + "ES-445863: delta source should not hang or reprocess data when using AvailableNow", + + // === Source Offset / Version Handling === + "unknown sourceVersion value", + "invalid sourceVersion value", + "missing sourceVersion", + "unmatched reservoir id", + "isInitialSnapshot serializes as isStartingVersion", + "DeltaSourceOffset deserialization", + "DeltaSourceOffset deserialization error", + "DeltaSourceOffset serialization", + "DeltaSourceOffset.validateOffsets", + + // === Misc === + "no schema should throw an exception", + "basic", + "initial snapshot ends at base index of next version", + "new commits arrive after stream initialization", + "Delta sources don't write offsets with null json", + "Delta source advances with non-data inserts and generates empty dataframe for addl files", + "a fast writer should not starve a Delta source", + "start from corrupt checkpoint", + "SC-11561: can consume new data without update", + "make sure that the delta sources works fine", + "should not attempt to read a non exist version", + "self union a Delta table should pass the catalog table assert" + ) + + override protected def shouldFail(testName: String): Boolean = { + val inPassList = shouldPassTests.contains(testName) + val inFailList = shouldFailTests.contains(testName) + + assert(inPassList || inFailList, s"Test '$testName' not in shouldPassTests or shouldFailTests") + assert(!(inPassList && inFailList), s"Test '$testName' in both shouldPassTests and shouldFailTests") + + inFailList + } +} diff --git a/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/V2ForceTest.scala b/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/V2ForceTest.scala index cdd927a82db..43cda8c995f 100644 --- a/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/V2ForceTest.scala +++ b/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/V2ForceTest.scala @@ -44,15 +44,15 @@ trait V2ForceTest extends DeltaSQLCommandTest { private val testsRun: mutable.Set[String] = mutable.Set.empty /** - * Override `test` to apply the `shouldSkipTest` logic. - * Tests that should be skipped are converted to ignored tests. + * Override `test` to apply the `shouldFail` logic. + * Tests that are expected to fail are converted to ignored tests. */ abstract override protected def test( testName: String, testTags: Tag*)(testFun: => Any)(implicit pos: Position): Unit = { - if (shouldSkipTest(testName)) { + if (shouldFail(testName)) { super.ignore( - s"$testName - skipped for Kernel-based V2 connector (not yet supported)")(testFun) + s"$testName - expected to fail with Kernel-based V2 connector (not yet supported)")(testFun) } else { super.test(testName, testTags: _*) { testsRun.add(testName) @@ -62,14 +62,14 @@ trait V2ForceTest extends DeltaSQLCommandTest { } /** - * Determine if a test should be skipped based on the test name. - * Subclasses should override this method to define their skip logic. - * By default, no tests are skipped. + * Determine if a test is expected to fail based on the test name. + * Subclasses should override this method to define which tests are expected to fail. + * By default, no tests are expected to fail. 
* * @param testName The name of the test - * @return true if the test should be skipped, false otherwise + * @return true if the test is expected to fail, false otherwise */ - protected def shouldSkipTest(testName: String): Boolean = false + protected def shouldFail(testName: String): Boolean = false /** * Override `sparkConf` to set V2_ENABLE_MODE to "STRICT". diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala index b8d9f3baa54..fb60ab89c85 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala @@ -253,9 +253,7 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase val deltaLog = DeltaLog.forTable(spark, new Path(inputDir.toURI)) withMetadata(deltaLog, StructType.fromDDL("value STRING")) - val df = spark.readStream - .format("delta") - .load(inputDir.getCanonicalPath) + val df = loadStreamWithOptions(inputDir.getCanonicalPath, Map.empty) .filter($"value" contains "keep") testStream(df)( @@ -1315,6 +1313,89 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase } } + test("new commits arrive after stream initialization") { + withTempDirs { (inputDir, outputDir, checkpointDir) => + // Add version 0 + Seq(1, 2, 3).toDF("value").write.format("delta").save(inputDir.getCanonicalPath) + + val df = spark.readStream + .format("delta") + .option(DeltaOptions.MAX_FILES_PER_TRIGGER_OPTION, "1") + .load(inputDir.getCanonicalPath) + + val q = df.writeStream + .format("delta") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .start(outputDir.getCanonicalPath) + + try { + // Process the initial snapshot + q.processAllAvailable() + checkAnswer( + spark.read.format("delta").load(outputDir.getCanonicalPath), + Seq(1, 2, 3).toDF("value")) + + // Add version 1 and version 2 (after snapshotAtSourceInit was captured) + Seq(4, 5, 6).toDF("value").write + .format("delta").mode("append").save(inputDir.getCanonicalPath) + Seq(7, 8, 9).toDF("value").write + .format("delta").mode("append").save(inputDir.getCanonicalPath) + + q.processAllAvailable() + + checkAnswer( + spark.read.format("delta").load(outputDir.getCanonicalPath), + (1 to 9).toDF("value")) + } finally { + q.stop() + } + } + } + + test("new commits arrive after stream initialization - with explicit startingVersion") { + withTempDirs { (inputDir, outputDir, checkpointDir) => + // Add version 0 and version 1 + Seq(1, 2, 3).toDF("value").write.format("delta").save(inputDir.getCanonicalPath) + Seq(4, 5, 6).toDF("value").write + .format("delta").mode("append").save(inputDir.getCanonicalPath) + + // Start streaming from version 1 + val df = loadStreamWithOptions( + inputDir.getCanonicalPath, + Map( + "startingVersion" -> "1", + DeltaOptions.MAX_FILES_PER_TRIGGER_OPTION -> "1" + ) + ) + + val q = df.writeStream + .format("delta") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .start(outputDir.getCanonicalPath) + + try { + // Process version 1 only + q.processAllAvailable() + checkAnswer( + spark.read.format("delta").load(outputDir.getCanonicalPath), + Seq(4, 5, 6).toDF("value")) + + // Add version 2 and version 3 (after snapshotAtSourceInit was captured) + Seq(7, 8, 9).toDF("value").write + .format("delta").mode("append").save(inputDir.getCanonicalPath) + Seq(10, 11, 12).toDF("value").write + .format("delta").mode("append").save(inputDir.getCanonicalPath) + + q.processAllAvailable() + checkAnswer( + 
spark.read.format("delta").load(outputDir.getCanonicalPath), + (4 to 12).toDF("value")) + } finally { + q.stop() + } + } + } + test( "can delete old files of a snapshot without update" ) { @@ -1576,7 +1657,7 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase } /** Disable log cleanup to avoid deleting logs we are testing. */ - private def disableLogCleanup(tablePath: String): Unit = { + protected def disableLogCleanup(tablePath: String): Unit = { sql(s"alter table delta.`$tablePath` " + s"set tblproperties (${DeltaConfigs.ENABLE_EXPIRED_LOG_CLEANUP.key} = false)") } @@ -1588,11 +1669,9 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase generateCommits(tablePath, start, start + 20.minutes) def testStartingVersion(startingVersion: Long): Unit = { - val q = spark.readStream - .format("delta") - .option("startingVersion", startingVersion) - .load(tablePath) - .writeStream + val df = loadStreamWithOptions( + tablePath, Map("startingVersion" -> startingVersion.toString)) + val q = df.writeStream .format("memory") .queryName("startingVersion_test") .start() @@ -1865,11 +1944,10 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase .save(inputDir.getCanonicalPath) // check answer from version 1 - val q = spark.readStream - .format("delta") - .option("startingVersion", "1") - .load(inputDir.getCanonicalPath) - .writeStream + val q = loadStreamWithOptions( + inputDir.getCanonicalPath, + Map("startingVersion" -> "1") + ).writeStream .format("memory") .queryName("startingVersionTest") .start() @@ -1897,10 +1975,7 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase withTempView("startingVersionTest") { val path = dir.getAbsolutePath spark.range(0, 10).write.format("delta").save(path) - val q = spark.readStream - .format("delta") - .option("startingVersion", "latest") - .load(path) + val q = loadStreamWithOptions(path, Map("startingVersion" -> "latest")) .writeStream .format("memory") .queryName("startingVersionLatest") @@ -1933,10 +2008,7 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase // Define the stream, but don't start it, before a second write. The startingVersion // latest should be resolved when the query *starts*, so there'll be no data even though // some was added after the stream was defined. 
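        // In other words (versions illustrative): if the table is at version 0 when the stream
        // is defined and at version 1 by the time start() is called, "latest" resolves only at
        // start(), e.g. to DeltaLog.forTable(spark, path).update().version + 1 == 2, so neither
        // of the earlier writes is replayed by the stream.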
- val streamDef = spark.readStream - .format("delta") - .option("startingVersion", "latest") - .load(path) + val streamDef = loadStreamWithOptions(path, Map("startingVersion" -> "latest")) .writeStream .format("memory") .queryName("startingVersionLatest") @@ -1961,10 +2033,7 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase withTempView("startingVersionTest") { val path = dir.getAbsolutePath spark.range(0).write.format("delta").save(path) - val streamDef = spark.readStream - .format("delta") - .option("startingVersion", "latest") - .load(path) + val streamDef = loadStreamWithOptions(path, Map("startingVersion" -> "latest")) .writeStream .format("memory") .queryName("startingVersionLatest") @@ -1989,11 +2058,10 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase val path = dir.getAbsolutePath spark.range(0).write.format("delta").save(path) - val streamDef = spark.readStream - .format("delta") - .option("startingVersion", "latest") - .load(path) - .writeStream + val streamDef = loadStreamWithOptions( + path, + Map("startingVersion" -> "latest") + ).writeStream .format("memory") .queryName("startingVersionLatest") val log = DeltaLog.forTable(spark, path) @@ -2029,12 +2097,13 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase spark.range(0, 5).repartition(2).write.mode("append").format("delta").save(path) spark.range(5, 10).repartition(2).write.mode("append").format("delta").save(path) - val q = spark.readStream - .format("delta") - .option("startingVersion", 1) - .option("maxFilesPerTrigger", 1) - .load(path) - .writeStream + val q = loadStreamWithOptions( + path, + Map( + "startingVersion" -> "1", + "maxFilesPerTrigger" -> "1" + ) + ).writeStream .format("memory") .queryName("startingVersionWithRateLimit") .start() diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuiteBase.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuiteBase.scala index f54ce5dc9f1..f80a58d1bac 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuiteBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuiteBase.scala @@ -26,10 +26,33 @@ import org.apache.spark.sql.delta.test.DeltaSQLTestUtils import org.apache.spark.sql.{Column, DataFrame} import org.apache.spark.sql.streaming.StreamTest import org.apache.spark.sql.types.StructType +import org.scalactic.source.Position +import org.scalatest.Tag + +/** + * Trait that provides abstraction for testing both DSv1 and DSv2 connectors. + */ +trait DeltaSourceConnectorTrait { + self: DeltaSQLTestUtils => + + protected def useDsv2: Boolean = false + + protected def loadStreamWithOptions(path: String, options: Map[String, String]): DataFrame = { + val reader = spark.readStream + options.foreach { case (k, v) => reader.option(k, v) } + if (useDsv2) { + // This will route through DeltaCatalog which checks V2_ENABLE_MODE + reader.table(s"delta.`$path`") + } else { + reader.format("delta").load(path) + } + } +} trait DeltaSourceSuiteBase extends StreamTest with DeltaSQLTestUtils - with CoordinatedCommitsBaseSuite { + with CoordinatedCommitsBaseSuite + with DeltaSourceConnectorTrait { /** * Creates 3 temporary directories for use within a function.
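Taken together, wiring an existing DSv1 source suite against the Kernel-based connector only requires flipping useDsv2; loadStreamWithOptions then routes reads through the catalog, where V2ForceTest's V2_ENABLE_MODE=STRICT resolves to the Kernel-based V2 connector. A minimal sketch (the suite name is hypothetical; DeltaSourceDSv2Suite above is the real instance):

    class MyDeltaSourceDSv2Suite extends DeltaSourceSuite with V2ForceTest {
      // Streaming reads now go through reader.table(s"delta.`$path`") instead of
      // format("delta").load(path), so DeltaCatalog's V2 enable-mode check applies.
      override protected def useDsv2: Boolean = true
    }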