build.sbt (12 changes: 8 additions & 4 deletions)
@@ -520,17 +520,21 @@ lazy val spark = (project in file("spark-unified"))
  // Set Test baseDirectory before sparkDependentSettings() so it uses the correct directory
  Test / baseDirectory := (sparkV1 / baseDirectory).value,

- // Test sources from spark/ directory (sparkV1's directory)
- // MUST be set BEFORE sparkDependentSettings() to avoid overwriting version-specific directories
+ // Test sources from spark/ directory (sparkV1's directory) AND spark-unified's own directory
+ // MUST be set BEFORE crossSparkSettings() to avoid overwriting version-specific directories
  Test / unmanagedSourceDirectories := {
    val sparkDir = (sparkV1 / baseDirectory).value
+   val unifiedDir = baseDirectory.value
    Seq(
      sparkDir / "src" / "test" / "scala",
-     sparkDir / "src" / "test" / "java"
+     sparkDir / "src" / "test" / "java",
+     unifiedDir / "src" / "test" / "scala",
+     unifiedDir / "src" / "test" / "java"
    )
  },
  Test / unmanagedResourceDirectories := Seq(
-   (sparkV1 / baseDirectory).value / "src" / "test" / "resources"
+   (sparkV1 / baseDirectory).value / "src" / "test" / "resources",
+   baseDirectory.value / "src" / "test" / "resources"
  ),

  CrossSparkVersions.sparkDependentSettings(sparkVersion),
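
A note on the "MUST be set BEFORE" comment in this hunk: in sbt, `:=` replaces a key's whole value while `+=`/`++=` append to it, and settings apply in declaration order. A minimal standalone sketch of that behavior (the `demo` project and paths are illustrative, not from this build):

// sbt sketch: `:=` replaces, `+=` appends; later settings win.
lazy val demo = (project in file("demo")).settings(
  // Base list set first with `:=` ...
  Test / unmanagedSourceDirectories := Seq(baseDirectory.value / "src" / "test" / "scala"),
  // ... then helpers may append version-specific directories with `+=`/`++=`.
  Test / unmanagedSourceDirectories += baseDirectory.value / "src" / "test" / "scala-2.13"
  // If the `:=` ran AFTER the append instead, it would wipe the appended entry:
  // the overwrite the comment above guards against.
)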
@@ -0,0 +1,47 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.sources

/**
* SQL configurations for Delta V2 connector (Kernel-based connector).
*/
object DeltaSQLConfV2 extends DeltaSQLConfUtils {

/**
* Controls which connector implementation to use for Delta table operations.
*
* Valid values:
* - NONE: V2 connector is disabled, always use V1 connector (DeltaTableV2) - default
* - STRICT: V2 connector is strictly enforced, always use V2 connector (Kernel SparkTable).
* Intended for testing V2 connector capabilities
*
* V1 vs V2 Connectors:
* - V1 Connector (DeltaTableV2): Legacy Delta connector with full read/write support,
* uses DeltaLog for metadata management
* - V2 Connector (SparkTable): New Kernel-based connector with read-only support,
* uses Kernel's Table API for metadata management
*/
val V2_ENABLE_MODE =
buildConf("v2.enableMode")
.doc(
"Controls the Delta V2 connector enable mode. " +
"Valid values: NONE (disabled, default), STRICT (should ONLY be enabled for testing).")
.stringConf
.checkValues(Set("NONE", "STRICT"))
[Review comment] @tdas (Contributor), Nov 24, 2025: where did the AUTO mode go?

.createWithDefault("NONE")
}
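
For reference, a minimal sketch of toggling this mode at runtime. The full configuration key is "v2.enableMode" plus whatever prefix DeltaSQLConfUtils.buildConf applies; the "spark.databricks.delta" prefix and the table path below are assumptions, not confirmed by this diff:

// Assumed key prefix ("spark.databricks.delta") and path; verify against DeltaSQLConfUtils.
spark.conf.set("spark.databricks.delta.v2.enableMode", "STRICT") // reads served by Kernel's SparkTable
spark.sql("SELECT * FROM delta.`/tmp/demo_table`").show()
spark.conf.set("spark.databricks.delta.v2.enableMode", "NONE")   // back to the V1 default (DeltaTableV2)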

DeltaCatalog.java
@@ -16,6 +16,16 @@

package org.apache.spark.sql.delta.catalog;

import io.delta.kernel.spark.table.SparkTable;
import org.apache.spark.sql.delta.sources.DeltaSQLConfV2;
import java.util.HashMap;
import java.util.function.Supplier;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.catalog.CatalogTable;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.Table;

/**
* A Spark catalog plugin for Delta Lake tables that implements the Spark DataSource V2 Catalog API.
*
@@ -46,9 +56,83 @@
*
* <p>The unified module can access both implementations:</p>
* <ul>
- *   <li>V1 (hybrid DSv1/DSv2): org.apache.spark.sql.delta.catalog.DeltaTableV2 - Connector using DeltaLog</li>
- *   <li>V2 (Pure DSv2): io.delta.kernel.spark.SparkTable - Kernel-backed connector</li>
+ *   <li>V1 connector: {@link DeltaTableV2} - Legacy connector using DeltaLog, full read/write support</li>
+ *   <li>V2 connector: {@link SparkTable} - Kernel-backed connector, read-only support</li>
  * </ul>
+ *
+ * <p>See {@link DeltaSQLConfV2#V2_ENABLE_MODE} for V1 vs V2 connector definitions and enable mode configuration.</p>
*/
public class DeltaCatalog extends AbstractDeltaCatalog {

/**
* Creates a catalog-based Delta table.
*
* <p>Routing logic based on {@link DeltaSQLConfV2#V2_ENABLE_MODE}:
* <ul>
* <li>STRICT: Returns Kernel {@link SparkTable} (V2 connector)</li>
* <li>NONE (default): Returns {@link DeltaTableV2} (V1 connector)</li>
* </ul>
*
* @param ident Table identifier
* @param catalogTable Catalog table metadata
* @return Table instance (SparkTable for V2, DeltaTableV2 for V1)
*/
@Override
public Table newDeltaCatalogBasedTable(Identifier ident, CatalogTable catalogTable) {
return createBasedOnV2Mode(
() -> new SparkTable(ident, catalogTable, new HashMap<>()),
() -> super.newDeltaCatalogBasedTable(ident, catalogTable));
}

/**
* Creates a path-based Delta table.
*
* <p>Routing logic based on {@link DeltaSQLConfV2#V2_ENABLE_MODE}:
* <ul>
* <li>STRICT: Returns Kernel {@link SparkTable} (V2 connector)</li>
* <li>NONE (default): Returns {@link DeltaTableV2} (V1 connector)</li>
* </ul>
*
* @param ident Table identifier containing table path
* @return Table instance (SparkTable for V2, DeltaTableV2 for V1)
*/
@Override
public Table newDeltaPathTable(Identifier ident) {
return createBasedOnV2Mode(
// delta.`/path/to/table`, where ident.name() is `/path/to/table`
() -> new SparkTable(ident, ident.name()),
() -> super.newDeltaPathTable(ident));
}

/**
* Routes table creation based on Delta V2 connector enable mode configuration.
*
* <p>This method checks {@link DeltaSQLConfV2#V2_ENABLE_MODE} and delegates to the
* appropriate supplier:
* <ul>
* <li>STRICT mode: Uses V2 connector (Kernel SparkTable) - for testing V2 capabilities</li>
* <li>NONE mode (default): Uses V1 connector (DeltaTableV2) - production default with full features</li>
* </ul>
*
* <p>See {@link DeltaSQLConfV2#V2_ENABLE_MODE} for detailed V1 vs V2 connector definitions.
*
* @param v2ConnectorSupplier Supplier for V2 connector (Kernel SparkTable) - used in STRICT mode
* @param v1ConnectorSupplier Supplier for V1 connector (DeltaTableV2) - used in NONE mode (default)
* @return Table instance from the selected supplier
*/
private Table createBasedOnV2Mode(
Supplier<Table> v2ConnectorSupplier,
Supplier<Table> v1ConnectorSupplier) {
String mode =
spark()
.conf()
.get(DeltaSQLConfV2.V2_ENABLE_MODE().key(),
DeltaSQLConfV2.V2_ENABLE_MODE().defaultValueString());
switch (mode.toUpperCase()) {
case "STRICT":
return v2ConnectorSupplier.get();
default:
return v1ConnectorSupplier.get();
}
}
}
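
A design note on createBasedOnV2Mode (an observation, not stated in the PR text): because it takes Suppliers rather than already-constructed Table instances, only the selected connector is ever instantiated. The same pattern in Scala, with by-name parameters (sketch only):

// By-name parameters: the branch not taken never executes, so the
// unselected connector is never built.
def route[T](mode: String)(v2: => T)(v1: => T): T =
  if (mode.equalsIgnoreCase("STRICT")) v2 else v1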
@@ -0,0 +1,70 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.test.V2ForceTest

/**
* Test suite that runs OpenSourceDataFrameWriterV2Tests with Delta V2 connector
* mode forced to STRICT.
*/
class DataFrameWriterV2WithV2ConnectorSuite
extends OpenSourceDataFrameWriterV2Tests
with V2ForceTest {

/**
* Skip tests that require write operations after initial table creation.
*
* Kernel's SparkTable (V2 connector) only implements SupportsRead, not SupportsWrite.
* Tests that perform append/replace operations after table creation are skipped.
*/
override protected def shouldSkipTest(testName: String): Boolean = {
val skippedTests = Set(
// Append operations - require SupportsWrite
"Append: basic append",
"Append: by name not position",

// Overwrite operations - require SupportsWrite
"Overwrite: overwrite by expression: true",
"Overwrite: overwrite by expression: id = 3",
"Overwrite: by name not position",

// OverwritePartitions operations - require SupportsWrite
"OverwritePartitions: overwrite conflicting partitions",
"OverwritePartitions: overwrite all rows if not partitioned",
"OverwritePartitions: by name not position",

// Create operations - TODO: fix SparkTable's name() to match DeltaTableV2
// SparkTable.name() returns simple table name, but tests expect catalog.schema.table format
"Create: basic behavior",
"Create: with using",
"Create: with property",
"Create: identity partitioned table",
"Create: fail if table already exists",

// Replace operations - require SupportsWrite
"Replace: basic behavior",
"Replace: partitioned table",

// CreateOrReplace operations - require SupportsWrite
"CreateOrReplace: table does not exist",
"CreateOrReplace: table exists"
)

skippedTests.contains(testName)
}
}
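
The skip list above mirrors a capability split in Spark's DSv2 API: a connector advertises write support by implementing SupportsWrite, which Kernel's SparkTable does not. A minimal sketch of testing that capability directly (hypothetical helper, not part of this PR):

import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table}

// Hypothetical helper: a suite could auto-skip write-path tests for any
// table whose connector lacks SupportsWrite, at the cost of losing the
// per-test skip annotations kept in the name list above.
def supportsWrites(table: Table): Boolean = table.isInstanceOf[SupportsWrite]
def supportsReads(table: Table): Boolean = table.isInstanceOf[SupportsRead]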
@@ -0,0 +1,83 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.catalog

import io.delta.kernel.spark.table.SparkTable
import org.apache.spark.sql.delta.sources.DeltaSQLConfV2
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest

import java.io.File
import java.util.Locale

/**
* Unit tests for DeltaCatalog's V2 connector routing logic.
*
* Verifies that DeltaCatalog correctly routes table loading based on
* DeltaSQLConfV2.V2_ENABLE_MODE:
* - STRICT mode: Kernel's SparkTable (V2 connector)
* - NONE mode (default): DeltaTableV2 (V1 connector)
*/
class DeltaCatalogSuite extends DeltaSQLCommandTest {

private val modeTestCases = Seq(
("STRICT", classOf[SparkTable], "Kernel SparkTable"),
("NONE", classOf[DeltaTableV2], "DeltaTableV2")
)

modeTestCases.foreach { case (mode, expectedClass, description) =>
test(s"catalog-based table with mode=$mode returns $description") {
withTempDir { tempDir =>
val tableName = s"test_catalog_${mode.toLowerCase(Locale.ROOT)}"
val location = new File(tempDir, tableName).getAbsolutePath

withSQLConf(DeltaSQLConfV2.V2_ENABLE_MODE.key -> mode) {
sql(s"CREATE TABLE $tableName (id INT, name STRING) USING delta LOCATION '$location'")

val catalog = spark.sessionState.catalogManager.v2SessionCatalog
.asInstanceOf[DeltaCatalog]
val ident = org.apache.spark.sql.connector.catalog.Identifier
.of(Array("default"), tableName)
val table = catalog.loadTable(ident)

assert(table.getClass == expectedClass,
s"Mode $mode should return ${expectedClass.getSimpleName}")
}
}
}
}

modeTestCases.foreach { case (mode, expectedClass, description) =>
test(s"path-based table with mode=$mode returns $description") {
withTempDir { tempDir =>
val path = tempDir.getAbsolutePath

withSQLConf(DeltaSQLConfV2.V2_ENABLE_MODE.key -> mode) {
sql(s"CREATE TABLE delta.`$path` (id INT, name STRING) USING delta")

val catalog = spark.sessionState.catalogManager.v2SessionCatalog
.asInstanceOf[DeltaCatalog]
val ident = org.apache.spark.sql.connector.catalog.Identifier
.of(Array("delta"), path)
val table = catalog.loadTable(ident)

assert(table.getClass == expectedClass,
s"Mode $mode should return ${expectedClass.getSimpleName} for path-based table")
}
}
}
}
}