diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTable.scala b/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTable.scala
index 6c2e6cc4303..bfd3c6516c2 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTable.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTable.scala
@@ -105,38 +105,49 @@ object ServerSidePlannedTable extends DeltaLogging {
       val namespace = ident.namespace().mkString(".")
       val tableName = ident.name()
 
-      // Extract catalog name from identifier namespace, or default to spark_catalog
-      //
-      // Spark Identifier structure:
-      //   - For "catalog.database.table": namespace() = ["catalog", "database"], name() = "table"
-      //   - For "database.table": namespace() = ["database"], name() = "table"
-      //   - For "table": namespace() = [], name() = "table"
-      //
-      // Note: We check namespace().length > 1 (not >= 1) because a single-element namespace
-      // represents just the database name without an explicit catalog, so we use the default.
-      // See Spark's LookupCatalog, CatalogAndIdentifier and ResolveSessionCatalog.
-      val catalogName = if (ident.namespace().length > 1) {
-        ident.namespace().head
-      } else {
-        "spark_catalog"
-      }
+      // Create metadata from table
+      val metadata = ServerSidePlanningMetadata.fromTable(table, spark, ident, isUnityCatalog)
 
       // Try to create ServerSidePlannedTable with server-side planning
-      try {
-        val client = ServerSidePlanningClientFactory.getClient(spark, catalogName)
-        Some(new ServerSidePlannedTable(spark, namespace, tableName, table.schema(), client))
-      } catch {
-        case _: IllegalStateException =>
-          // Factory not registered - fall through to normal path
-          logWarning(s"Server-side planning not available for catalog $catalogName. " +
+      val plannedTable = tryCreate(spark, namespace, tableName, table.schema(), metadata)
+      if (plannedTable.isEmpty) {
+        logWarning(
+          s"Server-side planning not available for catalog ${metadata.catalogName}. " +
             "Falling back to normal table loading.")
-          None
       }
+      plannedTable
     } else {
       None
     }
   }
 
+  /**
+   * Try to create a ServerSidePlannedTable with server-side planning.
+   * Returns None if the planning client factory is not available.
+   *
+   * @param spark The SparkSession
+   * @param databaseName The database name (may include catalog prefix)
+   * @param tableName The table name
+   * @param tableSchema The table schema
+   * @param metadata Metadata extracted from loadTable response
+   * @return Some(ServerSidePlannedTable) if successful, None if factory not registered
+   */
+  private def tryCreate(
+      spark: SparkSession,
+      databaseName: String,
+      tableName: String,
+      tableSchema: StructType,
+      metadata: ServerSidePlanningMetadata): Option[ServerSidePlannedTable] = {
+    try {
+      val client = ServerSidePlanningClientFactory.buildClient(spark, metadata)
+      Some(new ServerSidePlannedTable(spark, databaseName, tableName, tableSchema, client))
+    } catch {
+      case _: IllegalStateException =>
+        // Factory not registered - this shouldn't happen in production but could during testing
+        None
+    }
+  }
+
   /**
    * Check if a table has credentials available.
    * Unity Catalog tables may lack credentials when accessed without proper permissions.
@@ -212,10 +223,10 @@ class ServerSidePlannedScan(
 
   override def toBatch: Batch = this
 
-  override def planInputPartitions(): Array[InputPartition] = {
-    // Call the server-side planning API to get the scan plan
-    val scanPlan = planningClient.planScan(databaseName, tableName)
+  // Call the server-side planning API once and store the result
+  private val scanPlan = planningClient.planScan(databaseName, tableName)
 
+  override def planInputPartitions(): Array[InputPartition] = {
     // Convert each file to an InputPartition
     scanPlan.files.map { file =>
       ServerSidePlannedFileInputPartition(file.filePath, file.fileSizeInBytes, file.fileFormat)
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlanningClient.scala b/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlanningClient.scala
index 2472164858c..dc3fc1579c7 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlanningClient.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlanningClient.scala
@@ -57,15 +57,15 @@ trait ServerSidePlanningClient {
  */
 private[serverSidePlanning] trait ServerSidePlanningClientFactory {
   /**
-   * Create a client for a specific catalog by reading catalog-specific configuration.
-   * This method reads configuration from spark.sql.catalog.<catalogName>.uri and
-   * spark.sql.catalog.<catalogName>.token.
+   * Create a client using metadata necessary for server-side planning.
    *
    * @param spark The SparkSession
-   * @param catalogName The name of the catalog (e.g., "spark_catalog", "unity")
-   * @return A ServerSidePlanningClient configured for the specified catalog
+   * @param metadata Metadata necessary for server-side planning
+   * @return A ServerSidePlanningClient configured with the metadata
    */
-  def buildForCatalog(spark: SparkSession, catalogName: String): ServerSidePlanningClient
+  def buildClient(
+      spark: SparkSession,
+      metadata: ServerSidePlanningMetadata): ServerSidePlanningClient
 }
 
 /**
@@ -93,19 +93,23 @@ private[serverSidePlanning] object ServerSidePlanningClientFactory {
   }
 
   /**
-   * Get a client for a specific catalog using the registered factory.
-   * This is the single public entry point for obtaining a ServerSidePlanningClient.
-   *
-   * @param spark The SparkSession
-   * @param catalogName The name of the catalog (e.g., "spark_catalog", "unity")
-   * @return A ServerSidePlanningClient configured for the specified catalog
-   * @throws IllegalStateException if no factory has been registered
+   * Get the currently registered factory.
+   * Throws IllegalStateException if no factory has been registered.
    */
-  def getClient(spark: SparkSession, catalogName: String): ServerSidePlanningClient = {
+  def getFactory(): ServerSidePlanningClientFactory = {
     registeredFactory.getOrElse {
       throw new IllegalStateException(
         "No ServerSidePlanningClientFactory has been registered. " +
         "Call ServerSidePlanningClientFactory.setFactory() to register an implementation.")
-    }.buildForCatalog(spark, catalogName)
+    }
+  }
+
+  /**
+   * Convenience method to create a client from metadata using the registered factory.
+   */
+  def buildClient(
+      spark: SparkSession,
+      metadata: ServerSidePlanningMetadata): ServerSidePlanningClient = {
+    getFactory().buildClient(spark, metadata)
   }
 }
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlanningMetadata.scala b/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlanningMetadata.scala
new file mode 100644
index 00000000000..80ce34453b3
--- /dev/null
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlanningMetadata.scala
@@ -0,0 +1,87 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.serverSidePlanning
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.catalog.{Identifier, Table}
+
+/**
+ * Metadata required for creating a server-side planning client.
+ *
+ * This interface captures all information from the catalog's loadTable response
+ * that is needed to create and configure a ServerSidePlanningClient.
+ */
+private[serverSidePlanning] trait ServerSidePlanningMetadata {
+  /**
+   * The base URI for the planning endpoint.
+   */
+  def planningEndpointUri: String
+
+  /**
+   * Authentication token for the planning endpoint.
+   */
+  def authToken: Option[String]
+
+  /**
+   * Catalog name for configuration lookups.
+   */
+  def catalogName: String
+
+  /**
+   * Additional table properties that may be needed.
+   * For example, table UUID, credential hints, etc.
+   */
+  def tableProperties: Map[String, String]
+}
+
+/**
+ * Default metadata for non-UC catalogs.
+ * Used when server-side planning is force-enabled for testing/development.
+ */
+private[serverSidePlanning] case class DefaultMetadata(
+    catalogName: String,
+    tableProps: Map[String, String] = Map.empty) extends ServerSidePlanningMetadata {
+  override def planningEndpointUri: String = ""
+  override def authToken: Option[String] = None
+  override def tableProperties: Map[String, String] = tableProps
+}
+
+object ServerSidePlanningMetadata {
+  /**
+   * Create metadata from a loaded table.
+   *
+   * Currently returns DefaultMetadata for all catalogs.
+   * Unity Catalog-specific implementation will be added in a later PR.
+   */
+  def fromTable(
+      table: Table,
+      spark: SparkSession,
+      ident: Identifier,
+      isUnityCatalog: Boolean): ServerSidePlanningMetadata = {
+
+    val catalogName = extractCatalogName(ident)
+    DefaultMetadata(catalogName, Map.empty)
+  }
+
+  private def extractCatalogName(ident: Identifier): String = {
+    if (ident.namespace().length > 1) {
+      ident.namespace().head
+    } else {
+      "spark_catalog"
+    }
+  }
+}
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTableSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTableSuite.scala
index d064fb89486..fe8fb428cf7 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTableSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTableSuite.scala
@@ -176,4 +176,25 @@ class ServerSidePlannedTableSuite extends QueryTest with DeltaSQLCommandTest {
       )
     }
   }
+
+  test("fromTable returns metadata with empty defaults for non-UC catalogs") {
+    import org.apache.spark.sql.connector.catalog.Identifier
+
+    // Create a simple identifier for testing
+    val ident = Identifier.of(Array("my_catalog", "my_schema"), "my_table")
+
+    // Call fromTable with a null table (we only use the identifier for catalog name extraction)
+    val metadata = ServerSidePlanningMetadata.fromTable(
+      table = null,
+      spark = spark,
+      ident = ident,
+      isUnityCatalog = false
+    )
+
+    // Verify the metadata has expected defaults
+    assert(metadata.catalogName == "my_catalog")
+    assert(metadata.planningEndpointUri == "")
+    assert(metadata.authToken.isEmpty)
+    assert(metadata.tableProperties.isEmpty)
+  }
 }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/TestServerSidePlanningClient.scala b/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/TestServerSidePlanningClient.scala
index 2aa0b9e5454..bf861ac7132 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/TestServerSidePlanningClient.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/TestServerSidePlanningClient.scala
@@ -16,8 +16,6 @@
 
 package org.apache.spark.sql.delta.serverSidePlanning
 
-import java.util.Locale
-
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
@@ -85,9 +83,9 @@ class TestServerSidePlanningClient(spark: SparkSession) extends ServerSidePlanni
  * Factory for creating TestServerSidePlanningClient instances.
  */
 class TestServerSidePlanningClientFactory extends ServerSidePlanningClientFactory {
-  override def buildForCatalog(
+  override def buildClient(
     spark: SparkSession,
-    catalogName: String): ServerSidePlanningClient = {
+    metadata: ServerSidePlanningMetadata): ServerSidePlanningClient = {
     new TestServerSidePlanningClient(spark)
   }
 }
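
For orientation, a minimal sketch of how a catalog-specific ServerSidePlanningMetadata implementation could slot into the new abstraction is shown below. The UnityCatalogMetadata name, the configuration keys, and the branching on isUnityCatalog are illustrative assumptions only; the actual Unity Catalog implementation is deferred to a later PR.

// Hypothetical sketch (not part of the diff above): a catalog-specific metadata
// implementation that carries a real endpoint and token instead of the
// empty defaults used by DefaultMetadata.
private[serverSidePlanning] case class UnityCatalogMetadata(
    catalogName: String,
    planningEndpointUri: String,
    authToken: Option[String],
    tableProperties: Map[String, String]) extends ServerSidePlanningMetadata

// fromTable could then branch on isUnityCatalog along these lines; the
// spark.sql.catalog.<catalogName>.uri / .token keys mirror the configuration
// that the removed buildForCatalog scaladoc referred to:
//   val catalogName = extractCatalogName(ident)
//   if (isUnityCatalog) {
//     UnityCatalogMetadata(
//       catalogName,
//       planningEndpointUri = spark.conf.get(s"spark.sql.catalog.$catalogName.uri", ""),
//       authToken = Option(spark.conf.get(s"spark.sql.catalog.$catalogName.token", null)),
//       tableProperties = Map.empty)
//   } else {
//     DefaultMetadata(catalogName, Map.empty)
//   }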
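
A short usage sketch of the revised entry points follows. Assumptions: the code runs inside the org.apache.spark.sql.delta.serverSidePlanning package (the APIs are package-private), an active SparkSession named spark is in scope, and setFactory takes the factory instance; everything else appears in the diff above.

// Register a factory once. TestServerSidePlanningClientFactory is the test
// implementation updated in this PR; a production connector would register
// its own factory the same way.
ServerSidePlanningClientFactory.setFactory(new TestServerSidePlanningClientFactory())

// Build a client from metadata; this replaces the old getClient(spark, catalogName).
val metadata = DefaultMetadata("spark_catalog")
val client = ServerSidePlanningClientFactory.buildClient(spark, metadata)

// ServerSidePlannedScan now calls planScan once, at construction time,
// and reuses the result in planInputPartitions.
val scanPlan = client.planScan("my_schema", "my_table")
scanPlan.files.foreach { f =>
  println(s"${f.filePath} (${f.fileSizeInBytes} bytes, ${f.fileFormat})")
}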