diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/catalog/SparkTable.java b/kernel-spark/src/main/java/io/delta/kernel/spark/catalog/SparkTable.java index e864e0b1f87..5b6d0391957 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/catalog/SparkTable.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/catalog/SparkTable.java @@ -21,7 +21,8 @@ import io.delta.kernel.Snapshot; import io.delta.kernel.spark.read.SparkScanBuilder; import io.delta.kernel.spark.snapshot.DeltaSnapshotManager; -import io.delta.kernel.spark.snapshot.PathBasedSnapshotManager; +import io.delta.kernel.spark.snapshot.SnapshotManagerFactory; +import io.delta.kernel.spark.unity.UnityCatalogClientFactory; import io.delta.kernel.spark.utils.SchemaUtils; import java.util.*; import org.apache.hadoop.conf.Configuration; @@ -58,6 +59,7 @@ public class SparkTable implements Table, SupportsRead { private final Column[] columns; private final Transform[] partitionTransforms; private final Optional catalogTable; + private final Optional unityCatalogClient; /** * Creates a SparkTable from a filesystem path without a catalog table. @@ -118,6 +120,9 @@ private SparkTable( Optional catalogTable) { this.identifier = requireNonNull(identifier, "identifier is null"); this.catalogTable = catalogTable; + this.unityCatalogClient = + catalogTable.flatMap( + table -> UnityCatalogClientFactory.create(SparkSession.active(), identifier, table)); // Merge options: file system options from catalog + user options (user takes precedence) // This follows the same pattern as DeltaTableV2 in delta-spark Map merged = new HashMap<>(); @@ -138,7 +143,9 @@ private SparkTable( this.hadoopConf = SparkSession.active().sessionState().newHadoopConfWithOptions(toScalaMap(options)); - this.snapshotManager = new PathBasedSnapshotManager(tablePath, hadoopConf); + this.snapshotManager = + SnapshotManagerFactory.create( + identifier, tablePath, hadoopConf, this.catalogTable, this.unityCatalogClient); // Load the initial snapshot through the manager this.initialSnapshot = snapshotManager.loadLatestSnapshot(); this.schema = SchemaUtils.convertKernelSchemaToSparkSchema(initialSnapshot.getSchema()); @@ -197,6 +204,10 @@ public Optional getCatalogTable() { return catalogTable; } + public Optional getUnityCatalogClient() { + return unityCatalogClient; + } + @Override public String name() { return identifier.name(); diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/SnapshotManagerFactory.java b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/SnapshotManagerFactory.java new file mode 100644 index 00000000000..3bf2e2f6176 --- /dev/null +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/SnapshotManagerFactory.java @@ -0,0 +1,90 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.delta.kernel.spark.snapshot; + +import static java.util.Objects.requireNonNull; + +import io.delta.kernel.spark.unity.UnityCatalogClientFactory; +import io.delta.kernel.spark.utils.CatalogTableUtils; +import java.util.Optional; +import org.apache.hadoop.conf.Configuration; +import org.apache.spark.sql.catalyst.catalog.CatalogTable; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Factory responsible for instantiating {@link DeltaSnapshotManager} implementations. + * + *
+ * <p>
The factory centralises the decision of whether a table should use the traditional + * filesystem-based snapshot manager or a catalog-backed implementation. Today all tables rely on + * {@link PathBasedSnapshotManager}. Catalog-managed support will be integrated in a subsequent + * change once the corresponding snapshot manager is implemented. + */ +public final class SnapshotManagerFactory { + + private static final Logger LOG = LoggerFactory.getLogger(SnapshotManagerFactory.class); + + private SnapshotManagerFactory() {} + + /** + * Creates an appropriate {@link DeltaSnapshotManager} based on the provided metadata. + * + * @param identifier Spark identifier for the table being resolved + * @param tablePath canonical filesystem path to the table root + * @param hadoopConf Hadoop configuration pre-populated with user options + * @param catalogTable optional Spark {@link CatalogTable} descriptor when available + * @param unityCatalogClient optional Unity Catalog client handle for catalog-managed tables + * @return a snapshot manager implementation ready to serve table snapshots + */ + public static DeltaSnapshotManager create( + Identifier identifier, + String tablePath, + Configuration hadoopConf, + Optional catalogTable, + Optional unityCatalogClient) { + + requireNonNull(identifier, "identifier is null"); + requireNonNull(tablePath, "tablePath is null"); + requireNonNull(hadoopConf, "hadoopConf is null"); + requireNonNull(catalogTable, "catalogTable optional is null"); + requireNonNull(unityCatalogClient, "unityCatalogClient optional is null"); + + if (catalogTable.isPresent() + && CatalogTableUtils.isUnityCatalogManagedTable(catalogTable.get()) + && unityCatalogClient.isPresent()) { + LOG.debug( + "Unity Catalog-managed table '{}' detected. Falling back to PathBasedSnapshotManager " + + "until catalog-managed support is wired.", + identifier); + } + + return new PathBasedSnapshotManager(tablePath, hadoopConf); + } + + /** + * Convenience overload for path-based tables without Spark catalog metadata. 
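+ *
+ * <p>Illustrative call for a purely path-based table; the path literal below is a placeholder,
+ * and {@code identifier} / {@code hadoopConf} are whatever the caller already holds:
+ *
+ * <pre>{@code
+ * DeltaSnapshotManager manager =
+ *     SnapshotManagerFactory.createForPath(identifier, "/tmp/delta/events", hadoopConf);
+ * }</pre>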
+ * + * @param identifier Spark identifier for the table being resolved + * @param tablePath canonical filesystem path to the table root + * @param hadoopConf Hadoop configuration pre-populated with user options + * @return a {@link PathBasedSnapshotManager} instance + */ + public static DeltaSnapshotManager createForPath( + Identifier identifier, String tablePath, Configuration hadoopConf) { + return create(identifier, tablePath, hadoopConf, Optional.empty(), Optional.empty()); + } +} diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/unity/UnityCatalogClientFactory.java b/kernel-spark/src/main/java/io/delta/kernel/spark/unity/UnityCatalogClientFactory.java new file mode 100644 index 00000000000..6573b99caf6 --- /dev/null +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/unity/UnityCatalogClientFactory.java @@ -0,0 +1,287 @@ +package io.delta.kernel.spark.unity; + +import static java.util.Objects.requireNonNull; + +import io.delta.kernel.spark.utils.CatalogTableUtils; +import io.delta.kernel.spark.utils.ScalaUtils; +import io.delta.storage.commit.uccommitcoordinator.UCClient; +import io.delta.storage.commit.uccommitcoordinator.UCTokenBasedRestClient; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.BiFunction; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.catalog.CatalogTable; +import org.apache.spark.sql.connector.catalog.Identifier; + +/** + * Factory for constructing Unity Catalog clients from Spark session catalog configuration. + * + *
+ * <p>
The logic mirrors the config resolution performed by {@code + * org.apache.spark.sql.delta.coordinatedcommits.UCCommitCoordinatorBuilder}, ensuring the connector + * honours the same semantics across V1 and Kernel-backed paths. + */ +public final class UnityCatalogClientFactory { + + private static final String SPARK_SQL_CATALOG_PREFIX = "spark.sql.catalog."; + private static final String URI_SUFFIX = "uri"; + private static final String TOKEN_SUFFIX = "token"; + private static final String WAREHOUSE_SUFFIX = "warehouse"; + private static final String UNITY_CATALOG_CONNECTOR_CLASS = + "io.unitycatalog.spark.UCSingleCatalog"; + + private UnityCatalogClientFactory() {} + + private static volatile BiFunction clientBuilder = + UnityCatalogClientFactory::defaultClientBuilder; + + /** + * Creates a Unity Catalog client for the provided catalog table if it is Unity Catalog managed. + * + * @param spark active Spark session + * @param identifier table identifier supplied by Spark DSv2 + * @param catalogTable catalog metadata for the table + * @return optional Unity Catalog client details; empty when the table is not UC managed + */ + public static Optional create( + SparkSession spark, Identifier identifier, CatalogTable catalogTable) { + requireNonNull(spark, "spark session is null"); + requireNonNull(identifier, "identifier is null"); + requireNonNull(catalogTable, "catalogTable is null"); + + if (!CatalogTableUtils.isUnityCatalogManagedTable(catalogTable)) { + return Optional.empty(); + } + + Map sparkConf = ScalaUtils.toJavaMap(spark.conf().getAll()); + + List unityCatalogs = collectUnityCatalogConfigs(sparkConf); + if (unityCatalogs.isEmpty()) { + throw new IllegalStateException( + "Unity Catalog table detected but no Unity Catalog connectors are configured."); + } + + Optional requestedCatalogName = extractCatalogName(identifier); + CatalogConfig selectedCatalog = + selectCatalogConfig(requestedCatalogName, unityCatalogs, identifier.name()); + + UCClient ucClient = clientBuilder.apply(selectedCatalog.uri, selectedCatalog.token); + UnityCatalogClient clientHandle = + new UnityCatalogClient(selectedCatalog.name, ucClient, selectedCatalog.warehouse); + return Optional.of(clientHandle); + } + + private static CatalogConfig selectCatalogConfig( + Optional requestedCatalogName, List unityCatalogs, String tableName) { + if (requestedCatalogName.isPresent()) { + Optional match = + unityCatalogs.stream() + .filter(config -> config.name.equalsIgnoreCase(requestedCatalogName.get())) + .findFirst(); + if (match.isPresent()) { + return match.get(); + } + if (unityCatalogs.size() == 1) { + return unityCatalogs.get(0); + } + throw new IllegalStateException( + String.format( + Locale.ROOT, + "Unable to locate Unity Catalog connector '%s' for table '%s'.", + requestedCatalogName.get(), + tableName)); + } + + if (unityCatalogs.size() == 1) { + return unityCatalogs.get(0); + } + + throw new IllegalStateException( + String.format( + Locale.ROOT, + "Multiple Unity Catalog connectors configured (%s) but table '%s' does not carry a " + + "catalog-qualified identifier.", + listCatalogNames(unityCatalogs), + tableName)); + } + + private static List collectUnityCatalogConfigs(Map sparkConf) { + Map catalogProperties = new HashMap<>(); + + for (Map.Entry entry : sparkConf.entrySet()) { + String key = entry.getKey(); + if (!key.startsWith(SPARK_SQL_CATALOG_PREFIX)) { + continue; + } + String remainder = key.substring(SPARK_SQL_CATALOG_PREFIX.length()); + int dotIndex = remainder.indexOf('.'); + String catalogName; + 
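+      // propertyKey stays null for the bare "spark.sql.catalog.<name>" entry (the connector
+      // class); dotted keys such as ".uri", ".token" and ".warehouse" carry per-catalog settings.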
String propertyKey = null; + if (dotIndex == -1) { + catalogName = remainder; + } else { + catalogName = remainder.substring(0, dotIndex); + propertyKey = remainder.substring(dotIndex + 1); + } + + CatalogProperties properties = + catalogProperties.computeIfAbsent(catalogName, CatalogProperties::new); + if (propertyKey == null) { + properties.connectorClass = entry.getValue(); + } else if (URI_SUFFIX.equals(propertyKey)) { + properties.uri = entry.getValue(); + } else if (TOKEN_SUFFIX.equals(propertyKey)) { + properties.token = entry.getValue(); + } else if (WAREHOUSE_SUFFIX.equals(propertyKey)) { + properties.warehouse = Optional.ofNullable(entry.getValue()).filter(v -> !v.isEmpty()); + } + } + + List unityCatalogs = new ArrayList<>(); + for (CatalogProperties properties : catalogProperties.values()) { + if (!UNITY_CATALOG_CONNECTOR_CLASS.equals(properties.connectorClass)) { + continue; + } + + String uri = requireTrimmed(properties.uri, properties.name, URI_SUFFIX); + try { + new URI(uri); + } catch (URISyntaxException e) { + throw new IllegalStateException( + String.format( + Locale.ROOT, + "Invalid Unity Catalog URI '%s' configured for catalog '%s'.", + uri, + properties.name), + e); + } + + String token = requireTrimmed(properties.token, properties.name, TOKEN_SUFFIX); + unityCatalogs.add(new CatalogConfig(properties.name, uri, token, properties.warehouse)); + } + + return unityCatalogs; + } + + private static String listCatalogNames(List configs) { + List names = new ArrayList<>(configs.size()); + for (CatalogConfig config : configs) { + names.add(config.name); + } + Collections.sort(names, String.CASE_INSENSITIVE_ORDER); + return String.join(",", names); + } + + private static String requireTrimmed(String value, String catalogName, String propertySuffix) { + if (value == null) { + throw new IllegalStateException( + String.format( + Locale.ROOT, + "Missing Unity Catalog configuration '%s%s%s'.", + SPARK_SQL_CATALOG_PREFIX, + catalogName, + propertySuffix.isEmpty() ? "" : "." + propertySuffix)); + } + String trimmed = value.trim(); + if (trimmed.isEmpty()) { + throw new IllegalStateException( + String.format( + Locale.ROOT, + "Unity Catalog configuration '%s%s.%s' cannot be empty.", + SPARK_SQL_CATALOG_PREFIX, + catalogName, + propertySuffix)); + } + return trimmed; + } + + private static Optional extractCatalogName(Identifier identifier) { + String[] namespace = identifier.namespace(); + if (namespace != null && namespace.length > 0) { + return Optional.of(namespace[0]); + } + return Optional.empty(); + } + + /** Unity Catalog client handle containing additional metadata required by the connector. 
*/ + public static final class UnityCatalogClient implements AutoCloseable { + private final String catalogName; + private final UCClient ucClient; + private final Optional warehouse; + + UnityCatalogClient(String catalogName, UCClient ucClient, Optional warehouse) { + this.catalogName = requireNonNull(catalogName, "catalogName is null"); + this.ucClient = requireNonNull(ucClient, "ucClient is null"); + this.warehouse = Objects.requireNonNullElseGet(warehouse, Optional::empty); + } + + public String getCatalogName() { + return catalogName; + } + + public UCClient getUcClient() { + return ucClient; + } + + public Optional getWarehouse() { + return warehouse; + } + + @Override + public void close() throws Exception { + ucClient.close(); + } + } + + private static final class CatalogProperties { + private final String name; + private String connectorClass; + private String uri; + private String token; + private Optional warehouse = Optional.empty(); + + private CatalogProperties(String name) { + this.name = name; + } + } + + private static final class CatalogConfig { + private final String name; + private final String uri; + private final String token; + private final Optional warehouse; + + private CatalogConfig(String name, String uri, String token, Optional warehouse) { + this.name = name; + this.uri = uri; + this.token = token; + this.warehouse = warehouse; + } + } + + /** Visible for testing: override the UC client builder. */ + public static void setClientBuilderForTesting(BiFunction builder) { + clientBuilder = requireNonNull(builder, "builder is null"); + } + + /** Visible for testing: reset the UC client builder to the default implementation. */ + public static void resetClientBuilderForTesting() { + clientBuilder = UnityCatalogClientFactory::defaultClientBuilder; + } + + private static UCClient defaultClientBuilder(String uri, String token) { + try { + return new UCTokenBasedRestClient(uri, token); + } catch (NoClassDefFoundError e) { + throw new IllegalStateException( + "Unity Catalog client requires Apache HttpClient dependencies on the classpath.", e); + } + } +} diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/utils/CatalogTableUtils.java b/kernel-spark/src/main/java/io/delta/kernel/spark/utils/CatalogTableUtils.java new file mode 100644 index 00000000000..d95f92473ad --- /dev/null +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/utils/CatalogTableUtils.java @@ -0,0 +1,102 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.spark.utils; + +import static java.util.Objects.requireNonNull; + +import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient; +import java.util.Map; +import org.apache.spark.sql.catalyst.catalog.CatalogTable; + +/** + * Utility helpers for inspecting Delta-related metadata persisted on Spark {@link CatalogTable} + * instances by Unity Catalog. + * + *
+ * <p>
Unity Catalog marks catalog-managed tables via feature flags stored in table storage + * properties. This helper centralises the logic for interpreting those properties so the Kernel + * connector can decide when to use catalog-owned (CCv2) behaviour. + * + *
+ * <ul>
+ *   <li>{@link #isCatalogManaged(CatalogTable)} checks whether either {@code
+ *       delta.feature.catalogManaged} or {@code delta.feature.catalogOwned-preview} is set to
+ *       {@code supported}, signalling that a catalog manages the table.
+ *   <li>{@link #isUnityCatalogManagedTable(CatalogTable)} additionally verifies the presence of the
+ *       Unity Catalog table identifier ({@link UCCommitCoordinatorClient#UC_TABLE_ID_KEY}) to
+ *       confirm that the table is backed by Unity Catalog.
+ * </ul>
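+ *
+ * <p>Illustrative usage by a hypothetical caller:
+ *
+ * <pre>{@code
+ * if (CatalogTableUtils.isUnityCatalogManagedTable(catalogTable)) {
+ *   // resolve snapshots through the catalog-managed (CCv2) path
+ * } else {
+ *   // fall back to filesystem-based snapshot management
+ * }
+ * }</pre>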
+ */ +public final class CatalogTableUtils { + static final String FEATURE_CATALOG_MANAGED = "delta.feature.catalogManaged"; + static final String FEATURE_CATALOG_OWNED_PREVIEW = "delta.feature.catalogOwned-preview"; + private static final String SUPPORTED = "supported"; + + private CatalogTableUtils() {} + + /** + * Checks whether any catalog manages this table via CCv2 semantics. + * + * @param table Spark {@link CatalogTable} descriptor + * @return {@code true} when either catalog feature flag is set to {@code supported} + */ + public static boolean isCatalogManaged(CatalogTable table) { + requireNonNull(table, "table is null"); + Map storageProperties = getStorageProperties(table); + return isCatalogManagedFeatureEnabled(storageProperties, FEATURE_CATALOG_MANAGED) + || isCatalogManagedFeatureEnabled(storageProperties, FEATURE_CATALOG_OWNED_PREVIEW); + } + + /** + * Checks whether the table is Unity Catalog managed. + * + * @param table Spark {@link CatalogTable} descriptor + * @return {@code true} when the table is catalog managed and contains the UC identifier + */ + public static boolean isUnityCatalogManagedTable(CatalogTable table) { + requireNonNull(table, "table is null"); + Map storageProperties = getStorageProperties(table); + boolean isUCBacked = storageProperties.containsKey(UCCommitCoordinatorClient.UC_TABLE_ID_KEY); + return isUCBacked && isCatalogManaged(table); + } + + /** + * Checks whether the given feature key is enabled in the table properties. + * + * @param tableProperties The table properties + * @param featureKey The feature key + * @return {@code true} when the feature key is set to {@code supported} + */ + private static boolean isCatalogManagedFeatureEnabled( + Map tableProperties, String featureKey) { + requireNonNull(tableProperties, "tableProperties is null"); + requireNonNull(featureKey, "featureKey is null"); + String featureValue = tableProperties.get(featureKey); + if (featureValue == null) { + return false; + } + return featureValue.equalsIgnoreCase(SUPPORTED); + } + + /** + * Returns the catalog storage properties published with a {@link CatalogTable}. 
+ * + * @param table Spark {@link CatalogTable} descriptor + * @return Java map view of the storage properties + */ + private static Map getStorageProperties(CatalogTable table) { + requireNonNull(table, "table is null"); + return ScalaUtils.toJavaMap(table.storage().properties()); + } +} diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/utils/ScalaUtils.java b/kernel-spark/src/main/java/io/delta/kernel/spark/utils/ScalaUtils.java index 3832dbd5fb3..39317f96b99 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/utils/ScalaUtils.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/utils/ScalaUtils.java @@ -15,10 +15,12 @@ */ package io.delta.kernel.spark.utils; +import java.util.Collections; import java.util.Map; import scala.Tuple2; import scala.collection.immutable.Map$; import scala.collection.mutable.Builder; +import scala.jdk.javaapi.CollectionConverters; public final class ScalaUtils { public static scala.collection.immutable.Map toScalaMap( @@ -35,4 +37,12 @@ public static scala.collection.immutable.Map toScalaMap( } return b.result(); } + + public static Map toJavaMap( + scala.collection.immutable.Map scalaMap) { + if (scalaMap == null || scalaMap.isEmpty()) { + return Collections.emptyMap(); + } + return CollectionConverters.asJava(scalaMap); + } } diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/unity/UnityCatalogClientFactoryTest.java b/kernel-spark/src/test/java/io/delta/kernel/spark/unity/UnityCatalogClientFactoryTest.java new file mode 100644 index 00000000000..c6dc4d90c68 --- /dev/null +++ b/kernel-spark/src/test/java/io/delta/kernel/spark/unity/UnityCatalogClientFactoryTest.java @@ -0,0 +1,194 @@ +package io.delta.kernel.spark.unity; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.delta.kernel.spark.SparkDsv2TestBase; +import io.delta.storage.commit.Commit; +import io.delta.storage.commit.CommitFailedException; +import io.delta.storage.commit.GetCommitsResponse; +import io.delta.storage.commit.actions.AbstractMetadata; +import io.delta.storage.commit.actions.AbstractProtocol; +import io.delta.storage.commit.uccommitcoordinator.UCClient; +import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient; +import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorException; +import java.io.IOException; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.catalog.CatalogTable; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +public class UnityCatalogClientFactoryTest extends SparkDsv2TestBase { + + private static final String UC_CATALOG = "main"; + + private final AtomicReference recordedUri = new AtomicReference<>(); + private final AtomicReference recordedToken = new AtomicReference<>(); + + @AfterEach + public void resetOverrides() { + SparkSession spark = SparkDsv2TestBase.spark; + unsetIfDefined(spark, confKey(UC_CATALOG)); + unsetIfDefined(spark, confKey(UC_CATALOG, "uri")); + unsetIfDefined(spark, confKey(UC_CATALOG, "token")); + unsetIfDefined(spark, confKey(UC_CATALOG, "warehouse")); + 
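+    // Restore the default builder so later tests construct real UCTokenBasedRestClient instances.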
UnityCatalogClientFactory.resetClientBuilderForTesting(); + recordedToken.set(null); + recordedUri.set(null); + } + + @Test + public void createUnityCatalogClientFromConfigs() throws Exception { + SparkSession spark = SparkDsv2TestBase.spark; + spark.conf().set(confKey(UC_CATALOG), "io.unitycatalog.spark.UCSingleCatalog"); + spark.conf().set(confKey(UC_CATALOG, "uri"), "https://example.cloud.databricks.com"); + spark.conf().set(confKey(UC_CATALOG, "token"), "dapi123"); + spark.conf().set(confKey(UC_CATALOG, "warehouse"), "/mnt/warehouse"); + + UnityCatalogClientFactory.setClientBuilderForTesting( + (uri, token) -> { + recordedUri.set(uri); + recordedToken.set(token); + return new RecordingUCClient(); + }); + + CatalogTable catalogTable = buildUnityCatalogTable("table-123"); + Identifier identifier = Identifier.of(new String[] {UC_CATALOG, "default"}, "tbl"); + + Optional clientOpt = + UnityCatalogClientFactory.create(spark, identifier, catalogTable); + + assertTrue(clientOpt.isPresent(), "Expected Unity Catalog client to be created"); + UnityCatalogClientFactory.UnityCatalogClient client = clientOpt.get(); + assertEquals(UC_CATALOG, client.getCatalogName()); + assertEquals(Optional.of("/mnt/warehouse"), client.getWarehouse()); + assertEquals("https://example.cloud.databricks.com", recordedUri.get()); + assertEquals("dapi123", recordedToken.get()); + + client.close(); + } + + @Test + public void missingTokenConfigurationThrows() { + SparkSession spark = SparkDsv2TestBase.spark; + spark.conf().set(confKey(UC_CATALOG), "io.unitycatalog.spark.UCSingleCatalog"); + spark.conf().set(confKey(UC_CATALOG, "uri"), "https://example.cloud.databricks.com"); + + CatalogTable catalogTable = buildUnityCatalogTable("table-456"); + Identifier identifier = Identifier.of(new String[] {UC_CATALOG, "default"}, "tbl"); + + IllegalStateException thrown = + assertThrows( + IllegalStateException.class, + () -> UnityCatalogClientFactory.create(spark, identifier, catalogTable)); + + assertTrue( + thrown.getMessage().contains("token"), + "Exception message should mention missing token configuration"); + } + + @Test + public void usesSoleCatalogWhenIdentifierUnqualified() throws Exception { + SparkSession spark = SparkDsv2TestBase.spark; + spark.conf().set(confKey(UC_CATALOG), "io.unitycatalog.spark.UCSingleCatalog"); + spark.conf().set(confKey(UC_CATALOG, "uri"), "https://example.cloud.databricks.com"); + spark.conf().set(confKey(UC_CATALOG, "token"), "token-789"); + + UnityCatalogClientFactory.setClientBuilderForTesting( + (uri, token) -> { + recordedUri.set(uri); + recordedToken.set(token); + return new RecordingUCClient(); + }); + + CatalogTable catalogTable = buildUnityCatalogTable("table-789"); + Identifier identifier = Identifier.of(new String[] {"default"}, "tbl"); + + Optional clientOpt = + UnityCatalogClientFactory.create(spark, identifier, catalogTable); + + assertTrue(clientOpt.isPresent()); + assertEquals("https://example.cloud.databricks.com", recordedUri.get()); + assertEquals("token-789", recordedToken.get()); + clientOpt.get().close(); + } + + @Test + public void returnsEmptyWhenTableNotUnityCatalogManaged() { + SparkSession spark = SparkDsv2TestBase.spark; + CatalogTable catalogTable = + io.delta.kernel.spark.utils.CatalogTableTestUtils$.MODULE$.catalogTableWithProperties( + new HashMap<>(), new HashMap<>()); + Identifier identifier = Identifier.of(new String[] {"default"}, "tbl"); + + Optional clientOpt = + UnityCatalogClientFactory.create(spark, identifier, catalogTable); + + 
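+    // Without UC feature flags or a table id, the factory must not build a client.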
assertFalse(clientOpt.isPresent()); + } + + private static CatalogTable buildUnityCatalogTable(String tableId) { + Map storageProps = new HashMap<>(); + storageProps.put("delta.feature.catalogManaged", "supported"); + storageProps.put(UCCommitCoordinatorClient.UC_TABLE_ID_KEY, tableId); + return io.delta.kernel.spark.utils.CatalogTableTestUtils$.MODULE$.catalogTableWithProperties( + new HashMap<>(), storageProps); + } + + private static String confKey(String catalog) { + return "spark.sql.catalog." + catalog; + } + + private static String confKey(String catalog, String suffix) { + return confKey(catalog) + "." + suffix; + } + + private static void unsetIfDefined(SparkSession spark, String key) { + try { + spark.conf().unset(key); + } catch (IllegalArgumentException ignored) { + // Config was not set; nothing to cleanup. + } + } + + private static final class RecordingUCClient implements UCClient { + + @Override + public String getMetastoreId() { + return "test-metastore"; + } + + @Override + public void commit( + String tableId, + URI tableUri, + Optional commit, + Optional lastKnownBackfilledVersion, + boolean disown, + Optional newMetadata, + Optional newProtocol) + throws IOException, CommitFailedException, UCCommitCoordinatorException { + throw new UnsupportedOperationException("Not implemented in test stub"); + } + + @Override + public GetCommitsResponse getCommits( + String tableId, URI tableUri, Optional startVersion, Optional endVersion) + throws IOException, UCCommitCoordinatorException { + throw new UnsupportedOperationException("Not implemented in test stub"); + } + + @Override + public void close() { + // no-op + } + } +} diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/utils/CatalogTableUtilsTest.java b/kernel-spark/src/test/java/io/delta/kernel/spark/utils/CatalogTableUtilsTest.java new file mode 100644 index 00000000000..a3b2f4a59f6 --- /dev/null +++ b/kernel-spark/src/test/java/io/delta/kernel/spark/utils/CatalogTableUtilsTest.java @@ -0,0 +1,103 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.spark.utils; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient; +import java.util.Collections; +import java.util.Map; +import org.apache.spark.sql.catalyst.catalog.CatalogTable; +import org.junit.jupiter.api.Test; + +/** Tests for {@link CatalogTableUtils}. 
*/ +class CatalogTableUtilsTest { + + @Test + void testIsCatalogManaged_CatalogFlagEnabled_ReturnsTrue() { + CatalogTable table = + catalogTable( + Collections.emptyMap(), Map.of(CatalogTableUtils.FEATURE_CATALOG_MANAGED, "supported")); + + assertTrue( + CatalogTableUtils.isCatalogManaged(table), "Catalog-managed flag should enable detection"); + } + + @Test + void testIsCatalogManaged_PreviewFlagEnabled_ReturnsTrue() { + CatalogTable table = + catalogTable( + Collections.emptyMap(), + Map.of(CatalogTableUtils.FEATURE_CATALOG_OWNED_PREVIEW, "SuPpOrTeD")); + + assertTrue( + CatalogTableUtils.isCatalogManaged(table), + "Preview flag should enable detection ignoring case"); + } + + @Test + void testIsCatalogManaged_NoFlags_ReturnsFalse() { + CatalogTable table = catalogTable(Collections.emptyMap(), Collections.emptyMap()); + + assertFalse( + CatalogTableUtils.isCatalogManaged(table), "No catalog flags should disable detection"); + } + + @Test + void testIsUnityCatalogManaged_FlagAndIdPresent_ReturnsTrue() { + CatalogTable table = + catalogTable( + Collections.emptyMap(), + Map.of( + CatalogTableUtils.FEATURE_CATALOG_MANAGED, + "supported", + UCCommitCoordinatorClient.UC_TABLE_ID_KEY, + "abc-123")); + + assertTrue( + CatalogTableUtils.isUnityCatalogManagedTable(table), + "Unity Catalog detection should require flag and identifier"); + } + + @Test + void testIsUnityCatalogManaged_MissingId_ReturnsFalse() { + CatalogTable table = + catalogTable( + Collections.emptyMap(), Map.of(CatalogTableUtils.FEATURE_CATALOG_MANAGED, "supported")); + + assertFalse( + CatalogTableUtils.isUnityCatalogManagedTable(table), + "Missing table identifier should break Unity detection"); + } + + @Test + void testIsUnityCatalogManaged_PreviewFlagMissingId_ReturnsFalse() { + CatalogTable table = + catalogTable( + Collections.emptyMap(), + Map.of(CatalogTableUtils.FEATURE_CATALOG_OWNED_PREVIEW, "supported")); + + assertFalse( + CatalogTableUtils.isUnityCatalogManagedTable(table), + "Preview flag without ID should not be considered Unity managed"); + } + + private static CatalogTable catalogTable( + Map properties, Map storageProperties) { + return CatalogTableTestUtils$.MODULE$.catalogTableWithProperties(properties, storageProperties); + } +} diff --git a/kernel-spark/src/test/scala/io/delta/kernel/spark/utils/CatalogTableTestUtils.scala b/kernel-spark/src/test/scala/io/delta/kernel/spark/utils/CatalogTableTestUtils.scala new file mode 100644 index 00000000000..05a1ae39357 --- /dev/null +++ b/kernel-spark/src/test/scala/io/delta/kernel/spark/utils/CatalogTableTestUtils.scala @@ -0,0 +1,55 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.delta.kernel.spark.utils + +import scala.collection.immutable.Seq + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} +import org.apache.spark.sql.types.StructType + +/** + * Helpers for constructing [[CatalogTable]] instances inside Java tests. + * + * Spark's [[CatalogTable]] is defined in Scala and its constructor signature shifts between Spark + * releases. Centralising the construction in Scala keeps the kernel tests insulated from those + * binary changes and saves Java tests from manually wiring the many optional parameters. + */ +object CatalogTableTestUtils { + + def catalogTableWithProperties( + properties: java.util.Map[String, String], + storageProperties: java.util.Map[String, String]): CatalogTable = { + val scalaProps = ScalaUtils.toScalaMap(properties) + val scalaStorageProps = ScalaUtils.toScalaMap(storageProperties) + + CatalogTable( + identifier = TableIdentifier("tbl"), + tableType = CatalogTableType.MANAGED, + storage = CatalogStorageFormat( + locationUri = None, + inputFormat = None, + outputFormat = None, + serde = None, + compressed = false, + properties = scalaStorageProps), + schema = new StructType(), + provider = None, + partitionColumnNames = Seq.empty, + bucketSpec = None, + properties = scalaProps) + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala index 91345e99724..83c2dc90e5d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala @@ -181,6 +181,10 @@ object VacuumCommand extends VacuumCommandImpl with Serializable { throw DeltaErrors.deltaCannotVacuumManagedTable() } + // By default, we will do full vacuum unless LITE vacuum conf is set + val isLiteVacuumEnabled = spark.sessionState.conf.getConf(DeltaSQLConf.LITE_VACUUM_ENABLED) + val defaultType = if (isLiteVacuumEnabled) VacuumType.LITE else VacuumType.FULL + val vacuumType = vacuumTypeOpt.map(VacuumType.withName).getOrElse(defaultType) val snapshotTombstoneRetentionMillis = DeltaLog.tombstoneRetentionMillis(snapshot.metadata) val retentionMillis = retentionHours.flatMap { h => @@ -235,10 +239,6 @@ object VacuumCommand extends VacuumCommandImpl with Serializable { val partitionColumns = snapshot.metadata.partitionSchema.fieldNames val parallelism = spark.sessionState.conf.parallelPartitionDiscoveryParallelism val shouldIcebergMetadataDirBeHidden = UniversalFormat.icebergEnabled(snapshot.metadata) - // By default, we will do full vacuum unless LITE vacuum conf is set - val isLiteVacuumEnabled = spark.sessionState.conf.getConf(DeltaSQLConf.LITE_VACUUM_ENABLED) - val defaultType = if (isLiteVacuumEnabled) VacuumType.LITE else VacuumType.FULL - val vacuumType = vacuumTypeOpt.map(VacuumType.withName).getOrElse(defaultType) val latestCommitVersionOutsideOfRetentionWindowOpt: Option[Long] = if (vacuumType == VacuumType.LITE) { try {