diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTable.scala b/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTable.scala new file mode 100644 index 00000000000..c67aaab58b4 --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTable.scala @@ -0,0 +1,205 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.serverSidePlanning + +import java.util +import java.util.Locale + +import scala.collection.JavaConverters._ + +import org.apache.spark.paths.SparkPath +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.delta.serverSidePlanning.ServerSidePlanningClient +import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability} +import org.apache.spark.sql.connector.read._ +import org.apache.spark.sql.execution.datasources.{FileFormat, PartitionedFile} +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +/** + * A Spark Table implementation that uses server-side scan planning + * to get the list of files to read. Used as a fallback when Unity Catalog + * doesn't provide credentials. + * + * Similar to DeltaTableV2, we accept SparkSession as a constructor parameter + * since Tables are created on the driver and are not serialized to executors. + */ +class ServerSidePlannedTable( + spark: SparkSession, + databaseName: String, + tableName: String, + tableSchema: StructType, + planningClient: ServerSidePlanningClient) extends Table with SupportsRead { + + // Returns fully qualified name (e.g., "catalog.database.table"). + // The databaseName parameter receives ident.namespace().mkString(".") from DeltaCatalog, + // which includes the catalog name when present, similar to DeltaTableV2's name() method. + override def name(): String = s"$databaseName.$tableName" + + override def schema(): StructType = tableSchema + + override def capabilities(): util.Set[TableCapability] = { + Set(TableCapability.BATCH_READ).asJava + } + + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { + new ServerSidePlannedScanBuilder(spark, databaseName, tableName, tableSchema, planningClient) + } +} + +/** + * ScanBuilder that uses ServerSidePlanningClient to plan the scan. + */ +class ServerSidePlannedScanBuilder( + spark: SparkSession, + databaseName: String, + tableName: String, + tableSchema: StructType, + planningClient: ServerSidePlanningClient) extends ScanBuilder { + + override def build(): Scan = { + new ServerSidePlannedScan(spark, databaseName, tableName, tableSchema, planningClient) + } +} + +/** + * Scan implementation that calls the server-side planning API to get file list. 
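+ * The file list is obtained from planningClient.planScan() when input partitions are planned,
+ * and each returned file becomes a single InputPartition. No filters or column pruning are
+ * pushed down to the server.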
+ */ +class ServerSidePlannedScan( + spark: SparkSession, + databaseName: String, + tableName: String, + tableSchema: StructType, + planningClient: ServerSidePlanningClient) extends Scan with Batch { + + override def readSchema(): StructType = tableSchema + + override def toBatch: Batch = this + + override def planInputPartitions(): Array[InputPartition] = { + // Call the server-side planning API to get the scan plan + val scanPlan = planningClient.planScan(databaseName, tableName) + + // Convert each file to an InputPartition + scanPlan.files.map { file => + ServerSidePlannedFileInputPartition(file.filePath, file.fileSizeInBytes, file.fileFormat) + }.toArray + } + + override def createReaderFactory(): PartitionReaderFactory = { + new ServerSidePlannedFilePartitionReaderFactory(spark, tableSchema) + } +} + +/** + * InputPartition representing a single file from the server-side scan plan. + */ +case class ServerSidePlannedFileInputPartition( + filePath: String, + fileSizeInBytes: Long, + fileFormat: String) extends InputPartition + +/** + * Factory for creating PartitionReaders that read server-side planned files. + * Builds reader functions on the driver for Parquet files. + */ +class ServerSidePlannedFilePartitionReaderFactory( + spark: SparkSession, + tableSchema: StructType) + extends PartitionReaderFactory { + + import org.apache.spark.util.SerializableConfiguration + + // scalastyle:off deltahadoopconfiguration + // We use sessionState.newHadoopConf() here instead of deltaLog.newDeltaHadoopConf(). + // This means DataFrame options (like custom S3 credentials) passed by users will NOT be + // included in the Hadoop configuration. This is intentional: + // - Server-side planning uses server-provided credentials, not user-specified credentials + // - ServerSidePlannedTable is NOT a Delta table, so we don't want Delta-specific options + // from deltaLog.newDeltaHadoopConf() + // - General Spark options from spark.hadoop.* are included and work for all tables + private val hadoopConf = new SerializableConfiguration(spark.sessionState.newHadoopConf()) + // scalastyle:on deltahadoopconfiguration + + // Pre-build reader function for Parquet on the driver + // This function will be serialized and sent to executors + private val parquetReaderBuilder = new ParquetFileFormat().buildReaderWithPartitionValues( + sparkSession = spark, + dataSchema = tableSchema, + partitionSchema = StructType(Nil), + requiredSchema = tableSchema, + filters = Seq.empty, + options = Map( + FileFormat.OPTION_RETURNING_BATCH -> "false" + ), + hadoopConf = hadoopConf.value + ) + + override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { + val filePartition = partition.asInstanceOf[ServerSidePlannedFileInputPartition] + + // Verify file format is Parquet + // Scalastyle suppression needed: the caselocale regex incorrectly flags even correct usage + // of toLowerCase(Locale.ROOT). Similar to PartitionUtils.scala and SchemaUtils.scala. + // scalastyle:off caselocale + if (filePartition.fileFormat.toLowerCase(Locale.ROOT) != "parquet") { + // scalastyle:on caselocale + throw new UnsupportedOperationException( + s"File format '${filePartition.fileFormat}' is not supported. Only Parquet is supported.") + } + + new ServerSidePlannedFilePartitionReader(filePartition, parquetReaderBuilder) + } +} + +/** + * PartitionReader that reads a single file using a pre-built reader function. + * The reader function was created on the driver and is executed on the executor. 
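+ * The entire file is read (offset 0 through fileSizeInBytes) with empty partition values;
+ * the underlying iterator is created lazily on first access.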
+ */ +class ServerSidePlannedFilePartitionReader( + partition: ServerSidePlannedFileInputPartition, + readerBuilder: PartitionedFile => Iterator[InternalRow]) + extends PartitionReader[InternalRow] { + + // Create PartitionedFile for this file + private val partitionedFile = PartitionedFile( + partitionValues = InternalRow.empty, + filePath = SparkPath.fromPathString(partition.filePath), + start = 0, + length = partition.fileSizeInBytes + ) + + // Call the pre-built reader function with our PartitionedFile + // This happens on the executor and doesn't need SparkSession + private lazy val readerIterator: Iterator[InternalRow] = { + readerBuilder(partitionedFile) + } + + override def next(): Boolean = { + readerIterator.hasNext + } + + override def get(): InternalRow = { + readerIterator.next() + } + + override def close(): Unit = { + // Reader cleanup is handled by Spark + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlanningClient.scala b/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlanningClient.scala new file mode 100644 index 00000000000..2472164858c --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlanningClient.scala @@ -0,0 +1,111 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.serverSidePlanning + +import org.apache.spark.sql.SparkSession + +/** + * Simple data class representing a file to scan. + * No dependencies on Iceberg types. + */ +case class ScanFile( + filePath: String, + fileSizeInBytes: Long, + fileFormat: String // "parquet", "orc", etc. +) + +/** + * Result of a table scan plan operation. + */ +case class ScanPlan( + files: Seq[ScanFile] +) + +/** + * Interface for planning table scans via server-side planning (e.g., Iceberg REST catalog). + * This interface is intentionally simple and has no dependencies + * on Iceberg libraries, allowing it to live in delta-spark module. + */ +trait ServerSidePlanningClient { + /** + * Plan a table scan and return the list of files to read. + * + * @param databaseName The database or schema name + * @param table The table name + * @return ScanPlan containing files to read + */ + def planScan(databaseName: String, table: String): ScanPlan +} + +/** + * Factory for creating ServerSidePlanningClient instances. + * This allows for configurable implementations (REST, mock, Spark-based, etc.) + */ +private[serverSidePlanning] trait ServerSidePlanningClientFactory { + /** + * Create a client for a specific catalog by reading catalog-specific configuration. + * This method reads configuration from spark.sql.catalog..uri and + * spark.sql.catalog..token. 
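+ * (for a catalog named "unity", for example, the keys spark.sql.catalog.unity.uri and
+ * spark.sql.catalog.unity.token are read).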
+ * + * @param spark The SparkSession + * @param catalogName The name of the catalog (e.g., "spark_catalog", "unity") + * @return A ServerSidePlanningClient configured for the specified catalog + */ + def buildForCatalog(spark: SparkSession, catalogName: String): ServerSidePlanningClient +} + +/** + * Registry for client factories. Can be configured for testing or to provide + * production implementations (e.g., IcebergRESTCatalogPlanningClientFactory). + * + * By default, no factory is registered. Production code should register an appropriate + * factory implementation before attempting to create clients. + */ +private[serverSidePlanning] object ServerSidePlanningClientFactory { + @volatile private var registeredFactory: Option[ServerSidePlanningClientFactory] = None + + /** + * Set a factory for production use or testing. + */ + private[serverSidePlanning] def setFactory(factory: ServerSidePlanningClientFactory): Unit = { + registeredFactory = Some(factory) + } + + /** + * Clear the registered factory. + */ + private[serverSidePlanning] def clearFactory(): Unit = { + registeredFactory = None + } + + /** + * Get a client for a specific catalog using the registered factory. + * This is the single public entry point for obtaining a ServerSidePlanningClient. + * + * @param spark The SparkSession + * @param catalogName The name of the catalog (e.g., "spark_catalog", "unity") + * @return A ServerSidePlanningClient configured for the specified catalog + * @throws IllegalStateException if no factory has been registered + */ + def getClient(spark: SparkSession, catalogName: String): ServerSidePlanningClient = { + registeredFactory.getOrElse { + throw new IllegalStateException( + "No ServerSidePlanningClientFactory has been registered. " + + "Call ServerSidePlanningClientFactory.setFactory() to register an implementation.") + }.buildForCatalog(spark, catalogName) + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTableSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTableSuite.scala new file mode 100644 index 00000000000..a31ed56c909 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlannedTableSuite.scala @@ -0,0 +1,104 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.delta.serverSidePlanning + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.test.SharedSparkSession + +class ServerSidePlannedTableSuite extends QueryTest with SharedSparkSession { + + test("end-to-end: ServerSidePlannedTable with test client") { + withTable("test_table") { + // Create a Parquet table with data + sql(""" + CREATE TABLE test_table ( + id INT, + name STRING, + category STRING + ) USING parquet + """) + + sql(""" + INSERT INTO test_table (id, name, category) VALUES + (1, 'Alice', 'A'), + (2, 'Bob', 'B'), + (3, 'Charlie', 'A'), + (4, 'David', 'B') + """) + + // Configure factory to use test client + val testFactory = new TestServerSidePlanningClientFactory() + ServerSidePlanningClientFactory.setFactory(testFactory) + + try { + // Create client and verify it's the test client + val client = ServerSidePlanningClientFactory.getClient(spark, "spark_catalog") + assert(client.isInstanceOf[TestServerSidePlanningClient], + "Client should be TestServerSidePlanningClient") + + // Get scan plan and verify file discovery + val scanPlan = client.planScan("default", "test_table") + assert(scanPlan.files.nonEmpty, "Should discover data files") + assert(scanPlan.files.forall(_.fileFormat == "parquet"), + "Parquet tables should have parquet file format") + assert(scanPlan.files.forall(_.fileSizeInBytes > 0), + "All files should have positive size") + + // Get the table schema from the actual table + val tableSchema = spark.table("test_table").schema + + // Create ServerSidePlannedTable using schema from the table + val table = new ServerSidePlannedTable( + spark = spark, + databaseName = "default", + tableName = "test_table", + tableSchema = tableSchema, + planningClient = client + ) + + // Verify table metadata + assert(table.name() == "default.test_table", + "Table name should be fully qualified") + assert(table.schema() == tableSchema, + "Table schema should match") + + // Verify scan produces correct number of partitions + val scan = table.newScanBuilder( + new org.apache.spark.sql.util.CaseInsensitiveStringMap( + java.util.Collections.emptyMap() + ) + ).build() + + val partitions = scan.toBatch.planInputPartitions() + assert(partitions.length == scanPlan.files.length, + s"Should have ${scanPlan.files.length} partitions, one per file") + + // Verify reader factory can be created + val readerFactory = scan.toBatch.createReaderFactory() + assert(readerFactory != null, "Reader factory should be created") + + // Verify we can create a reader for the first partition + val reader = readerFactory.createReader(partitions(0)) + assert(reader != null, "Reader should be created for partition") + + } finally { + // Clean up factory + ServerSidePlanningClientFactory.clearFactory() + } + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/TestServerSidePlanningClient.scala b/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/TestServerSidePlanningClient.scala new file mode 100644 index 00000000000..328e3f2c045 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/TestServerSidePlanningClient.scala @@ -0,0 +1,77 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.serverSidePlanning + +import java.util.Locale + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.functions.input_file_name + +/** + * Implementation of ServerSidePlanningClient that uses Spark SQL with input_file_name() + * to discover the list of files in a table. This allows end-to-end testing without + * a real server that can do server-side planning. + */ +class TestServerSidePlanningClient(spark: SparkSession) extends ServerSidePlanningClient { + + override def planScan(databaseName: String, table: String): ScanPlan = { + val fullTableName = s"$databaseName.$table" + + // Use input_file_name() to get the list of files + // Query: SELECT DISTINCT input_file_name() FROM table + val filesDF = spark.table(fullTableName) + .select(input_file_name().as("file_path")) + .distinct() + + // Collect file paths + val filePaths = filesDF.collect().map(_.getString(0)) + + // Get file metadata (size, format) from filesystem + // scalastyle:off deltahadoopconfiguration + val hadoopConf = spark.sessionState.newHadoopConf() + // scalastyle:on deltahadoopconfiguration + val files = filePaths.map { filePath => + // input_file_name() returns URL-encoded paths, decode them + val decodedPath = java.net.URLDecoder.decode(filePath, "UTF-8") + val path = new Path(decodedPath) + val fs = path.getFileSystem(hadoopConf) + val fileStatus = fs.getFileStatus(path) + + ScanFile( + filePath = decodedPath, + fileSizeInBytes = fileStatus.getLen, + fileFormat = getFileFormat(path) + ) + }.toSeq + + ScanPlan(files = files) + } + + private def getFileFormat(path: Path): String = "parquet" +} + +/** + * Factory for creating TestServerSidePlanningClient instances. + */ +class TestServerSidePlanningClientFactory extends ServerSidePlanningClientFactory { + override def buildForCatalog( + spark: SparkSession, + catalogName: String): ServerSidePlanningClient = { + new TestServerSidePlanningClient(spark) + } +}