diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala index 4f0611bcdb1..d4640669bf1 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.delta.skipping.clustering.ClusteredTableUtils import org.apache.spark.sql.delta.skipping.clustering.temp.{ClusterBy, ClusterBySpec} import org.apache.spark.sql.delta.skipping.clustering.temp.{ClusterByTransform => TempClusterByTransform} import org.apache.spark.sql.delta.{ColumnWithDefaultExprUtils, DeltaConfigs, DeltaErrors, DeltaTableUtils} -import org.apache.spark.sql.delta.{DeltaLog, DeltaOptions, IdentityColumn} +import org.apache.spark.sql.delta.{DeltaOptions, IdentityColumn} import org.apache.spark.sql.delta.DeltaTableIdentifier.gluePermissionError import org.apache.spark.sql.delta.commands._ import org.apache.spark.sql.delta.constraints.{AddConstraint, DropConstraint} @@ -37,16 +37,16 @@ import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.redirect.RedirectFeature import org.apache.spark.sql.delta.schema.SchemaUtils +import org.apache.spark.sql.delta.serverSidePlanning.ServerSidePlannedTable import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSourceUtils, DeltaSQLConf} import org.apache.spark.sql.delta.stats.StatisticsCollection import org.apache.spark.sql.delta.tablefeatures.DropFeature import org.apache.spark.sql.delta.util.{Utils => DeltaUtils} import org.apache.spark.sql.delta.util.PartitionUtils -import org.apache.spark.sql.util.ScalaExtensions._ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException -import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.MDC import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, UnresolvedAttribute, UnresolvedFieldName, UnresolvedFieldPosition} @@ -235,7 +235,13 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension override def loadTable(ident: Identifier): Table = recordFrameProfile( "DeltaCatalog", "loadTable") { try { - super.loadTable(ident) match { + val table = super.loadTable(ident) + + ServerSidePlannedTable.tryCreate(spark, ident, table, isUnityCatalog).foreach { sspt => + return sspt + } + + table match { case v1: V1Table if DeltaTableUtils.isDeltaTable(v1.catalogTable) => loadCatalogTable(ident, v1.catalogTable) case o => o diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTable.scala b/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTable.scala new file mode 100644 index 00000000000..fdf849a8550 --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTable.scala @@ -0,0 +1,359 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.serverSidePlanning + +import java.util +import java.util.Locale + +import scala.collection.JavaConverters._ + +import org.apache.spark.paths.SparkPath +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.delta.metering.DeltaLogging +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability} +import org.apache.spark.sql.connector.read._ +import org.apache.spark.sql.execution.datasources.{FileFormat, PartitionedFile} +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.sources.{And, Filter} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +/** + * Companion object for ServerSidePlannedTable with factory methods. + */ +object ServerSidePlannedTable extends DeltaLogging { + /** + * Property keys that indicate table credentials are available. + * Unity Catalog tables may expose temporary credentials via these properties. + */ + private val CREDENTIAL_PROPERTY_KEYS = Seq( + "storage.credential", + "aws.temporary.credentials", + "azure.temporary.credentials", + "gcs.temporary.credentials", + "credential" + ) + + /** + * Determine if server-side planning should be used based on catalog type, + * credential availability, and configuration. + * + * Decision logic: + * - Use server-side planning if forceServerSidePlanning is true (config override) + * - Use server-side planning if Unity Catalog table lacks credentials + * - Otherwise use normal table loading path + * + * @param isUnityCatalog Whether this is a Unity Catalog instance + * @param hasCredentials Whether the table has credentials available + * @param forceServerSidePlanning Whether to force server-side planning (config flag) + * @return true if server-side planning should be used + */ + private[serverSidePlanning] def shouldUseServerSidePlanning( + isUnityCatalog: Boolean, + hasCredentials: Boolean, + forceServerSidePlanning: Boolean): Boolean = { + (isUnityCatalog && !hasCredentials) || forceServerSidePlanning + } + + /** + * Try to create a ServerSidePlannedTable if server-side planning is needed. + * Returns None if not needed or if the planning client factory is not available. + * + * This method encapsulates all the logic to decide whether to use server-side planning: + * - Checks if Unity Catalog table lacks credentials + * - Checks if server-side planning is forced via config (for testing) + * - Extracts catalog name and table identifiers + * - Attempts to create the planning client + * + * Test coverage: ServerSidePlanningSuite tests verify the decision logic through + * shouldUseServerSidePlanning() method with different input combinations. 
+ * + * @param spark The SparkSession + * @param ident The table identifier + * @param table The loaded table from the delegate catalog + * @param isUnityCatalog Whether this is a Unity Catalog instance + * @return Some(ServerSidePlannedTable) if server-side planning should be used, None otherwise + */ + def tryCreate( + spark: SparkSession, + ident: Identifier, + table: Table, + isUnityCatalog: Boolean): Option[ServerSidePlannedTable] = { + // Check if we should force server-side planning (for testing) + val forceServerSidePlanning = + spark.conf.get(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, "false").toBoolean + val hasTableCredentials = hasCredentials(table) + + // Check if we should use server-side planning + if (shouldUseServerSidePlanning(isUnityCatalog, hasTableCredentials, forceServerSidePlanning)) { + val namespace = ident.namespace().mkString(".") + val tableName = ident.name() + + // Extract catalog name from identifier namespace, or default to spark_catalog + // + // Spark Identifier structure: + // - For "catalog.database.table": namespace() = ["catalog", "database"], name() = "table" + // - For "database.table": namespace() = ["database"], name() = "table" + // - For "table": namespace() = [], name() = "table" + // + // Note: We check namespace().length > 1 (not >= 1) because a single-element namespace + // represents just the database name without an explicit catalog, so we use the default. + // See Spark's LookupCatalog, CatalogAndIdentifier and ResolveSessionCatalog. + val catalogName = if (ident.namespace().length > 1) { + ident.namespace().head + } else { + "spark_catalog" + } + + // Try to create ServerSidePlannedTable with server-side planning + try { + val client = ServerSidePlanningClientFactory.buildForCatalog(spark, catalogName) + Some(new ServerSidePlannedTable(spark, namespace, tableName, table.schema(), client)) + } catch { + case _: IllegalStateException => + // Factory not registered - fall through to normal path + logWarning(s"Server-side planning not available for catalog $catalogName. " + + "Falling back to normal table loading.") + None + } + } else { + None + } + } + + /** + * Check if a table has credentials available. + * Unity Catalog tables may lack credentials when accessed without proper permissions. + * UC injects credentials as table properties, see: + * https://github.com/unitycatalog/unitycatalog/blob/main/connectors/spark/src/main/scala/ + * io/unitycatalog/spark/UCSingleCatalog.scala#L260 + */ + private def hasCredentials(table: Table): Boolean = { + // Check table properties for credential information + val properties = table.properties() + CREDENTIAL_PROPERTY_KEYS.exists(key => properties.containsKey(key)) + } +} + +/** + * A Spark Table implementation that uses server-side scan planning + * to get the list of files to read. Used as a fallback when Unity Catalog + * doesn't provide credentials. + * + * Similar to DeltaTableV2, we accept SparkSession as a constructor parameter + * since Tables are created on the driver and are not serialized to executors. + */ +class ServerSidePlannedTable( + spark: SparkSession, + databaseName: String, + tableName: String, + tableSchema: StructType, + planningClient: ServerSidePlanningClient) + extends Table with SupportsRead with DeltaLogging { + + // Returns fully qualified name (e.g., "catalog.database.table"). + // The databaseName parameter receives ident.namespace().mkString(".") from DeltaCatalog, + // which includes the catalog name when present, similar to DeltaTableV2's name() method. 
+ override def name(): String = s"$databaseName.$tableName" + + override def schema(): StructType = tableSchema + + override def capabilities(): util.Set[TableCapability] = { + Set(TableCapability.BATCH_READ).asJava + } + + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { + new ServerSidePlannedScanBuilder(spark, databaseName, tableName, tableSchema, planningClient) + } +} + +/** + * ScanBuilder that uses ServerSidePlanningClient to plan the scan. + * Implements SupportsPushDownFilters to enable WHERE clause pushdown to the server. + */ +class ServerSidePlannedScanBuilder( + spark: SparkSession, + databaseName: String, + tableName: String, + tableSchema: StructType, + planningClient: ServerSidePlanningClient) + extends ScanBuilder with SupportsPushDownFilters { + + // Filters that have been pushed down and will be sent to the server + private var _pushedFilters: Array[Filter] = Array.empty + + override def pushFilters(filters: Array[Filter]): Array[Filter] = { + // Store filters to send to catalog, but return all as residuals. + // Since we don't know what the catalog can handle yet, we conservatively claim we handle + // none. Even if the catalog applies some filters, Spark will redundantly re-apply them. + // This is inefficient but guarantees correct results. The alternative (claiming we handle + // filters we don't support) would produce wrong output, which is unacceptable. + // TODO: Quantify performance impact and add residual filter handling with catalog capabilities. + _pushedFilters = filters + filters // Return all as residuals + } + + override def pushedFilters(): Array[Filter] = _pushedFilters + + override def build(): Scan = { + new ServerSidePlannedScan( + spark, databaseName, tableName, tableSchema, planningClient, _pushedFilters) + } +} + +/** + * Scan implementation that calls the server-side planning API to get file list. + */ +class ServerSidePlannedScan( + spark: SparkSession, + databaseName: String, + tableName: String, + tableSchema: StructType, + planningClient: ServerSidePlanningClient, + pushedFilters: Array[Filter]) extends Scan with Batch { + + override def readSchema(): StructType = tableSchema + + override def toBatch: Batch = this + + // Convert pushed filters to a single Spark Filter for the API call. + // If no filters, pass None. If filters exist, combine them into a single filter. + private val combinedFilter: Option[Filter] = { + if (pushedFilters.isEmpty) { + None + } else if (pushedFilters.length == 1) { + Some(pushedFilters.head) + } else { + // Combine multiple filters with And + Some(pushedFilters.reduce((left, right) => And(left, right))) + } + } + + override def planInputPartitions(): Array[InputPartition] = { + // Call the server-side planning API to get the scan plan + val scanPlan = planningClient.planScan(databaseName, tableName, combinedFilter) + + // Convert each file to an InputPartition + scanPlan.files.map { file => + ServerSidePlannedFileInputPartition(file.filePath, file.fileSizeInBytes, file.fileFormat) + }.toArray + } + + override def createReaderFactory(): PartitionReaderFactory = { + new ServerSidePlannedFilePartitionReaderFactory(spark, tableSchema) + } +} + +/** + * InputPartition representing a single file from the server-side scan plan. + */ +case class ServerSidePlannedFileInputPartition( + filePath: String, + fileSizeInBytes: Long, + fileFormat: String) extends InputPartition + +/** + * Factory for creating PartitionReaders that read server-side planned files. 
+ * Builds reader functions on the driver for Parquet files. + */ +class ServerSidePlannedFilePartitionReaderFactory( + spark: SparkSession, + tableSchema: StructType) + extends PartitionReaderFactory { + + import org.apache.spark.util.SerializableConfiguration + + // scalastyle:off deltahadoopconfiguration + // We use sessionState.newHadoopConf() here instead of deltaLog.newDeltaHadoopConf(). + // This means DataFrame options (like custom S3 credentials) passed by users will NOT be + // included in the Hadoop configuration. This is intentional: + // - Server-side planning uses server-provided credentials, not user-specified credentials + // - ServerSidePlannedTable is NOT a Delta table, so we don't want Delta-specific options + // from deltaLog.newDeltaHadoopConf() + // - General Spark options from spark.hadoop.* are included and work for all tables + private val hadoopConf = new SerializableConfiguration(spark.sessionState.newHadoopConf()) + // scalastyle:on deltahadoopconfiguration + + // Pre-build reader function for Parquet on the driver + // This function will be serialized and sent to executors + private val parquetReaderBuilder = new ParquetFileFormat().buildReaderWithPartitionValues( + sparkSession = spark, + dataSchema = tableSchema, + partitionSchema = StructType(Nil), + requiredSchema = tableSchema, + filters = Seq.empty, + options = Map( + FileFormat.OPTION_RETURNING_BATCH -> "false" + ), + hadoopConf = hadoopConf.value + ) + + override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { + val filePartition = partition.asInstanceOf[ServerSidePlannedFileInputPartition] + + // Verify file format is Parquet + // Scalastyle suppression needed: the caselocale regex incorrectly flags even correct usage + // of toLowerCase(Locale.ROOT). Similar to PartitionUtils.scala and SchemaUtils.scala. + // scalastyle:off caselocale + if (filePartition.fileFormat.toLowerCase(Locale.ROOT) != "parquet") { + // scalastyle:on caselocale + throw new UnsupportedOperationException( + s"File format '${filePartition.fileFormat}' is not supported. Only Parquet is supported.") + } + + new ServerSidePlannedFilePartitionReader(filePartition, parquetReaderBuilder) + } +} + +/** + * PartitionReader that reads a single file using a pre-built reader function. + * The reader function was created on the driver and is executed on the executor. 
+ */ +class ServerSidePlannedFilePartitionReader( + partition: ServerSidePlannedFileInputPartition, + readerBuilder: PartitionedFile => Iterator[InternalRow]) + extends PartitionReader[InternalRow] { + + // Create PartitionedFile for this file + private val partitionedFile = PartitionedFile( + partitionValues = InternalRow.empty, + filePath = SparkPath.fromPathString(partition.filePath), + start = 0, + length = partition.fileSizeInBytes + ) + + // Call the pre-built reader function with our PartitionedFile + // This happens on the executor and doesn't need SparkSession + private lazy val readerIterator: Iterator[InternalRow] = { + readerBuilder(partitionedFile) + } + + override def next(): Boolean = { + readerIterator.hasNext + } + + override def get(): InternalRow = { + readerIterator.next() + } + + override def close(): Unit = { + // Reader cleanup is handled by Spark + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlanningClient.scala b/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlanningClient.scala new file mode 100644 index 00000000000..30d168fb579 --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlanningClient.scala @@ -0,0 +1,130 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.serverSidePlanning + +import org.apache.spark.sql.SparkSession + +/** + * Simple data class representing a file to scan. + * No dependencies on Iceberg types. + */ +case class ScanFile( + filePath: String, + fileSizeInBytes: Long, + fileFormat: String // "parquet", "orc", etc. +) + +/** + * Result of a table scan plan operation. + */ +case class ScanPlan( + files: Seq[ScanFile] +) + +/** + * Interface for planning table scans via server-side planning (e.g., Iceberg REST catalog). + * This interface is intentionally simple and has no dependencies + * on Iceberg libraries, allowing it to live in delta-spark module. + * + * Filter Conversion Pattern: + * This interface uses Spark's standard `org.apache.spark.sql.sources.Filter` as the universal + * representation for filter pushdown. This keeps the interface catalog-agnostic while allowing + * each catalog implementation to convert filters to their own native format: + * - Iceberg catalogs: Convert Spark Filter to Iceberg Expression (for REST API) + * - Unity Catalog: Convert Spark Filter to UC's filter format + * - Other catalogs: Implement their own conversion logic as needed + * + * Each catalog's implementation module (e.g., `iceberg/`) provides its own converter utility + * as a private implementation detail. + * + * Note: Server-side planning only supports reading the current snapshot. + */ +trait ServerSidePlanningClient { + /** + * Plan a table scan and return the list of files to read. 
+   *
+   * @param databaseName The database or schema name
+   * @param table The table name
+   * @param filter Optional filter expression to push down to server (Spark Filter format).
+   *               Each catalog implementation is responsible for converting this to their
+   *               native filter format (e.g., Iceberg Expression, UC filter format, etc.)
+   * @return ScanPlan containing files to read
+   */
+  def planScan(
+      databaseName: String,
+      table: String,
+      filter: Option[org.apache.spark.sql.sources.Filter] = None): ScanPlan
+}
+
+/**
+ * Factory for creating ServerSidePlanningClient instances.
+ * This allows for configurable implementations (REST, mock, Spark-based, etc.)
+ */
+private[serverSidePlanning] trait ServerSidePlanningClientFactory {
+  /**
+   * Create a client for a specific catalog by reading catalog-specific configuration.
+   * This method reads configuration from spark.sql.catalog.<catalogName>.uri and
+   * spark.sql.catalog.<catalogName>.token.
+   *
+   * @param spark The SparkSession
+   * @param catalogName The name of the catalog (e.g., "spark_catalog", "unity")
+   * @return A ServerSidePlanningClient configured for the specified catalog
+   */
+  def buildForCatalog(spark: SparkSession, catalogName: String): ServerSidePlanningClient
+}
+
+/**
+ * Registry for client factories. Can be configured for testing or to provide
+ * production implementations (e.g., IcebergRESTCatalogPlanningClientFactory).
+ *
+ * By default, no factory is registered. Production code should register an appropriate
+ * factory implementation before attempting to create clients.
+ */
+private[serverSidePlanning] object ServerSidePlanningClientFactory {
+  @volatile private var registeredFactory: Option[ServerSidePlanningClientFactory] = None
+
+  /**
+   * Set a factory for production use or testing.
+   */
+  private[serverSidePlanning] def setFactory(factory: ServerSidePlanningClientFactory): Unit = {
+    registeredFactory = Some(factory)
+  }
+
+  /**
+   * Clear the registered factory.
+   */
+  private[serverSidePlanning] def clearFactory(): Unit = {
+    registeredFactory = None
+  }
+
+  /**
+   * Build a client for a specific catalog using the registered factory.
+   * This is the single public entry point for obtaining a ServerSidePlanningClient.
+   *
+   * @param spark The SparkSession
+   * @param catalogName The name of the catalog (e.g., "spark_catalog", "unity")
+   * @return A ServerSidePlanningClient configured for the specified catalog
+   * @throws IllegalStateException if no factory has been registered
+   */
+  def buildForCatalog(spark: SparkSession, catalogName: String): ServerSidePlanningClient = {
+    registeredFactory.getOrElse {
+      throw new IllegalStateException(
+        "No ServerSidePlanningClientFactory has been registered. 
" + + "Call ServerSidePlanningClientFactory.setFactory() to register an implementation.") + }.buildForCatalog(spark, catalogName) + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index c52cf55423f..81e9322b18a 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -2905,6 +2905,15 @@ trait DeltaSQLConfBase extends DeltaSQLConfUtils { |When enabled, it's decided by a per-command flag.""".stripMargin) .booleanConf .createWithDefault(false) + + val ENABLE_SERVER_SIDE_PLANNING = + buildConf("catalog.enableServerSidePlanning") + .internal() + .doc( + """When enabled, DeltaCatalog will use server-side scan planning path + |instead of normal table loading.""".stripMargin) + .booleanConf + .createWithDefault(false) } object DeltaSQLConf extends DeltaSQLConfBase diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTableSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTableSuite.scala new file mode 100644 index 00000000000..afaa64302aa --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTableSuite.scala @@ -0,0 +1,327 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.serverSidePlanning + +import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SparkSession} +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest + +/** + * Test client that captures the filter passed to planScan() for verification. + * Stores captured filter in thread-local variable accessible via companion object. + */ +class FilterCapturingTestClient(spark: SparkSession) extends ServerSidePlanningClient { + override def planScan( + database: String, + table: String, + filter: Option[org.apache.spark.sql.sources.Filter] = None): ScanPlan = { + // Capture the filter for test verification + FilterCapturingTestClient.capturedFilter.set(filter) + + // Delegate to TestServerSidePlanningClient for actual file discovery + new TestServerSidePlanningClient(spark).planScan(database, table, filter) + } +} + +object FilterCapturingTestClient { + private val capturedFilter = + new ThreadLocal[Option[org.apache.spark.sql.sources.Filter]]() + + def getCapturedFilter: Option[org.apache.spark.sql.sources.Filter] = capturedFilter.get() + def clearCapturedFilter(): Unit = capturedFilter.remove() +} + +/** + * Factory for creating FilterCapturingTestClient instances. 
+ */ +class FilterCapturingTestClientFactory extends ServerSidePlanningClientFactory { + override def buildForCatalog( + spark: SparkSession, + catalogName: String): ServerSidePlanningClient = { + new FilterCapturingTestClient(spark) + } +} + +/** + * Tests for server-side planning with a mock client. + */ +class ServerSidePlannedTableSuite extends QueryTest with DeltaSQLCommandTest { + + override def beforeAll(): Unit = { + super.beforeAll() + // Create test database and shared table once for all tests + sql("CREATE DATABASE IF NOT EXISTS test_db") + sql(""" + CREATE TABLE test_db.shared_test ( + id INT, + name STRING, + value INT + ) USING parquet + """) + sql(""" + INSERT INTO test_db.shared_test (id, name, value) VALUES + (1, 'alpha', 10), + (2, 'beta', 20), + (3, 'gamma', 30) + """) + } + + /** + * Helper method to run tests with server-side planning enabled. + * Automatically sets up the test factory and config, then cleans up afterwards. + * This prevents test pollution from leaked configuration. + */ + private def withServerSidePlanningEnabled(f: => Unit): Unit = { + val originalConfig = spark.conf.getOption(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key) + ServerSidePlanningClientFactory.setFactory(new TestServerSidePlanningClientFactory()) + spark.conf.set(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, "true") + try { + f + } finally { + // Reset factory + ServerSidePlanningClientFactory.clearFactory() + // Restore original config + originalConfig match { + case Some(value) => spark.conf.set(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, value) + case None => spark.conf.unset(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key) + } + } + } + + test("full query through DeltaCatalog with server-side planning") { + // This test verifies server-side planning works end-to-end by checking: + // (1) DeltaCatalog returns ServerSidePlannedTable (not normal table) + // (2) Query execution returns correct results + // If both are true, the server-side planning client worked correctly - that's the only way + // ServerSidePlannedTable can read data. 
+ + withServerSidePlanningEnabled { + // (1) Verify that DeltaCatalog actually returns ServerSidePlannedTable + val catalog = spark.sessionState.catalogManager.catalog("spark_catalog") + .asInstanceOf[org.apache.spark.sql.connector.catalog.TableCatalog] + val loadedTable = catalog.loadTable( + org.apache.spark.sql.connector.catalog.Identifier.of( + Array("test_db"), "shared_test")) + assert(loadedTable.isInstanceOf[ServerSidePlannedTable], + s"Expected ServerSidePlannedTable but got ${loadedTable.getClass.getName}") + + // (2) Execute query - should go through full server-side planning stack + checkAnswer( + sql("SELECT id, name, value FROM test_db.shared_test ORDER BY id"), + Seq( + Row(1, "alpha", 10), + Row(2, "beta", 20), + Row(3, "gamma", 30) + ) + ) + } + } + + test("verify normal path unchanged when feature disabled") { + // Explicitly disable server-side planning + spark.conf.set(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, "false") + + // Verify that DeltaCatalog returns normal table, not ServerSidePlannedTable + val catalog = spark.sessionState.catalogManager.catalog("spark_catalog") + .asInstanceOf[org.apache.spark.sql.connector.catalog.TableCatalog] + val loadedTable = catalog.loadTable( + org.apache.spark.sql.connector.catalog.Identifier.of( + Array("test_db"), "shared_test")) + assert(!loadedTable.isInstanceOf[ServerSidePlannedTable], + s"Expected normal table but got ServerSidePlannedTable when config is disabled") + } + + test("shouldUseServerSidePlanning() decision logic") { + // Case 1: Force flag enabled -> should always use server-side planning + assert(ServerSidePlannedTable.shouldUseServerSidePlanning( + isUnityCatalog = false, + hasCredentials = true, + forceServerSidePlanning = true), + "Should use server-side planning when force flag is true") + + // Case 2: Unity Catalog without credentials -> should use server-side planning + assert(ServerSidePlannedTable.shouldUseServerSidePlanning( + isUnityCatalog = true, + hasCredentials = false, + forceServerSidePlanning = false), + "Should use server-side planning for UC table without credentials") + + // Case 3: Unity Catalog with credentials -> should NOT use server-side planning + assert(!ServerSidePlannedTable.shouldUseServerSidePlanning( + isUnityCatalog = true, + hasCredentials = true, + forceServerSidePlanning = false), + "Should NOT use server-side planning for UC table with credentials") + + // Case 4: Non-UC catalog -> should NOT use server-side planning + assert(!ServerSidePlannedTable.shouldUseServerSidePlanning( + isUnityCatalog = false, + hasCredentials = true, + forceServerSidePlanning = false), + "Should NOT use server-side planning for non-UC catalog") + + assert(!ServerSidePlannedTable.shouldUseServerSidePlanning( + isUnityCatalog = false, + hasCredentials = false, + forceServerSidePlanning = false), + "Should NOT use server-side planning for non-UC catalog (even without credentials)") + } + + test("ServerSidePlannedTable is read-only") { + withTable("readonly_test") { + sql(""" + CREATE TABLE readonly_test ( + id INT, + data STRING + ) USING parquet + """) + + // First insert WITHOUT server-side planning should succeed + sql("INSERT INTO readonly_test VALUES (1, 'initial')") + checkAnswer( + sql("SELECT * FROM readonly_test"), + Seq(Row(1, "initial")) + ) + + // Try to insert WITH server-side planning enabled - should fail + withServerSidePlanningEnabled { + val exception = intercept[AnalysisException] { + sql("INSERT INTO readonly_test VALUES (2, 'should_fail')") + } + 
assert(exception.getMessage.contains("does not support append")) + } + + // Verify data unchanged - second insert didn't happen + checkAnswer( + sql("SELECT * FROM readonly_test"), + Seq(Row(1, "initial")) + ) + } + } + + test("filter pushdown - simple EqualTo filter") { + val originalConfig = spark.conf.getOption(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key) + ServerSidePlanningClientFactory.setFactory(new FilterCapturingTestClientFactory()) + spark.conf.set(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, "true") + + try { + // Clear any previous captured filter + FilterCapturingTestClient.clearCapturedFilter() + + // Execute query with WHERE clause + sql("SELECT id, name, value FROM test_db.shared_test WHERE id = 2").collect() + + // Verify filter was captured + val capturedFilter = FilterCapturingTestClient.getCapturedFilter + assert(capturedFilter.isDefined, "Filter should be pushed down") + + // Spark may wrap EqualTo with IsNotNull check: And(IsNotNull("id"), EqualTo("id", 2)) + // We need to handle both cases + val filter = capturedFilter.get + val equalToFilter = filter match { + case and: org.apache.spark.sql.sources.And => + // Wrapped case - extract the EqualTo from the And + and.right match { + case eq: org.apache.spark.sql.sources.EqualTo => eq + case _ => and.left.asInstanceOf[org.apache.spark.sql.sources.EqualTo] + } + case eq: org.apache.spark.sql.sources.EqualTo => + // Unwrapped case + eq + case other => + fail(s"Expected EqualTo or And(IsNotNull, EqualTo) but got ${other.getClass.getName}") + } + + assert(equalToFilter.attribute == "id", + s"Expected attribute 'id' but got '${equalToFilter.attribute}'") + assert(equalToFilter.value == 2, s"Expected value 2 but got ${equalToFilter.value}") + } finally { + FilterCapturingTestClient.clearCapturedFilter() + ServerSidePlanningClientFactory.clearFactory() + originalConfig match { + case Some(value) => spark.conf.set(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, value) + case None => spark.conf.unset(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key) + } + } + } + + test("filter pushdown - compound And filter") { + val originalConfig = spark.conf.getOption(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key) + ServerSidePlanningClientFactory.setFactory(new FilterCapturingTestClientFactory()) + spark.conf.set(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, "true") + + try { + // Clear any previous captured filter + FilterCapturingTestClient.clearCapturedFilter() + + // Execute query with compound WHERE clause + sql("SELECT id, name, value FROM test_db.shared_test WHERE id > 1 AND value < 30").collect() + + // Verify filter was captured + val capturedFilter = FilterCapturingTestClient.getCapturedFilter + assert(capturedFilter.isDefined, "Filter should be pushed down") + + // Spark may wrap filters with IsNotNull checks in nested And structures: + // And(And(IsNotNull("id"), GreaterThan("id", 1)), And(IsNotNull("value"), LessThan("value", 30))) + // We just verify that the top-level is an And filter and contains the expected predicates + val filter = capturedFilter.get + assert(filter.isInstanceOf[org.apache.spark.sql.sources.And], + s"Expected And filter but got ${filter.getClass.getName}") + + // Convert filter to string and verify it contains both predicates + val filterStr = filter.toString + assert(filterStr.contains("GreaterThan") && filterStr.contains("id"), + s"Filter should contain GreaterThan on id: $filterStr") + assert(filterStr.contains("LessThan") && filterStr.contains("value"), + s"Filter should contain LessThan on value: 
$filterStr") + } finally { + FilterCapturingTestClient.clearCapturedFilter() + ServerSidePlanningClientFactory.clearFactory() + originalConfig match { + case Some(value) => spark.conf.set(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, value) + case None => spark.conf.unset(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key) + } + } + } + + test("filter pushdown - no filter when no WHERE clause") { + val originalConfig = spark.conf.getOption(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key) + ServerSidePlanningClientFactory.setFactory(new FilterCapturingTestClientFactory()) + spark.conf.set(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, "true") + + try { + // Clear any previous captured filter + FilterCapturingTestClient.clearCapturedFilter() + + // Execute query without WHERE clause + sql("SELECT id, name, value FROM test_db.shared_test").collect() + + // Verify no filter was pushed + // getCapturedFilter returns Option[Filter] - should be None when no WHERE clause + val capturedFilter = FilterCapturingTestClient.getCapturedFilter + assert(capturedFilter != null, "planScan should have been called") + assert(capturedFilter.isEmpty, s"Expected no filter (None) but got ${capturedFilter}") + } finally { + FilterCapturingTestClient.clearCapturedFilter() + ServerSidePlanningClientFactory.clearFactory() + originalConfig match { + case Some(value) => spark.conf.set(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, value) + case None => spark.conf.unset(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key) + } + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/TestServerSidePlanningClient.scala b/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/TestServerSidePlanningClient.scala new file mode 100644 index 00000000000..eaa7878bb19 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/TestServerSidePlanningClient.scala @@ -0,0 +1,98 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.serverSidePlanning + +import java.util.Locale + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.functions.input_file_name + +/** + * Implementation of ServerSidePlanningClient that uses Spark SQL with input_file_name() + * to discover the list of files in a table. This allows end-to-end testing without + * a real server that can do server-side planning. 
+ */ +class TestServerSidePlanningClient(spark: SparkSession) extends ServerSidePlanningClient { + + override def planScan( + databaseName: String, + table: String, + filter: Option[org.apache.spark.sql.sources.Filter] = None): ScanPlan = { + // Ignore filter parameter - test client doesn't apply filtering + // Filter verification is done via FilterCapturingTestClient in tests + val fullTableName = s"$databaseName.$table" + + // Temporarily disable server-side planning to avoid infinite recursion + // when this test client internally loads the table + val originalConfigValue = spark.conf.getOption(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key) + spark.conf.set(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, "false") + + try { + // Use input_file_name() to get the list of files + // Query: SELECT DISTINCT input_file_name() FROM table + val filesDF = spark.table(fullTableName) + .select(input_file_name().as("file_path")) + .distinct() + + // Collect file paths + val filePaths = filesDF.collect().map(_.getString(0)) + + // Get file metadata (size, format) from filesystem + // scalastyle:off deltahadoopconfiguration + // The rule prevents accessing Hadoop conf on executors where it could use wrong credentials + // for multi-catalog scenarios. Safe here: test-only code simulating server filesystem access. + val hadoopConf = spark.sessionState.newHadoopConf() + // scalastyle:on deltahadoopconfiguration + val files = filePaths.map { filePath => + // input_file_name() returns URL-encoded paths, decode them + val decodedPath = java.net.URLDecoder.decode(filePath, "UTF-8") + val path = new Path(decodedPath) + val fs = path.getFileSystem(hadoopConf) + val fileStatus = fs.getFileStatus(path) + + ScanFile( + filePath = decodedPath, + fileSizeInBytes = fileStatus.getLen, + fileFormat = getFileFormat(path) + ) + }.toSeq + + ScanPlan(files = files) + } finally { + // Restore original config value + originalConfigValue match { + case Some(value) => spark.conf.set(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, value) + case None => spark.conf.unset(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key) + } + } + } + + private def getFileFormat(path: Path): String = "parquet" +} + +/** + * Factory for creating TestServerSidePlanningClient instances. + */ +class TestServerSidePlanningClientFactory extends ServerSidePlanningClientFactory { + override def buildForCatalog( + spark: SparkSession, + catalogName: String): ServerSidePlanningClient = { + new TestServerSidePlanningClient(spark) + } +}
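
For illustration, a minimal sketch of how a planning client factory could be registered against the interfaces added above. The ExamplePlanningClient and ExamplePlanningClientFactory names are hypothetical; because ServerSidePlanningClientFactory is private[serverSidePlanning], such wiring has to live inside the org.apache.spark.sql.delta.serverSidePlanning package:

package org.apache.spark.sql.delta.serverSidePlanning

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.sources.Filter

// Hypothetical client: returns a fixed file list. A real implementation would call the
// catalog's scan-planning endpoint and translate `filter` into its native expression format.
class ExamplePlanningClient(spark: SparkSession) extends ServerSidePlanningClient {
  override def planScan(
      databaseName: String,
      table: String,
      filter: Option[Filter] = None): ScanPlan = {
    ScanPlan(files = Seq(
      ScanFile(
        filePath = "/tmp/example/part-00000.parquet",  // placeholder path
        fileSizeInBytes = 1024L,
        fileFormat = "parquet")))
  }
}

// Hypothetical factory: ignores catalogName and always returns the client above.
class ExamplePlanningClientFactory extends ServerSidePlanningClientFactory {
  override def buildForCatalog(
      spark: SparkSession,
      catalogName: String): ServerSidePlanningClient = new ExamplePlanningClient(spark)
}

object ExamplePlanningWiring {
  // Register the factory and enable the server-side planning path via the internal flag,
  // mirroring what withServerSidePlanningEnabled() does in ServerSidePlannedTableSuite.
  def enable(spark: SparkSession): Unit = {
    ServerSidePlanningClientFactory.setFactory(new ExamplePlanningClientFactory())
    spark.conf.set(DeltaSQLConf.ENABLE_SERVER_SIDE_PLANNING.key, "true")
  }
}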