Commit 6ce4f27

[Server-Side Planning] Add metadata abstraction and factory pattern
This PR refactors ServerSidePlannedTable to use a metadata abstraction layer instead of directly depending on Unity Catalog parameters. The changes include:

- ServerSidePlanningMetadata trait with UnityCatalogMetadata and DefaultMetadata
- Factory pattern with buildClient(metadata) instead of buildForCatalog()
- TestMetadata for unit testing with injectable values
- Comprehensive test coverage for all metadata implementations

The metadata abstraction enables future extensibility for different catalog implementations while maintaining backwards compatibility. Spark module only; Iceberg test infrastructure changes will be in Rows 7 and 8.

Files modified (7 spark module files):

- ServerSidePlanningMetadata.scala (new)
- UnityCatalogMetadata.scala (new)
- TestMetadata.scala (new)
- ServerSidePlannedTable.scala (modified)
- ServerSidePlanningClient.scala (modified)
- ServerSidePlannedTableSuite.scala (modified)
- TestServerSidePlanningClient.scala (modified)
1 parent 62c2baa commit 6ce4f27

File tree

7 files changed: +408 -61 lines changed
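
Before the per-file diffs, here is a minimal sketch of the factory contract the commit message describes: implementations now receive a ServerSidePlanningMetadata value rather than a bare catalog name. FixedClientFactory and existingClient are illustrative names for this note, not part of the PR.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.serverSidePlanning.{ServerSidePlanningClient, ServerSidePlanningClientFactory, ServerSidePlanningMetadata}

// Hypothetical factory that ignores the metadata and always returns a pre-built
// client (for example, a test double constructed elsewhere).
class FixedClientFactory(existingClient: ServerSidePlanningClient)
  extends ServerSidePlanningClientFactory {

  override def buildClient(
      spark: SparkSession,
      metadata: ServerSidePlanningMetadata): ServerSidePlanningClient = {
    // A real implementation would read metadata.planningEndpointUri,
    // metadata.authToken and metadata.tableProperties to configure the client.
    existingClient
  }
}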

spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTable.scala

Lines changed: 44 additions & 35 deletions

@@ -105,38 +105,49 @@ object ServerSidePlannedTable extends DeltaLogging {
       val namespace = ident.namespace().mkString(".")
       val tableName = ident.name()
 
-      // Extract catalog name from identifier namespace, or default to spark_catalog
-      //
-      // Spark Identifier structure:
-      // - For "catalog.database.table": namespace() = ["catalog", "database"], name() = "table"
-      // - For "database.table": namespace() = ["database"], name() = "table"
-      // - For "table": namespace() = [], name() = "table"
-      //
-      // Note: We check namespace().length > 1 (not >= 1) because a single-element namespace
-      // represents just the database name without an explicit catalog, so we use the default.
-      // See Spark's LookupCatalog, CatalogAndIdentifier and ResolveSessionCatalog.
-      val catalogName = if (ident.namespace().length > 1) {
-        ident.namespace().head
-      } else {
-        "spark_catalog"
-      }
+      // Create metadata from table
+      val metadata = ServerSidePlanningMetadata.fromTable(table, spark, ident, isUnityCatalog)
 
       // Try to create ServerSidePlannedTable with server-side planning
-      try {
-        val client = ServerSidePlanningClientFactory.getClient(spark, catalogName)
-        Some(new ServerSidePlannedTable(spark, namespace, tableName, table.schema(), client))
-      } catch {
-        case _: IllegalStateException =>
-          // Factory not registered - fall through to normal path
-          logWarning(s"Server-side planning not available for catalog $catalogName. " +
+      val plannedTable = tryCreate(spark, namespace, tableName, table.schema(), metadata)
+      if (plannedTable.isEmpty) {
+        logWarning(
+          s"Server-side planning not available for catalog ${metadata.catalogName}. " +
            "Falling back to normal table loading.")
-        None
      }
+      plannedTable
     } else {
       None
     }
   }
 
+  /**
+   * Try to create a ServerSidePlannedTable with server-side planning.
+   * Returns None if the planning client factory is not available.
+   *
+   * @param spark The SparkSession
+   * @param database The database name (may include catalog prefix)
+   * @param tableName The table name
+   * @param tableSchema The table schema
+   * @param metadata Metadata extracted from loadTable response
+   * @return Some(ServerSidePlannedTable) if successful, None if factory not registered
+   */
+  private def tryCreate(
+      spark: SparkSession,
+      databaseName: String,
+      tableName: String,
+      tableSchema: StructType,
+      metadata: ServerSidePlanningMetadata): Option[ServerSidePlannedTable] = {
+    try {
+      val client = ServerSidePlanningClientFactory.buildClient(spark, metadata)
+      Some(new ServerSidePlannedTable(spark, databaseName, tableName, tableSchema, client))
+    } catch {
+      case _: IllegalStateException =>
+        // Factory not registered - this shouldn't happen in production but could during testing
+        None
+    }
+  }
+
   /**
    * Check if a table has credentials available.
    * Unity Catalog tables may lack credentials when accessed without proper permissions.
@@ -168,7 +179,7 @@ class ServerSidePlannedTable(
   extends Table with SupportsRead with DeltaLogging {
 
   // Returns fully qualified name (e.g., "catalog.database.table").
-  // The databaseName parameter receives ident.namespace().mkString(".") from DeltaCatalog,
+  // The database parameter receives ident.namespace().mkString(".") from DeltaCatalog,
   // which includes the catalog name when present, similar to DeltaTableV2's name() method.
   override def name(): String = s"$databaseName.$tableName"
 
@@ -212,10 +223,10 @@ class ServerSidePlannedScan(
 
   override def toBatch: Batch = this
 
-  override def planInputPartitions(): Array[InputPartition] = {
-    // Call the server-side planning API to get the scan plan
-    val scanPlan = planningClient.planScan(databaseName, tableName)
+  // Call the server-side planning API once and store the result
+  private val scanPlan = planningClient.planScan(databaseName, tableName)
 
+  override def planInputPartitions(): Array[InputPartition] = {
     // Convert each file to an InputPartition
     scanPlan.files.map { file =>
       ServerSidePlannedFileInputPartition(file.filePath, file.fileSizeInBytes, file.fileFormat)
@@ -241,29 +252,27 @@ case class ServerSidePlannedFileInputPartition(
  */
 class ServerSidePlannedFilePartitionReaderFactory(
     spark: SparkSession,
-    tableSchema: StructType)
+    schema: StructType)
   extends PartitionReaderFactory {
 
   import org.apache.spark.util.SerializableConfiguration
 
   // scalastyle:off deltahadoopconfiguration
   // We use sessionState.newHadoopConf() here instead of deltaLog.newDeltaHadoopConf().
   // This means DataFrame options (like custom S3 credentials) passed by users will NOT be
-  // included in the Hadoop configuration. This is intentional:
-  // - Server-side planning uses server-provided credentials, not user-specified credentials
-  // - ServerSidePlannedTable is NOT a Delta table, so we don't want Delta-specific options
-  //   from deltaLog.newDeltaHadoopConf()
-  // - General Spark options from spark.hadoop.* are included and work for all tables
+  // included in the Hadoop configuration. This would fail if users specify credentials in
+  // DataFrame read options expecting them to be used when accessing the underlying files.
+  // However, for now we accept this limitation to avoid requiring a DeltaLog parameter.
   private val hadoopConf = new SerializableConfiguration(spark.sessionState.newHadoopConf())
   // scalastyle:on deltahadoopconfiguration
 
   // Pre-build reader function for Parquet on the driver
   // This function will be serialized and sent to executors
   private val parquetReaderBuilder = new ParquetFileFormat().buildReaderWithPartitionValues(
     sparkSession = spark,
-    dataSchema = tableSchema,
+    dataSchema = schema,
     partitionSchema = StructType(Nil),
-    requiredSchema = tableSchema,
+    requiredSchema = schema,
     filters = Seq.empty,
     options = Map(
       FileFormat.OPTION_RETURNING_BATCH -> "false"
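
The comment removed in the first hunk above (and carried into the new extractCatalogName helper shown further below) explains how the catalog name is derived from a Spark Identifier. A small standalone illustration of that rule, using the same length-based check; catalogOf is a name made up for this note:

import org.apache.spark.sql.connector.catalog.Identifier

// Only a namespace with more than one element carries an explicit catalog;
// otherwise the default "spark_catalog" is used.
def catalogOf(ident: Identifier): String =
  if (ident.namespace().length > 1) ident.namespace().head else "spark_catalog"

catalogOf(Identifier.of(Array("unity", "db"), "tbl"))    // "unity"
catalogOf(Identifier.of(Array("db"), "tbl"))             // "spark_catalog"
catalogOf(Identifier.of(Array.empty[String], "tbl"))     // "spark_catalog"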

spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlanningClient.scala

Lines changed: 23 additions & 19 deletions

@@ -57,17 +57,17 @@ trait ServerSidePlanningClient {
  * Factory for creating ServerSidePlanningClient instances.
  * This allows for configurable implementations (REST, mock, Spark-based, etc.)
  */
-private[serverSidePlanning] trait ServerSidePlanningClientFactory {
+trait ServerSidePlanningClientFactory {
   /**
-   * Create a client for a specific catalog by reading catalog-specific configuration.
-   * This method reads configuration from spark.sql.catalog.<catalogName>.uri and
-   * spark.sql.catalog.<catalogName>.token.
+   * Create a client using metadata from catalog's loadTable.
    *
    * @param spark The SparkSession
-   * @param catalogName The name of the catalog (e.g., "spark_catalog", "unity")
-   * @return A ServerSidePlanningClient configured for the specified catalog
+   * @param metadata Metadata extracted from loadTable response
+   * @return A ServerSidePlanningClient configured with the metadata
    */
-  def buildForCatalog(spark: SparkSession, catalogName: String): ServerSidePlanningClient
+  def buildClient(
+      spark: SparkSession,
+      metadata: ServerSidePlanningMetadata): ServerSidePlanningClient
 }
 
 /**
@@ -77,37 +77,41 @@ private[serverSidePlanning] trait ServerSidePlanningClientFactory {
  * By default, no factory is registered. Production code should register an appropriate
  * factory implementation before attempting to create clients.
  */
-private[serverSidePlanning] object ServerSidePlanningClientFactory {
+object ServerSidePlanningClientFactory {
   @volatile private var registeredFactory: Option[ServerSidePlanningClientFactory] = None
 
   /**
    * Set a factory for production use or testing.
    */
-  private[serverSidePlanning] def setFactory(factory: ServerSidePlanningClientFactory): Unit = {
+  def setFactory(factory: ServerSidePlanningClientFactory): Unit = {
     registeredFactory = Some(factory)
   }
 
   /**
    * Clear the registered factory.
    */
-  private[serverSidePlanning] def clearFactory(): Unit = {
+  def clearFactory(): Unit = {
     registeredFactory = None
   }
 
   /**
-   * Get a client for a specific catalog using the registered factory.
-   * This is the single public entry point for obtaining a ServerSidePlanningClient.
-   *
-   * @param spark The SparkSession
-   * @param catalogName The name of the catalog (e.g., "spark_catalog", "unity")
-   * @return A ServerSidePlanningClient configured for the specified catalog
-   * @throws IllegalStateException if no factory has been registered
+   * Get the currently registered factory.
+   * Throws IllegalStateException if no factory has been registered.
    */
-  def getClient(spark: SparkSession, catalogName: String): ServerSidePlanningClient = {
+  def getFactory(): ServerSidePlanningClientFactory = {
     registeredFactory.getOrElse {
       throw new IllegalStateException(
         "No ServerSidePlanningClientFactory has been registered. " +
         "Call ServerSidePlanningClientFactory.setFactory() to register an implementation.")
-    }.buildForCatalog(spark, catalogName)
+    }
+  }
+
+  /**
+   * Convenience method to create a client from metadata using the registered factory.
+   */
+  def buildClient(
+      spark: SparkSession,
+      metadata: ServerSidePlanningMetadata): ServerSidePlanningClient = {
+    getFactory().buildClient(spark, metadata)
   }
 }
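
With setFactory and clearFactory now public, callers outside the serverSidePlanning package can manage the registration lifecycle themselves. A hedged sketch of the register, use, clear pattern a test might follow; factory and spark are assumed to be defined elsewhere, and DefaultMetadata comes from the new metadata file below:

// Register once, build clients through the object-level convenience method,
// and clear the registration afterwards so later tests start from a clean state.
ServerSidePlanningClientFactory.setFactory(factory)
try {
  val client = ServerSidePlanningClientFactory.buildClient(
    spark, DefaultMetadata(catalogName = "spark_catalog"))
  // ... exercise the client ...
} finally {
  ServerSidePlanningClientFactory.clearFactory()
}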
spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlanningMetadata.scala

Lines changed: 91 additions & 0 deletions

@@ -0,0 +1,91 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.serverSidePlanning
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.catalog.{Identifier, Table}
+
+/**
+ * Metadata required for creating a server-side planning client.
+ *
+ * This interface captures all information from the catalog's loadTable response
+ * that is needed to create and configure a ServerSidePlanningClient.
+ */
+trait ServerSidePlanningMetadata {
+  /**
+   * The base URI for the planning endpoint.
+   */
+  def planningEndpointUri: String
+
+  /**
+   * Authentication token for the planning endpoint.
+   */
+  def authToken: Option[String]
+
+  /**
+   * Catalog name for configuration lookups.
+   */
+  def catalogName: String
+
+  /**
+   * Additional table properties that may be needed.
+   * For example, table UUID, credential hints, etc.
+   */
+  def tableProperties: Map[String, String]
+}
+
+/**
+ * Default metadata for non-UC catalogs.
+ * Used when server-side planning is force-enabled for testing/development.
+ */
+case class DefaultMetadata(
+    catalogName: String,
+    tableProps: Map[String, String] = Map.empty) extends ServerSidePlanningMetadata {
+  override def planningEndpointUri: String = ""
+  override def authToken: Option[String] = None
+  override def tableProperties: Map[String, String] = tableProps
+}
+
+object ServerSidePlanningMetadata {
+  /**
+   * Create metadata from a loaded table.
+   *
+   * Returns UnityCatalogMetadata for UC catalogs, DefaultMetadata for non-UC catalogs.
+   */
+  def fromTable(
+      table: Table,
+      spark: SparkSession,
+      ident: Identifier,
+      isUnityCatalog: Boolean): ServerSidePlanningMetadata = {
+
+    if (isUnityCatalog) {
+      UnityCatalogMetadata.fromTable(table, spark, ident)
+    } else {
+      // For non-UC catalogs, return default metadata
+      val catalogName = extractCatalogName(ident)
+      DefaultMetadata(catalogName, Map.empty)
+    }
+  }
+
+  private def extractCatalogName(ident: Identifier): String = {
+    if (ident.namespace().length > 1) {
+      ident.namespace().head
+    } else {
+      "spark_catalog"
+    }
+  }
+}
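
The trait above is what the TestMetadata mentioned in the commit message (not shown in this diff view) would implement. A minimal sketch of an injectable implementation along those lines; StubMetadata and its default values are illustrative only:

// Hypothetical stand-in for TestMetadata: every field can be injected by the
// test that constructs it, with harmless defaults otherwise.
case class StubMetadata(
    override val planningEndpointUri: String = "http://localhost:8080",
    override val authToken: Option[String] = Some("test-token"),
    override val catalogName: String = "spark_catalog",
    override val tableProperties: Map[String, String] = Map.empty)
  extends ServerSidePlanningMetadata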
