diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCTableInfo.java b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCTableInfo.java new file mode 100644 index 00000000000..821f79656e6 --- /dev/null +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCTableInfo.java @@ -0,0 +1,54 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.spark.snapshot.unitycatalog; + +import static java.util.Objects.requireNonNull; + +/** + * Table information for Unity Catalog managed tables. + * + *
<p>
This POJO encapsulates all the information needed to interact with a Unity Catalog table + * without requiring Spark dependencies. + */ +public final class UCTableInfo { + private final String tableId; + private final String tablePath; + private final String ucUri; + private final String ucToken; + + public UCTableInfo(String tableId, String tablePath, String ucUri, String ucToken) { + this.tableId = requireNonNull(tableId, "tableId is null"); + this.tablePath = requireNonNull(tablePath, "tablePath is null"); + this.ucUri = requireNonNull(ucUri, "ucUri is null"); + this.ucToken = requireNonNull(ucToken, "ucToken is null"); + } + + public String getTableId() { + return tableId; + } + + public String getTablePath() { + return tablePath; + } + + public String getUcUri() { + return ucUri; + } + + public String getUcToken() { + return ucToken; + } +} diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCUtils.java b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCUtils.java new file mode 100644 index 00000000000..5b89fdf5f8f --- /dev/null +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCUtils.java @@ -0,0 +1,113 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.spark.snapshot.unitycatalog; + +import static java.util.Objects.requireNonNull; + +import io.delta.kernel.spark.utils.CatalogTableUtils; +import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient; +import java.util.Map; +import java.util.Optional; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.catalog.CatalogTable; +import org.apache.spark.sql.delta.coordinatedcommits.UCCatalogConfig; +import org.apache.spark.sql.delta.coordinatedcommits.UCCommitCoordinatorBuilder$; + +/** + * Utility class for extracting Unity Catalog table information from Spark catalog metadata. + * + *
<p>
This class isolates Spark dependencies, allowing {@link UCManagedSnapshotManager} to be + * created without Spark if table info is provided directly via {@link UCTableInfo}. + */ +public final class UCUtils { + + // Utility class - no instances + private UCUtils() {} + + /** + * Extracts Unity Catalog table information from Spark catalog table metadata. + * + * @param catalogTable Spark catalog table metadata + * @param spark SparkSession for resolving Unity Catalog configurations + * @return table info if table is UC-managed, empty otherwise + * @throws IllegalArgumentException if table is UC-managed but configuration is invalid + */ + public static Optional<UCTableInfo> extractTableInfo( + CatalogTable catalogTable, SparkSession spark) { + requireNonNull(catalogTable, "catalogTable is null"); + requireNonNull(spark, "spark is null"); + + if (!CatalogTableUtils.isUnityCatalogManagedTable(catalogTable)) { + return Optional.empty(); + } + + String tableId = extractUCTableId(catalogTable); + String tablePath = extractTablePath(catalogTable); + + // Get catalog name - require explicit catalog in identifier + scala.Option<String> catalogOption = catalogTable.identifier().catalog(); + if (catalogOption.isEmpty()) { + throw new IllegalArgumentException( + "Unable to determine Unity Catalog for table " + + catalogTable.identifier() + + ": catalog name is missing. Use a fully-qualified table name with an explicit " + + "catalog (e.g., catalog.schema.table)."); + } + String catalogName = catalogOption.get(); + + // Get UC endpoint and token from Spark configs + scala.collection.immutable.Map<String, UCCatalogConfig> ucConfigs = + UCCommitCoordinatorBuilder$.MODULE$.getCatalogConfigMap(spark); + + scala.Option<UCCatalogConfig> configOpt = ucConfigs.get(catalogName); + + if (configOpt.isEmpty()) { + throw new IllegalArgumentException( + "Cannot create UC client for table " + + catalogTable.identifier() + + ": Unity Catalog configuration not found for catalog '" + + catalogName + + "'."); + } + + UCCatalogConfig config = configOpt.get(); + String ucUri = config.uri(); + String ucToken = config.token(); + + return Optional.of(new UCTableInfo(tableId, tablePath, ucUri, ucToken)); + } + + private static String extractUCTableId(CatalogTable catalogTable) { + Map<String, String> storageProperties = + scala.jdk.javaapi.CollectionConverters.asJava(catalogTable.storage().properties()); + + // TODO: UC constants should be consolidated in a shared location (future PR) + String ucTableId = storageProperties.get(UCCommitCoordinatorClient.UC_TABLE_ID_KEY); + if (ucTableId == null || ucTableId.isEmpty()) { + throw new IllegalArgumentException( + "Cannot extract ucTableId from table " + catalogTable.identifier()); + } + return ucTableId; + } + + private static String extractTablePath(CatalogTable catalogTable) { + if (catalogTable.location() == null) { + throw new IllegalArgumentException( + "Cannot extract table path: location is null for table " + catalogTable.identifier()); + } + return catalogTable.location().toString(); + } +}
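Reviewer note: a minimal caller sketch for the new extractTableInfo API above, not part of the patch. It assumes an active SparkSession and a CatalogTable already resolved through a UC catalog; the class name UCTableInfoUsageExample and the describe helper are illustrative only.

// Sketch only: shows the intended consumption pattern for UCUtils.extractTableInfo
// and the resulting UCTableInfo. Assumes the same package as UCUtils.
package io.delta.kernel.spark.snapshot.unitycatalog;

import java.util.Optional;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.catalog.CatalogTable;

public final class UCTableInfoUsageExample {
  private UCTableInfoUsageExample() {}

  /** Returns a short description of the table, or empty if it is not UC-managed. */
  public static Optional<String> describe(CatalogTable catalogTable, SparkSession spark) {
    Optional<UCTableInfo> maybeInfo = UCUtils.extractTableInfo(catalogTable, spark);
    return maybeInfo.map(
        info ->
            // All four fields are non-null by construction (requireNonNull in the
            // UCTableInfo constructor); the token is omitted so it never reaches logs.
            "UC table " + info.getTableId()
                + " at " + info.getTablePath()
                + " served by " + info.getUcUri());
  }
}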
diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/snapshot/unitycatalog/UCTableInfoTest.java b/kernel-spark/src/test/java/io/delta/kernel/spark/snapshot/unitycatalog/UCTableInfoTest.java new file mode 100644 index 00000000000..4e30caf91b9 --- /dev/null +++ b/kernel-spark/src/test/java/io/delta/kernel/spark/snapshot/unitycatalog/UCTableInfoTest.java @@ -0,0 +1,40 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.spark.snapshot.unitycatalog; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; + +/** Tests for {@link UCTableInfo}. */ +class UCTableInfoTest { + + @Test + void testConstructor_ValidInputs_StoresAllFields() { + // Use distinctive values that would fail if implementation had hardcoded defaults + String tableId = "uc_tbl_7f3a9b2c-e8d1-4f6a"; + String tablePath = "abfss://container@acct.dfs.core.windows.net/delta/v2"; + String ucUri = "https://uc-server.example.net/api/2.1/uc"; + String ucToken = "dapi_Kx9mN$2pQr#7vWz"; + + UCTableInfo info = new UCTableInfo(tableId, tablePath, ucUri, ucToken); + + assertEquals(tableId, info.getTableId(), "Table ID should be stored correctly"); + assertEquals(tablePath, info.getTablePath(), "Table path should be stored correctly"); + assertEquals(ucUri, info.getUcUri(), "UC URI should be stored correctly"); + assertEquals(ucToken, info.getUcToken(), "UC token should be stored correctly"); + } +} diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/utils/CatalogTableUtilsTest.java b/kernel-spark/src/test/java/io/delta/kernel/spark/utils/CatalogTableUtilsTest.java index 24e6593890a..86a52ced315 100644 --- a/kernel-spark/src/test/java/io/delta/kernel/spark/utils/CatalogTableUtilsTest.java +++ b/kernel-spark/src/test/java/io/delta/kernel/spark/utils/CatalogTableUtilsTest.java @@ -16,14 +16,15 @@ package io.delta.kernel.spark.utils; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import org.apache.spark.sql.catalyst.catalog.CatalogTable; import org.junit.jupiter.api.Test; +import scala.Option; /** Tests for {@link CatalogTableUtils}. 
*/ class CatalogTableUtilsTest { @@ -97,22 +98,6 @@ void testIsUnityCatalogManaged_PreviewFlagMissingId_ReturnsFalse() { "Preview flag without ID should not be considered Unity managed"); } - @Test - void testIsCatalogManaged_NullTable_ThrowsException() { - assertThrows( - NullPointerException.class, - () -> CatalogTableUtils.isCatalogManaged(null), - "Null table should throw NullPointerException"); - } - - @Test - void testIsUnityCatalogManaged_NullTable_ThrowsException() { - assertThrows( - NullPointerException.class, - () -> CatalogTableUtils.isUnityCatalogManagedTable(null), - "Null table should throw NullPointerException"); - } - @Test void testIsCatalogManaged_NullStorage_ReturnsFalse() { CatalogTable table = catalogTableWithNullStorage(Collections.emptyMap()); @@ -151,15 +136,36 @@ void testIsUnityCatalogManaged_NullStorageProperties_ReturnsFalse() { private static CatalogTable catalogTable( Map<String, String> properties, Map<String, String> storageProperties) { - return CatalogTableTestUtils$.MODULE$.catalogTableWithProperties(properties, storageProperties); + return CatalogTableTestUtils$.MODULE$.createCatalogTable( "tbl" /* tableName */, Option.empty() /* catalogName */, properties, storageProperties, Option.empty() /* locationUri */, false /* nullStorage */, false /* nullStorageProperties */); } private static CatalogTable catalogTableWithNullStorage(Map<String, String> properties) { - return CatalogTableTestUtils$.MODULE$.catalogTableWithNullStorage(properties); + return CatalogTableTestUtils$.MODULE$.createCatalogTable( "tbl" /* tableName */, Option.empty() /* catalogName */, properties, new HashMap<>() /* storageProperties */, Option.empty() /* locationUri */, true /* nullStorage */, false /* nullStorageProperties */); } private static CatalogTable catalogTableWithNullStorageProperties( Map<String, String> properties) { - return CatalogTableTestUtils$.MODULE$.catalogTableWithNullStorageProperties(properties); + return CatalogTableTestUtils$.MODULE$.createCatalogTable( "tbl" /* tableName */, Option.empty() /* catalogName */, properties, new HashMap<>() /* storageProperties */, Option.empty() /* locationUri */, false /* nullStorage */, true /* nullStorageProperties */); } } diff --git a/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCUtilsSuite.scala b/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCUtilsSuite.scala new file mode 100644 index 00000000000..fa29ca500ff --- /dev/null +++ b/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCUtilsSuite.scala @@ -0,0 +1,225 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.delta.kernel.spark.snapshot.unitycatalog + +import java.net.URI +import java.util.{HashMap => JHashMap} + +import io.delta.kernel.internal.tablefeatures.TableFeatures +import io.delta.kernel.spark.utils.CatalogTableTestUtils +import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.test.SharedSparkSession + +/** + * Unit tests for [[UCUtils]]. + * + * Tests use distinctive, high-entropy values that would fail if the implementation + * had hardcoded defaults instead of actually extracting values from the inputs. + */ +class UCUtilsSuite extends SparkFunSuite with SharedSparkSession { + + // Use the same constants as CatalogTableUtils to ensure consistency + private val FEATURE_CATALOG_MANAGED = + TableFeatures.SET_TABLE_FEATURE_SUPPORTED_PREFIX + + TableFeatures.CATALOG_MANAGED_RW_FEATURE.featureName() + private val FEATURE_SUPPORTED = TableFeatures.SET_TABLE_FEATURE_SUPPORTED_VALUE + private val UC_TABLE_ID_KEY = UCCommitCoordinatorClient.UC_TABLE_ID_KEY + private val UC_CATALOG_CONNECTOR = "io.unitycatalog.spark.UCSingleCatalog" + + // Distinctive values that would fail if hardcoded + private val TABLE_ID_ALPHA = "uc_8f2b3c9a-d1e7-4a6f-b8c2" + private val TABLE_PATH_ALPHA = "abfss://delta-store@prod.dfs.core.windows.net/warehouse/tbl_v3" + private val UC_URI_ALPHA = "https://uc-server-westus2.example.net/api/2.1/unity-catalog" + private val UC_TOKEN_ALPHA = "dapi_Xk7mP$9qRs#2vWz_prod" + private val CATALOG_ALPHA = "uc_catalog_westus2_prod" + + // ==================== Helper Methods ==================== + + private def makeNonUCTable(): CatalogTable = { + CatalogTableTestUtils.createCatalogTable(locationUri = Some(new URI(TABLE_PATH_ALPHA))) + } + + private def makeUCTable( + tableId: String = TABLE_ID_ALPHA, + tablePath: String = TABLE_PATH_ALPHA, + catalogName: Option[String] = None): CatalogTable = { + val storageProps = new JHashMap[String, String]() + storageProps.put(FEATURE_CATALOG_MANAGED, FEATURE_SUPPORTED) + storageProps.put(UC_TABLE_ID_KEY, tableId) + + CatalogTableTestUtils.createCatalogTable( + catalogName = catalogName, + storageProperties = storageProps, + locationUri = Some(new URI(tablePath))) + } + + private def withUCCatalogConfig( + catalogName: String, + uri: String, + token: String)(testCode: => Unit): Unit = { + val configs = Seq( + s"spark.sql.catalog.$catalogName" -> UC_CATALOG_CONNECTOR, + s"spark.sql.catalog.$catalogName.uri" -> uri, + s"spark.sql.catalog.$catalogName.token" -> token) + val originalValues = configs.map { case (key, _) => key -> spark.conf.getOption(key) }.toMap + + try { + configs.foreach { case (key, value) => spark.conf.set(key, value) } + testCode + } finally { + configs.foreach { case (key, _) => + originalValues.get(key).flatten match { + case Some(v) => spark.conf.set(key, v) + case None => spark.conf.unset(key) + } + } + } + } + + // ==================== Tests ==================== + + test("returns empty for non-UC table") { + val table = makeNonUCTable() + val result = UCUtils.extractTableInfo(table, spark) + assert(result.isEmpty, "Non-UC table should return empty Optional") + } + + test("returns empty when UC table ID present but feature flag missing") { + val storageProps = new JHashMap[String, String]() + storageProps.put(UC_TABLE_ID_KEY, "orphan_id_9x7y5z") + // No FEATURE_CATALOG_MANAGED - simulates corrupted/partial metadata + + val table = 
CatalogTableTestUtils.createCatalogTable( + storageProperties = storageProps, + locationUri = Some(new URI("gs://other-bucket/path"))) + val result = UCUtils.extractTableInfo(table, spark) + assert(result.isEmpty, "Missing feature flag should return empty") + } + + test("throws IllegalArgumentException for UC table with empty table ID") { + val storageProps = new JHashMap[String, String]() + storageProps.put(FEATURE_CATALOG_MANAGED, FEATURE_SUPPORTED) + storageProps.put(UC_TABLE_ID_KEY, "") + + val table = CatalogTableTestUtils.createCatalogTable( + storageProperties = storageProps, + locationUri = Some(new URI("s3://empty-id-bucket/path"))) + val exception = intercept[IllegalArgumentException] { + UCUtils.extractTableInfo(table, spark) + } + assert(exception.getMessage.contains("Cannot extract ucTableId")) + } + + test("throws exception for UC table without location") { + val storageProps = new JHashMap[String, String]() + storageProps.put(FEATURE_CATALOG_MANAGED, FEATURE_SUPPORTED) + storageProps.put(UC_TABLE_ID_KEY, "no_location_tbl_id_3k9m") + + val table = CatalogTableTestUtils.createCatalogTable(storageProperties = storageProps) + // Spark throws AnalysisException when location is missing + val exception = intercept[Exception] { + UCUtils.extractTableInfo(table, spark) + } + assert(exception.getMessage.contains("locationUri") || + exception.getMessage.contains("location")) + } + + test("throws IllegalArgumentException when no matching catalog configuration") { + val table = makeUCTable(catalogName = Some("nonexistent_catalog_xyz")) + + val exception = intercept[IllegalArgumentException] { + UCUtils.extractTableInfo(table, spark) + } + assert(exception.getMessage.contains("Unity Catalog configuration not found") || + exception.getMessage.contains("Cannot create UC client")) + } + + test("extracts table info when UC catalog is properly configured") { + val table = makeUCTable(catalogName = Some(CATALOG_ALPHA)) + + withUCCatalogConfig(CATALOG_ALPHA, UC_URI_ALPHA, UC_TOKEN_ALPHA) { + val result = UCUtils.extractTableInfo(table, spark) + + assert(result.isPresent, "Should return table info") + val info = result.get() + // Each assertion uses the specific expected value - would fail if hardcoded + assert(info.getTableId == TABLE_ID_ALPHA, s"Table ID mismatch: got ${info.getTableId}") + assert( + info.getTablePath == TABLE_PATH_ALPHA, + s"Table path mismatch: got ${info.getTablePath}") + assert(info.getUcUri == UC_URI_ALPHA, s"UC URI mismatch: got ${info.getUcUri}") + assert(info.getUcToken == UC_TOKEN_ALPHA, s"UC token mismatch: got ${info.getUcToken}") + } + } + + test("selects correct catalog when multiple catalogs configured") { + // Use completely different values for each catalog to prove selection works + val catalogBeta = "uc_catalog_eastus_staging" + val ucUriBeta = "https://uc-server-eastus.example.net/api/2.1/uc" + val ucTokenBeta = "dapi_Yz3nQ$8wRt#1vXa_staging" + val tableIdBeta = "uc_tbl_staging_4d7e2f1a" + val tablePathBeta = "s3://staging-bucket-us-east/delta/tables/v2" + + val catalogGamma = "uc_catalog_euwest_dev" + val ucUriGamma = "https://uc-server-euwest.example.net/api/2.1/uc" + val ucTokenGamma = "dapi_Jk5pL$3mNq#9vBc_dev" + + // Table is in catalogBeta + val table = makeUCTable( + tableId = tableIdBeta, + tablePath = tablePathBeta, + catalogName = Some(catalogBeta)) + + val configs = Seq( + // catalogGamma config (should NOT be used) + s"spark.sql.catalog.$catalogGamma" -> UC_CATALOG_CONNECTOR, + s"spark.sql.catalog.$catalogGamma.uri" -> ucUriGamma, + 
s"spark.sql.catalog.$catalogGamma.token" -> ucTokenGamma, + // catalogBeta config (should be used) + s"spark.sql.catalog.$catalogBeta" -> UC_CATALOG_CONNECTOR, + s"spark.sql.catalog.$catalogBeta.uri" -> ucUriBeta, + s"spark.sql.catalog.$catalogBeta.token" -> ucTokenBeta) + val originalValues = configs.map { case (key, _) => key -> spark.conf.getOption(key) }.toMap + + try { + configs.foreach { case (key, value) => spark.conf.set(key, value) } + + val result = UCUtils.extractTableInfo(table, spark) + assert(result.isPresent, "Should return table info") + + val info = result.get() + // Verify it selected catalogBeta's config, not catalogGamma's + assert( + info.getUcUri == ucUriBeta, + s"Should use catalogBeta's URI, got: ${info.getUcUri}") + assert(info.getUcToken == ucTokenBeta, s"Should use catalogBeta's token, got: ${info.getUcToken}") + assert(info.getTableId == tableIdBeta, s"Should extract tableIdBeta, got: ${info.getTableId}") + assert( + info.getTablePath == tablePathBeta, + s"Should extract tablePathBeta, got: ${info.getTablePath}") + } finally { + configs.foreach { case (key, _) => + originalValues.get(key).flatten match { + case Some(v) => spark.conf.set(key, v) + case None => spark.conf.unset(key) + } + } + } + } +} diff --git a/kernel-spark/src/test/scala/io/delta/kernel/spark/utils/CatalogTableTestUtils.scala b/kernel-spark/src/test/scala/io/delta/kernel/spark/utils/CatalogTableTestUtils.scala index 4f80f346bb8..ea98efb54b5 100644 --- a/kernel-spark/src/test/scala/io/delta/kernel/spark/utils/CatalogTableTestUtils.scala +++ b/kernel-spark/src/test/scala/io/delta/kernel/spark/utils/CatalogTableTestUtils.scala @@ -15,8 +15,6 @@ */ package io.delta.kernel.spark.utils -import scala.collection.immutable.Seq - import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.types.StructType @@ -30,58 +28,52 @@ import org.apache.spark.sql.types.StructType */ object CatalogTableTestUtils { - def catalogTableWithProperties( - properties: java.util.Map[String, String], - storageProperties: java.util.Map[String, String]): CatalogTable = { + /** + * Creates a [[CatalogTable]] with configurable options. 
+ * + * @param tableName table name (default: "tbl") + * @param catalogName optional catalog name for the identifier + * @param properties table properties (default: empty) + * @param storageProperties storage properties (default: empty) + * @param locationUri optional storage location URI + * @param nullStorage if true, sets storage to null (for edge case testing) + * @param nullStorageProperties if true, sets storage properties to null + */ + def createCatalogTable( + tableName: String = "tbl", + catalogName: Option[String] = None, + properties: java.util.Map[String, String] = new java.util.HashMap[String, String](), + storageProperties: java.util.Map[String, String] = new java.util.HashMap[String, String](), + locationUri: Option[java.net.URI] = None, + nullStorage: Boolean = false, + nullStorageProperties: Boolean = false): CatalogTable = { + val scalaProps = ScalaUtils.toScalaMap(properties) - val scalaStorageProps = ScalaUtils.toScalaMap(storageProperties) + val scalaStorageProps = + if (nullStorageProperties) null else ScalaUtils.toScalaMap(storageProperties) - CatalogTable( - identifier = TableIdentifier("tbl"), - tableType = CatalogTableType.MANAGED, - storage = CatalogStorageFormat( - locationUri = None, + val identifier = catalogName match { + case Some(catalog) => + TableIdentifier(tableName, Some("default") /* database */, Some(catalog) /* catalog */ ) + case None => TableIdentifier(tableName) + } + + val storage = if (nullStorage) { + null + } else { + CatalogStorageFormat( + locationUri = locationUri, inputFormat = None, outputFormat = None, serde = None, compressed = false, - properties = scalaStorageProps), - schema = new StructType(), - provider = None, - partitionColumnNames = Seq.empty, - bucketSpec = None, - properties = scalaProps) - } - - def catalogTableWithNullStorage( - properties: java.util.Map[String, String]): CatalogTable = { - val scalaProps = ScalaUtils.toScalaMap(properties) - - CatalogTable( - identifier = TableIdentifier("tbl"), - tableType = CatalogTableType.MANAGED, - storage = null, - schema = new StructType(), - provider = None, - partitionColumnNames = Seq.empty, - bucketSpec = None, - properties = scalaProps) - } - - def catalogTableWithNullStorageProperties( - properties: java.util.Map[String, String]): CatalogTable = { - val scalaProps = ScalaUtils.toScalaMap(properties) + properties = scalaStorageProps) + } CatalogTable( - identifier = TableIdentifier("tbl"), + identifier = identifier, tableType = CatalogTableType.MANAGED, - storage = CatalogStorageFormat( - locationUri = None, - inputFormat = None, - outputFormat = None, - serde = None, - compressed = false, - properties = null), + storage = storage, schema = new StructType(), provider = None, partitionColumnNames = Seq.empty, diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala index 3faab141bf1..377214d45a4 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala @@ -229,6 +229,16 @@ object UCCommitCoordinatorBuilder .toList } + /** + * Returns catalog configurations as a Map for O(1) lookup by catalog name. + * Wraps [[getCatalogConfigs]] results in [[UCCatalogConfig]] for better readability. 
+ */ + private[delta] def getCatalogConfigMap(spark: SparkSession): Map[String, UCCatalogConfig] = { + getCatalogConfigs(spark).map { + case (name, uri, token) => name -> UCCatalogConfig(name, uri, token) + }.toMap + } + private def safeClose(ucClient: UCClient, uri: String): Unit = { try { ucClient.close() @@ -252,3 +262,9 @@ object UCTokenBasedRestClientFactory extends UCClientFactory { override def createUCClient(uri: String, token: String): UCClient = new UCTokenBasedRestClient(uri, token) } + +/** + * Holder for Unity Catalog configuration extracted from Spark configs. + * Used by [[UCCommitCoordinatorBuilder.getCatalogConfigMap]]. + */ +case class UCCatalogConfig(catalogName: String, uri: String, token: String)
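Reviewer note: a hypothetical Java-side lookup sketch, not part of the patch, showing how the new getCatalogConfigMap and UCCatalogConfig are meant to be consumed together; it mirrors the call site added in UCUtils above. The class name UCCatalogConfigLookupExample is illustrative, and the Scala interop types are spelled out because the map comes back as a scala.collection.immutable.Map.

// Sketch only: Scala's private[delta] compiles to a public method in bytecode,
// which is what lets Java callers such as UCUtils in this patch invoke it.
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.delta.coordinatedcommits.UCCatalogConfig;
import org.apache.spark.sql.delta.coordinatedcommits.UCCommitCoordinatorBuilder$;

public final class UCCatalogConfigLookupExample {
  private UCCatalogConfigLookupExample() {}

  /** Looks up the UC endpoint/token config for catalogName, throwing if absent. */
  public static UCCatalogConfig lookup(SparkSession spark, String catalogName) {
    scala.collection.immutable.Map<String, UCCatalogConfig> configs =
        UCCommitCoordinatorBuilder$.MODULE$.getCatalogConfigMap(spark);
    scala.Option<UCCatalogConfig> configOpt = configs.get(catalogName);
    if (configOpt.isEmpty()) {
      throw new IllegalArgumentException(
          "Unity Catalog configuration not found for catalog '" + catalogName + "'");
    }
    return configOpt.get();
  }
}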