@@ -105,38 +105,49 @@ object ServerSidePlannedTable extends DeltaLogging {
val namespace = ident.namespace().mkString(".")
val tableName = ident.name()

// Extract catalog name from identifier namespace, or default to spark_catalog
//
// Spark Identifier structure:
// - For "catalog.database.table": namespace() = ["catalog", "database"], name() = "table"
// - For "database.table": namespace() = ["database"], name() = "table"
// - For "table": namespace() = [], name() = "table"
//
// Note: We check namespace().length > 1 (not >= 1) because a single-element namespace
// represents just the database name without an explicit catalog, so we use the default.
// See Spark's LookupCatalog, CatalogAndIdentifier and ResolveSessionCatalog.
val catalogName = if (ident.namespace().length > 1) {
ident.namespace().head
} else {
"spark_catalog"
}
// Create metadata from table
val metadata = ServerSidePlanningMetadata.fromTable(table, spark, ident, isUnityCatalog)

// Try to create ServerSidePlannedTable with server-side planning
try {
val client = ServerSidePlanningClientFactory.getClient(spark, catalogName)
Some(new ServerSidePlannedTable(spark, namespace, tableName, table.schema(), client))
} catch {
case _: IllegalStateException =>
// Factory not registered - fall through to normal path
logWarning(s"Server-side planning not available for catalog $catalogName. " +
val plannedTable = tryCreate(spark, namespace, tableName, table.schema(), metadata)
if (plannedTable.isEmpty) {
logWarning(
s"Server-side planning not available for catalog ${metadata.catalogName}. " +
"Falling back to normal table loading.")
None
}
plannedTable
} else {
None
}
}
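For reference, a quick sketch (not part of the diff) of how the Spark Identifier shapes described above map onto this fallback; Identifier.of is the standard connector catalog API:

import org.apache.spark.sql.connector.catalog.Identifier

// "catalog.database.table": two-part namespace, so head is the catalog name.
val threePart = Identifier.of(Array("my_catalog", "my_db"), "my_table")
assert(threePart.namespace().length > 1)  // catalogName = "my_catalog"

// "database.table": a single-element namespace carries only the database name,
// so the code falls back to the default "spark_catalog".
val twoPart = Identifier.of(Array("my_db"), "my_table")
assert(twoPart.namespace().length == 1)   // catalogName = "spark_catalog"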

/**
* Try to create a ServerSidePlannedTable with server-side planning.
* Returns None if the planning client factory is not available.
*
* @param spark The SparkSession
* @param databaseName The database name (may include catalog prefix)
* @param tableName The table name
* @param tableSchema The table schema
* @param metadata Metadata extracted from loadTable response
* @return Some(ServerSidePlannedTable) if successful, None if factory not registered
*/
private def tryCreate(
spark: SparkSession,
databaseName: String,
tableName: String,
tableSchema: StructType,
metadata: ServerSidePlanningMetadata): Option[ServerSidePlannedTable] = {
try {
val client = ServerSidePlanningClientFactory.buildClient(spark, metadata)
Some(new ServerSidePlannedTable(spark, databaseName, tableName, tableSchema, client))
} catch {
case _: IllegalStateException =>
// Factory not registered - this shouldn't happen in production but could during testing
None
}
}

/**
* Check if a table has credentials available.
* Unity Catalog tables may lack credentials when accessed without proper permissions.
@@ -212,10 +223,10 @@ class ServerSidePlannedScan(

override def toBatch: Batch = this

override def planInputPartitions(): Array[InputPartition] = {
// Call the server-side planning API to get the scan plan
val scanPlan = planningClient.planScan(databaseName, tableName)
// Call the server-side planning API once and store the result
private val scanPlan = planningClient.planScan(databaseName, tableName)

override def planInputPartitions(): Array[InputPartition] = {
// Convert each file to an InputPartition
scanPlan.files.map { file =>
ServerSidePlannedFileInputPartition(file.filePath, file.fileSizeInBytes, file.fileFormat)
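Note that planScan is now issued once when the scan is constructed and the result is reused by planInputPartitions(), rather than re-planning on every call. If an eager RPC at construction time ever becomes a concern, a lazy val would keep the single-call behavior while deferring the request; a minimal sketch against the same members:

// Sketch: same one-call semantics, but the planning request fires on first use.
private lazy val scanPlan = planningClient.planScan(databaseName, tableName)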
@@ -57,15 +57,15 @@ trait ServerSidePlanningClient {
*/
private[serverSidePlanning] trait ServerSidePlanningClientFactory {
/**
* Create a client for a specific catalog by reading catalog-specific configuration.
* This method reads configuration from spark.sql.catalog.<catalogName>.uri and
* spark.sql.catalog.<catalogName>.token.
* Create a client using metadata necessary for server-side planning.
*
* @param spark The SparkSession
* @param catalogName The name of the catalog (e.g., "spark_catalog", "unity")
* @return A ServerSidePlanningClient configured for the specified catalog
* @param metadata Metadata necessary for server-side planning
* @return A ServerSidePlanningClient configured with the metadata
*/
def buildForCatalog(spark: SparkSession, catalogName: String): ServerSidePlanningClient
def buildClient(
spark: SparkSession,
metadata: ServerSidePlanningMetadata): ServerSidePlanningClient
}

/**
@@ -93,19 +93,23 @@ private[serverSidePlanning] object ServerSidePlanningClientFactory {
}

/**
* Get a client for a specific catalog using the registered factory.
* This is the single public entry point for obtaining a ServerSidePlanningClient.
*
* @param spark The SparkSession
* @param catalogName The name of the catalog (e.g., "spark_catalog", "unity")
* @return A ServerSidePlanningClient configured for the specified catalog
* @throws IllegalStateException if no factory has been registered
* Get the currently registered factory.
* Throws IllegalStateException if no factory has been registered.
*/
def getClient(spark: SparkSession, catalogName: String): ServerSidePlanningClient = {
def getFactory(): ServerSidePlanningClientFactory = {
registeredFactory.getOrElse {
throw new IllegalStateException(
"No ServerSidePlanningClientFactory has been registered. " +
"Call ServerSidePlanningClientFactory.setFactory() to register an implementation.")
}.buildForCatalog(spark, catalogName)
}
}

/**
* Convenience method to create a client from metadata using the registered factory.
*/
def buildClient(
spark: SparkSession,
metadata: ServerSidePlanningMetadata): ServerSidePlanningClient = {
getFactory().buildClient(spark, metadata)
}
}
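Taken together, the intended call flow under the new API looks roughly like this (a sketch; TestServerSidePlanningClientFactory comes from this PR's test file, and table/ident stand in for values from the loadTable path):

// Register an implementation once, e.g. during test or plugin setup.
ServerSidePlanningClientFactory.setFactory(new TestServerSidePlanningClientFactory)

// Derive metadata from the loadTable response, then build a client from it.
val metadata = ServerSidePlanningMetadata.fromTable(table, spark, ident, isUnityCatalog = false)
val client = ServerSidePlanningClientFactory.buildClient(spark, metadata)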
@@ -0,0 +1,87 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.serverSidePlanning

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.{Identifier, Table}

/**
* Metadata required for creating a server-side planning client.
*
* This interface captures all information from the catalog's loadTable response
* that is needed to create and configure a ServerSidePlanningClient.
*/
private[serverSidePlanning] trait ServerSidePlanningMetadata {
/**
* The base URI for the planning endpoint.
*/
def planningEndpointUri: String

/**
* Authentication token for the planning endpoint.
*/
def authToken: Option[String]
Reviewer comment on lines +34 to +37: in future we may want to generalize this to allow different auth mechanisms other than token.

/**
* Catalog name for configuration lookups.
*/
def catalogName: String

/**
* Additional table properties that may be needed.
* For example, table UUID, credential hints, etc.
*/
def tableProperties: Map[String, String]
}

/**
* Default metadata for non-UC catalogs.
* Used when server-side planning is force-enabled for testing/development.
*/
private[serverSidePlanning] case class DefaultMetadata(
catalogName: String,
tableProps: Map[String, String] = Map.empty) extends ServerSidePlanningMetadata {
override def planningEndpointUri: String = ""
override def authToken: Option[String] = None
override def tableProperties: Map[String, String] = tableProps
}
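For contrast, a catalog-specific implementation (hypothetical; the Unity Catalog version is explicitly deferred to a later PR) would carry real values from the loadTable response instead of empty defaults:

// Hypothetical sketch only: case-class vals satisfy the trait's abstract defs.
private[serverSidePlanning] case class UnityCatalogMetadata(
    catalogName: String,
    planningEndpointUri: String,
    authToken: Option[String],
    tableProperties: Map[String, String]) extends ServerSidePlanningMetadata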

object ServerSidePlanningMetadata {
/**
* Create metadata from a loaded table.
*
* Currently returns DefaultMetadata for all catalogs.
* Unity Catalog-specific implementation will be added in a later PR.
*/
def fromTable(
table: Table,
spark: SparkSession,
ident: Identifier,
isUnityCatalog: Boolean): ServerSidePlanningMetadata = {

val catalogName = extractCatalogName(ident)
DefaultMetadata(catalogName, Map.empty)
}

private def extractCatalogName(ident: Identifier): String = {
if (ident.namespace().length > 1) {
ident.namespace().head
} else {
"spark_catalog"
}
}
}
@@ -176,4 +176,25 @@ class ServerSidePlannedTableSuite extends QueryTest with DeltaSQLCommandTest {
)
}
}

test("fromTable returns metadata with empty defaults for non-UC catalogs") {
Reviewer comment: which fromTable? You have to say ServerSidePlanningMetadata.fromTable. Please address in the follow-up PR.

import org.apache.spark.sql.connector.catalog.Identifier

// Create a simple identifier for testing
val ident = Identifier.of(Array("my_catalog", "my_schema"), "my_table")

// Call fromTable with a null table (we only use the identifier for catalog name extraction)
val metadata = ServerSidePlanningMetadata.fromTable(
table = null,
spark = spark,
ident = ident,
isUnityCatalog = false
)

// Verify the metadata has expected defaults
assert(metadata.catalogName == "my_catalog")
assert(metadata.planningEndpointUri == "")
assert(metadata.authToken.isEmpty)
assert(metadata.tableProperties.isEmpty)
}
}
@@ -16,8 +16,6 @@

package org.apache.spark.sql.delta.serverSidePlanning

import java.util.Locale

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.sources.DeltaSQLConf
@@ -85,9 +83,9 @@ class TestServerSidePlanningClient(spark: SparkSession) extends ServerSidePlanningClient
* Factory for creating TestServerSidePlanningClient instances.
*/
class TestServerSidePlanningClientFactory extends ServerSidePlanningClientFactory {
override def buildForCatalog(
override def buildClient(
spark: SparkSession,
catalogName: String): ServerSidePlanningClient = {
metadata: ServerSidePlanningMetadata): ServerSidePlanningClient = {
new TestServerSidePlanningClient(spark)
}
}
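The test factory ignores the metadata entirely; a production factory would presumably thread the endpoint and token through to its client, along these lines (a sketch; HttpPlanningClient is hypothetical, not part of this PR):

// Hypothetical sketch: a real factory would consume the metadata fields.
class HttpPlanningClientFactory extends ServerSidePlanningClientFactory {
  override def buildClient(
      spark: SparkSession,
      metadata: ServerSidePlanningMetadata): ServerSidePlanningClient = {
    new HttpPlanningClient(metadata.planningEndpointUri, metadata.authToken)
  }
}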