diff --git a/build.sbt b/build.sbt
index 69e44d9c9df..0e37fc9feaf 100644
--- a/build.sbt
+++ b/build.sbt
@@ -520,17 +520,21 @@ lazy val spark = (project in file("spark-unified"))
     // Set Test baseDirectory before sparkDependentSettings() so it uses the correct directory
     Test / baseDirectory := (sparkV1 / baseDirectory).value,
-    // Test sources from spark/ directory (sparkV1's directory)
-    // MUST be set BEFORE sparkDependentSettings() to avoid overwriting version-specific directories
+    // Test sources from spark/ directory (sparkV1's directory) AND spark-unified's own directory
+    // MUST be set BEFORE crossSparkSettings() to avoid overwriting version-specific directories
     Test / unmanagedSourceDirectories := {
       val sparkDir = (sparkV1 / baseDirectory).value
+      val unifiedDir = baseDirectory.value
       Seq(
         sparkDir / "src" / "test" / "scala",
-        sparkDir / "src" / "test" / "java"
+        sparkDir / "src" / "test" / "java",
+        unifiedDir / "src" / "test" / "scala",
+        unifiedDir / "src" / "test" / "java"
       )
     },
     Test / unmanagedResourceDirectories := Seq(
-      (sparkV1 / baseDirectory).value / "src" / "test" / "resources"
+      (sparkV1 / baseDirectory).value / "src" / "test" / "resources",
+      baseDirectory.value / "src" / "test" / "resources"
     ),
     CrossSparkVersions.sparkDependentSettings(sparkVersion),
diff --git a/kernel-spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConfV2.scala b/kernel-spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConfV2.scala
new file mode 100644
index 00000000000..f42f1a44048
--- /dev/null
+++ b/kernel-spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConfV2.scala
@@ -0,0 +1,47 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.sources
+
+/**
+ * SQL configurations for Delta V2 connector (Kernel-based connector).
+ */
+object DeltaSQLConfV2 extends DeltaSQLConfUtils {
+
+  /**
+   * Controls which connector implementation to use for Delta table operations.
+   *
+   * Valid values:
+   *  - NONE: V2 connector is disabled, always use V1 connector (DeltaTableV2) - default
+   *  - STRICT: V2 connector is strictly enforced, always use V2 connector (Kernel SparkTable).
+   *    Intended for testing V2 connector capabilities
+   *
+   * V1 vs V2 Connectors:
+   *  - V1 Connector (DeltaTableV2): Legacy Delta connector with full read/write support,
+   *    uses DeltaLog for metadata management
+   *  - V2 Connector (SparkTable): New Kernel-based connector with read-only support,
+   *    uses Kernel's Table API for metadata management
+   */
+  val V2_ENABLE_MODE =
+    buildConf("v2.enableMode")
+      .doc(
+        "Controls the Delta V2 connector enable mode. " +
+        "Valid values: NONE (disabled, default), STRICT (should ONLY be enabled for testing).")
+      .stringConf
+      .checkValues(Set("NONE", "STRICT"))
+      .createWithDefault("NONE")
+}
+
diff --git a/spark-unified/src/main/java/org/apache/spark/sql/delta/catalog/DeltaCatalog.java b/spark-unified/src/main/java/org/apache/spark/sql/delta/catalog/DeltaCatalog.java
index a67fa03ce1d..1bded769be0 100644
--- a/spark-unified/src/main/java/org/apache/spark/sql/delta/catalog/DeltaCatalog.java
+++ b/spark-unified/src/main/java/org/apache/spark/sql/delta/catalog/DeltaCatalog.java
@@ -16,6 +16,16 @@
 
 package org.apache.spark.sql.delta.catalog;
 
+import io.delta.kernel.spark.table.SparkTable;
+import org.apache.spark.sql.delta.sources.DeltaSQLConfV2;
+import java.util.HashMap;
+import java.util.function.Supplier;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.Table;
+
 /**
  * A Spark catalog plugin for Delta Lake tables that implements the Spark DataSource V2 Catalog API.
  *
@@ -46,9 +56,83 @@
  *
  * <p>The unified module can access both implementations:
  *
  * <ul>
  *   <li>V1 connector: DeltaTableV2, from the spark/ module</li>
  *   <li>V2 connector: Kernel's SparkTable, from the kernel-spark/ module</li>
  * </ul>
+ *
+ * <p>See {@link DeltaSQLConfV2#V2_ENABLE_MODE} for V1 vs V2 connector definitions and enable
+ * mode configuration.
  */
 public class DeltaCatalog extends AbstractDeltaCatalog {
+
+  /**
+   * Creates a catalog-based Delta table.
+   *
+   * <p>Routing logic based on {@link DeltaSQLConfV2#V2_ENABLE_MODE}:
+   *
+   * <ul>
+   *   <li>STRICT: returns Kernel's SparkTable (V2 connector)</li>
+   *   <li>NONE (default): returns DeltaTableV2 (V1 connector)</li>
+   * </ul>
+   *
+   * @param ident Table identifier
+   * @param catalogTable Catalog table metadata
+   * @return Table instance (SparkTable for V2, DeltaTableV2 for V1)
+   */
+  @Override
+  public Table newDeltaCatalogBasedTable(Identifier ident, CatalogTable catalogTable) {
+    return createBasedOnV2Mode(
+        () -> new SparkTable(ident, catalogTable, new HashMap<>()),
+        () -> super.newDeltaCatalogBasedTable(ident, catalogTable));
+  }
+
+  /**
+   * Creates a path-based Delta table.
+   *
+   * <p>Routing logic based on {@link DeltaSQLConfV2#V2_ENABLE_MODE}:
+   *
+   * <ul>
+   *   <li>STRICT: returns Kernel's SparkTable (V2 connector)</li>
+   *   <li>NONE (default): returns DeltaTableV2 (V1 connector)</li>
+   * </ul>
+   *
+   * @param ident Table identifier containing the table path
+   * @return Table instance (SparkTable for V2, DeltaTableV2 for V1)
+   */
+  @Override
+  public Table newDeltaPathTable(Identifier ident) {
+    return createBasedOnV2Mode(
+        // delta.`/path/to/table`, where ident.name() is `/path/to/table`
+        () -> new SparkTable(ident, ident.name()),
+        () -> super.newDeltaPathTable(ident));
+  }
+
+  /**
+   * Routes table creation based on the Delta V2 connector enable mode configuration.
+   *
+   * <p>This method checks {@link DeltaSQLConfV2#V2_ENABLE_MODE} and delegates to the
+   * appropriate supplier:
+   *
+   * <ul>
+   *   <li>STRICT: uses the V2 connector supplier (Kernel SparkTable)</li>
+   *   <li>NONE (default): uses the V1 connector supplier (DeltaTableV2)</li>
+   * </ul>
+   *
+   * <p>See {@link DeltaSQLConfV2#V2_ENABLE_MODE} for detailed V1 vs V2 connector definitions.
+   *
+   * @param v2ConnectorSupplier Supplier for V2 connector (Kernel SparkTable) - used in STRICT mode
+   * @param v1ConnectorSupplier Supplier for V1 connector (DeltaTableV2) - used in NONE mode (default)
+   * @return Table instance from the selected supplier
+   */
+  private Table createBasedOnV2Mode(
+      Supplier<Table> v2ConnectorSupplier,
+      Supplier<Table> v1ConnectorSupplier) {
+    String mode =
+        spark()
+            .conf()
+            .get(DeltaSQLConfV2.V2_ENABLE_MODE().key(),
+                DeltaSQLConfV2.V2_ENABLE_MODE().defaultValueString());
+    switch (mode.toUpperCase()) {
+      case "STRICT":
+        return v2ConnectorSupplier.get();
+      default:
+        return v1ConnectorSupplier.get();
+    }
+  }
 }
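For reference, a sketch of how this routing looks from a user session, assuming the standard Delta session settings and a hypothetical table path:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  // Standard Delta session setup; DeltaCatalog here is the unified catalog defined above.
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  // STRICT: newDeltaPathTable / newDeltaCatalogBasedTable return Kernel's SparkTable.
  .config("spark.databricks.delta.v2.enableMode", "STRICT")
  .getOrCreate()

// Path-based identifier: ident.name() is "/tmp/delta/events" (hypothetical path),
// so DeltaCatalog.newDeltaPathTable builds a SparkTable for it in STRICT mode.
spark.sql("SELECT * FROM delta.`/tmp/delta/events`").show()

// With the default mode (NONE), the same query is served by DeltaTableV2 (V1 connector).
```
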
diff --git a/spark-unified/src/test/scala/org/apache/spark/sql/delta/DataFrameWriterV2WithV2ConnectorSuite.scala b/spark-unified/src/test/scala/org/apache/spark/sql/delta/DataFrameWriterV2WithV2ConnectorSuite.scala
new file mode 100644
index 00000000000..20376f7b530
--- /dev/null
+++ b/spark-unified/src/test/scala/org/apache/spark/sql/delta/DataFrameWriterV2WithV2ConnectorSuite.scala
@@ -0,0 +1,70 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta
+
+import org.apache.spark.sql.delta.test.V2ForceTest
+
+/**
+ * Test suite that runs OpenSourceDataFrameWriterV2Tests with Delta V2 connector
+ * mode forced to STRICT.
+ */
+class DataFrameWriterV2WithV2ConnectorSuite
+  extends OpenSourceDataFrameWriterV2Tests
+  with V2ForceTest {
+
+  /**
+   * Skip tests that require write operations after initial table creation.
+   *
+   * Kernel's SparkTable (V2 connector) only implements SupportsRead, not SupportsWrite.
+   * Tests that perform append/replace operations after table creation are skipped.
+   */
+  override protected def shouldSkipTest(testName: String): Boolean = {
+    val skippedTests = Set(
+      // Append operations - require SupportsWrite
+      "Append: basic append",
+      "Append: by name not position",
+
+      // Overwrite operations - require SupportsWrite
+      "Overwrite: overwrite by expression: true",
+      "Overwrite: overwrite by expression: id = 3",
+      "Overwrite: by name not position",
+
+      // OverwritePartitions operations - require SupportsWrite
+      "OverwritePartitions: overwrite conflicting partitions",
+      "OverwritePartitions: overwrite all rows if not partitioned",
+      "OverwritePartitions: by name not position",
+
+      // Create operations - TODO: fix SparkTable's name() to match DeltaTableV2
+      // SparkTable.name() returns simple table name, but tests expect catalog.schema.table format
+      "Create: basic behavior",
+      "Create: with using",
+      "Create: with property",
+      "Create: identity partitioned table",
+      "Create: fail if table already exists",
+
+      // Replace operations - require SupportsWrite
+      "Replace: basic behavior",
+      "Replace: partitioned table",
+
+      // CreateOrReplace operations - require SupportsWrite
+      "CreateOrReplace: table does not exist",
+      "CreateOrReplace: table exists"
+    )
+
+    skippedTests.contains(testName)
+  }
+}
diff --git a/spark-unified/src/test/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogSuite.scala b/spark-unified/src/test/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogSuite.scala
new file mode 100644
index 00000000000..af1957aa43e
--- /dev/null
+++ b/spark-unified/src/test/scala/org/apache/spark/sql/delta/catalog/DeltaCatalogSuite.scala
@@ -0,0 +1,83 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.catalog
+
+import io.delta.kernel.spark.table.SparkTable
+import org.apache.spark.sql.delta.sources.DeltaSQLConfV2
+import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
+
+import java.io.File
+import java.util.Locale
+
+/**
+ * Unit tests for DeltaCatalog's V2 connector routing logic.
+ *
+ * Verifies that DeltaCatalog correctly routes table loading based on
+ * DeltaSQLConfV2.V2_ENABLE_MODE:
+ *  - STRICT mode: Kernel's SparkTable (V2 connector)
+ *  - NONE mode (default): DeltaTableV2 (V1 connector)
+ */
+class DeltaCatalogSuite extends DeltaSQLCommandTest {
+
+  private val modeTestCases = Seq(
+    ("STRICT", classOf[SparkTable], "Kernel SparkTable"),
+    ("NONE", classOf[DeltaTableV2], "DeltaTableV2")
+  )
+
+  modeTestCases.foreach { case (mode, expectedClass, description) =>
+    test(s"catalog-based table with mode=$mode returns $description") {
+      withTempDir { tempDir =>
+        val tableName = s"test_catalog_${mode.toLowerCase(Locale.ROOT)}"
+        val location = new File(tempDir, tableName).getAbsolutePath
+
+        withSQLConf(DeltaSQLConfV2.V2_ENABLE_MODE.key -> mode) {
+          sql(s"CREATE TABLE $tableName (id INT, name STRING) USING delta LOCATION '$location'")
+
+          val catalog = spark.sessionState.catalogManager.v2SessionCatalog
+            .asInstanceOf[DeltaCatalog]
+          val ident = org.apache.spark.sql.connector.catalog.Identifier
+            .of(Array("default"), tableName)
+          val table = catalog.loadTable(ident)
+
+          assert(table.getClass == expectedClass,
+            s"Mode $mode should return ${expectedClass.getSimpleName}")
+        }
+      }
+    }
+  }
+
+  modeTestCases.foreach { case (mode, expectedClass, description) =>
+    test(s"path-based table with mode=$mode returns $description") {
+      withTempDir { tempDir =>
+        val path = tempDir.getAbsolutePath
+
+        withSQLConf(DeltaSQLConfV2.V2_ENABLE_MODE.key -> mode) {
+          sql(s"CREATE TABLE delta.`$path` (id INT, name STRING) USING delta")
+
+          val catalog = spark.sessionState.catalogManager.v2SessionCatalog
+            .asInstanceOf[DeltaCatalog]
+          val ident = org.apache.spark.sql.connector.catalog.Identifier
+            .of(Array("delta"), path)
+          val table = catalog.loadTable(ident)
+
+          assert(table.getClass == expectedClass,
+            s"Mode $mode should return ${expectedClass.getSimpleName} for path-based table")
+        }
+      }
+    }
+  }
+}
diff --git a/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/V2ForceTest.scala b/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/V2ForceTest.scala
new file mode 100644
index 00000000000..cdd927a82db
--- /dev/null
+++ b/spark-unified/src/test/scala/org/apache/spark/sql/delta/test/V2ForceTest.scala
@@ -0,0 +1,87 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.test
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.delta.sources.DeltaSQLConfV2
+import org.scalatest.Tag
+import org.scalactic.source.Position
+
+import scala.collection.mutable
+
+/**
+ * Trait that forces Delta V2 connector mode to STRICT, ensuring all operations
+ * use the Kernel-based SparkTable implementation (V2 connector) instead of
+ * DeltaTableV2 (V1 connector).
+ *
+ * See [[DeltaSQLConfV2.V2_ENABLE_MODE]] for V1 vs V2 connector definitions.
+ *
+ * Usage:
+ * {{{
+ * class MyKernelTest extends MyOriginalSuite with V2ForceTest {
+ *   override protected def shouldSkipTest(testName: String): Boolean = {
+ *     testName.contains("unsupported feature")
+ *   }
+ * }
+ * }}}
+ */
+trait V2ForceTest extends DeltaSQLCommandTest {
+
+  private val testsRun: mutable.Set[String] = mutable.Set.empty
+
+  /**
+   * Override `test` to apply the `shouldSkipTest` logic.
+   * Tests that should be skipped are converted to ignored tests.
+   */
+  abstract override protected def test(
+      testName: String,
+      testTags: Tag*)(testFun: => Any)(implicit pos: Position): Unit = {
+    if (shouldSkipTest(testName)) {
+      super.ignore(
+        s"$testName - skipped for Kernel-based V2 connector (not yet supported)")(testFun)
+    } else {
+      super.test(testName, testTags: _*) {
+        testsRun.add(testName)
+        testFun
+      }
+    }
+  }
+
+  /**
+   * Determine if a test should be skipped based on the test name.
+   * Subclasses should override this method to define their skip logic.
+   * By default, no tests are skipped.
+   *
+   * @param testName The name of the test
+   * @return true if the test should be skipped, false otherwise
+   */
+  protected def shouldSkipTest(testName: String): Boolean = false
+
+  /**
+   * Override `sparkConf` to set V2_ENABLE_MODE to "STRICT".
+   * This ensures all catalog operations use Kernel SparkTable (V2 connector).
+   */
+  abstract override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set(DeltaSQLConfV2.V2_ENABLE_MODE.key, "STRICT")
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+  }
+}
+
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala
index 43a90fde574..45170449dca 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala
@@ -237,11 +237,7 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension
     try {
       super.loadTable(ident) match {
         case v1: V1Table if DeltaTableUtils.isDeltaTable(v1.catalogTable) =>
-          DeltaTableV2(
-            spark,
-            new Path(v1.catalogTable.location),
-            catalogTable = Some(v1.catalogTable),
-            tableIdentifier = Some(ident.toString))
+          newDeltaCatalogBasedTable(ident, v1.catalogTable)
         case o => o
       }
     } catch {
@@ -319,7 +315,16 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension
     }
   }
 
-  protected def newDeltaPathTable(ident: Identifier): DeltaTableV2 = {
+
+  protected def newDeltaCatalogBasedTable(ident: Identifier, catalogTable: CatalogTable): Table = {
+    DeltaTableV2(
+      spark,
+      new Path(catalogTable.location),
+      catalogTable = Some(catalogTable),
+      tableIdentifier = Some(ident.toString))
+  }
+
+  protected def newDeltaPathTable(ident: Identifier): Table = {
     DeltaTableV2(spark, new Path(ident.name()))
   }
 
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index 0e24d89f1f0..c52cf55423f 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -27,14 +27,24 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.storage.StorageLevel
 
 /**
- * [[SQLConf]] entries for Delta features.
+ * Utility trait providing common configuration building methods for Delta SQL configs.
+ *
+ * This trait contains only utility methods and constants, no actual config entries.
+ * It is designed to be extended by multiple configuration objects without causing
+ * duplicate config registration.
  */
-trait DeltaSQLConfBase {
+trait DeltaSQLConfUtils {
   val SQL_CONF_PREFIX = "spark.databricks.delta"
 
   def buildConf(key: String): ConfigBuilder = SQLConf.buildConf(s"$SQL_CONF_PREFIX.$key")
   def buildStaticConf(key: String): ConfigBuilder =
     SQLConf.buildStaticConf(s"spark.databricks.delta.$key")
+}
+
+/**
+ * [[SQLConf]] entries for Delta features.
+ */
+trait DeltaSQLConfBase extends DeltaSQLConfUtils {
 
   val RESOLVE_TIME_TRAVEL_ON_IDENTIFIER =
     buildConf("timeTravel.resolveOnIdentifier.enabled")
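The split means `DeltaSQLConfUtils` carries no config entries of its own, so several objects can extend it without registering each other's configs twice. A hypothetical sketch of another config object built on the same pattern (`DeltaSQLConfV2` above is the real in-tree example):

```scala
package org.apache.spark.sql.delta.sources

// Hypothetical illustration only: a second config object reusing DeltaSQLConfUtils.
// Because DeltaSQLConfUtils declares no config entries itself, extending it here does
// not re-register anything that DeltaSQLConfBase already registered.
object DeltaSQLConfExample extends DeltaSQLConfUtils {

  // Registers spark.databricks.delta.example.flag exactly once.
  val EXAMPLE_FLAG =
    buildConf("example.flag")
      .doc("Hypothetical flag used to illustrate the DeltaSQLConfUtils pattern.")
      .booleanConf
      .createWithDefault(false)
}
```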