diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/DeltaSnapshotManagerFactory.java b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/DeltaSnapshotManagerFactory.java new file mode 100644 index 00000000000..8770982e4e7 --- /dev/null +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/DeltaSnapshotManagerFactory.java @@ -0,0 +1,93 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.spark.snapshot; + +import static java.util.Objects.requireNonNull; + +import org.apache.hadoop.conf.Configuration; +import org.apache.spark.annotation.Experimental; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.catalog.CatalogTable; + +/** + * Factory for creating {@link DeltaSnapshotManager} instances. + * + *

<p>This factory provides two creation methods: + * + * <ul> + *   <li>{@link #fromPath} for path-based filesystem Delta tables + *   <li>{@link #fromCatalogTable} for catalog-managed tables resolved through Spark + * </ul> + * + * <p>Example usage: + * + * <pre>{@code
+ * // For path-based tables
+ * DeltaSnapshotManager manager = DeltaSnapshotManagerFactory.fromPath(
+ *     tablePath,
+ *     hadoopConf
+ * );
+ *
+ * // For catalog tables
+ * DeltaSnapshotManager manager = DeltaSnapshotManagerFactory.fromCatalogTable(
+ *     catalogTable,
+ *     spark,
+ *     hadoopConf
+ * );
+ * }</pre>
+ */ +@Experimental +public final class DeltaSnapshotManagerFactory { + + // Utility class - no instances + private DeltaSnapshotManagerFactory() {} + + /** + * Creates a path-based snapshot manager for filesystem Delta tables. + * + *
<p>
Use this when no catalog metadata is available or when you want to work directly with a + * filesystem path. + * + * @param tablePath filesystem path to the Delta table root + * @param hadoopConf Hadoop configuration for the Delta Kernel engine + * @return PathBasedSnapshotManager instance + * @throws NullPointerException if tablePath or hadoopConf is null + */ + public static DeltaSnapshotManager fromPath(String tablePath, Configuration hadoopConf) { + requireNonNull(tablePath, "tablePath is null"); + requireNonNull(hadoopConf, "hadoopConf is null"); + return new PathBasedSnapshotManager(tablePath, hadoopConf); + } + + /** + * Creates a snapshot manager from catalog table metadata. + * + *
<p>
Wire-up is intentionally deferred; this skeleton method currently throws until implemented + * in a follow-up PR. + * + * @throws UnsupportedOperationException always, until UC wiring is added + */ + public static DeltaSnapshotManager fromCatalogTable( + CatalogTable catalogTable, SparkSession spark, Configuration hadoopConf) { + requireNonNull(catalogTable, "catalogTable is null"); + requireNonNull(spark, "spark is null"); + requireNonNull(hadoopConf, "hadoopConf is null"); + throw new UnsupportedOperationException("UC catalog wiring not implemented in skeleton"); + } +} diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/ManagedCatalogAdapter.java b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/ManagedCatalogAdapter.java new file mode 100644 index 00000000000..05743a3fe5d --- /dev/null +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/ManagedCatalogAdapter.java @@ -0,0 +1,81 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.spark.snapshot; + +import io.delta.storage.commit.GetCommitsResponse; +import java.util.Optional; + +/** + * Adapter interface for catalog-managed Delta tables. + * + *

<p>This is a thin, protocol-aligned interface that adapters implement to fetch commit metadata + * from a catalog's commit coordinator API. Adapters are responsible only for communication with the + * catalog; they don't know about Delta snapshots or Kernel internals. + * + *

<p>The {@link CatalogManagedSnapshotManager} uses this interface to retrieve commits and then + * builds Delta snapshots and commit ranges using Kernel's TableManager APIs. + * + *
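+ * <p>For illustration, a caller might exercise this contract as follows (a sketch only;
+ * {@code adapter} is assumed to be an already-constructed implementation):
+ *
+ * <pre>{@code
+ * GetCommitsResponse response = adapter.getCommits(0, Optional.empty());
+ * long latestVersion = response.getLatestTableVersion();
+ * // the snapshot manager layer then turns the returned commits into a Delta snapshot
+ * }</pre>
+ *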
<p>
Implementations should be catalog-specific (e.g., UnityCatalogAdapter, GlueCatalogAdapter) but + * share this common interface so the snapshot manager can work with any catalog. + */ +public interface ManagedCatalogAdapter extends AutoCloseable { + + /** + * Returns the unique identifier for this table in the catalog. + * + * @return the catalog-assigned table identifier + */ + String getTableId(); + + /** + * Returns the storage path for this table. + * + * @return the filesystem path to the Delta table root + */ + String getTablePath(); + + /** + * Retrieves commits from the catalog's commit coordinator. + * + *

<p>This is the primary method that adapters must implement. It calls the catalog's API to get + * the list of ratified commits within the specified version range. + * + * @param startVersion the starting version (inclusive), typically 0 for initial load + * @param endVersion optional ending version (inclusive); if empty, returns up to latest + * @return response containing the list of commits and the latest ratified table version + */ + GetCommitsResponse getCommits(long startVersion, Optional<Long> endVersion); + + /** + * Returns the latest ratified table version from the catalog. + * + *
<p>
For catalog-managed tables, this is the highest version that has been successfully ratified + * by the catalog coordinator. Returns -1 if the catalog hasn't registered any commits yet (which + * can happen when version 0 exists but hasn't been ratified). + * + *
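+ * <p>Callers should handle the sentinel explicitly, e.g. (a sketch; {@code adapter} is any
+ * implementation of this interface):
+ *
+ * <pre>{@code
+ * long latest = adapter.getLatestRatifiedVersion();
+ * if (latest == -1) {
+ *   // no ratified commits yet; version 0 may exist but has not been ratified
+ * }
+ * }</pre>
+ *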
<p>
Default implementation calls {@link #getCommits} with no end version and extracts the latest + * version from the response. Implementations may override for efficiency if the catalog provides + * a dedicated API. + * + * @return the latest version ratified by the catalog, or -1 if none registered + */ + default long getLatestRatifiedVersion() { + return getCommits(0, Optional.empty()).getLatestTableVersion(); + } + + @Override + void close(); +} diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCTableInfo.java b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCTableInfo.java new file mode 100644 index 00000000000..821f79656e6 --- /dev/null +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCTableInfo.java @@ -0,0 +1,54 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.spark.snapshot.unitycatalog; + +import static java.util.Objects.requireNonNull; + +/** + * Table information for Unity Catalog managed tables. + * + *
<p>
This POJO encapsulates all the information needed to interact with a Unity Catalog table + * without requiring Spark dependencies. + */ +public final class UCTableInfo { + private final String tableId; + private final String tablePath; + private final String ucUri; + private final String ucToken; + + public UCTableInfo(String tableId, String tablePath, String ucUri, String ucToken) { + this.tableId = requireNonNull(tableId, "tableId is null"); + this.tablePath = requireNonNull(tablePath, "tablePath is null"); + this.ucUri = requireNonNull(ucUri, "ucUri is null"); + this.ucToken = requireNonNull(ucToken, "ucToken is null"); + } + + public String getTableId() { + return tableId; + } + + public String getTablePath() { + return tablePath; + } + + public String getUcUri() { + return ucUri; + } + + public String getUcToken() { + return ucToken; + } +} diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCUtils.java b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCUtils.java new file mode 100644 index 00000000000..5b89fdf5f8f --- /dev/null +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCUtils.java @@ -0,0 +1,113 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.spark.snapshot.unitycatalog; + +import static java.util.Objects.requireNonNull; + +import io.delta.kernel.spark.utils.CatalogTableUtils; +import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient; +import java.util.Map; +import java.util.Optional; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.catalog.CatalogTable; +import org.apache.spark.sql.delta.coordinatedcommits.UCCatalogConfig; +import org.apache.spark.sql.delta.coordinatedcommits.UCCommitCoordinatorBuilder$; + +/** + * Utility class for extracting Unity Catalog table information from Spark catalog metadata. + * + *

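+ * <p>A usage sketch (hypothetical caller):
+ *
+ * <pre>{@code
+ * Optional<UCTableInfo> info = UCUtils.extractTableInfo(catalogTable, spark);
+ * info.ifPresent(i -> {
+ *   // i.getUcUri() and i.getUcToken() locate and authenticate against the UC endpoint
+ * });
+ * }</pre>
+ *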
<p>This class isolates Spark dependencies, allowing {@link UCManagedSnapshotManager} to be + * created without Spark if table info is provided directly via {@link UCTableInfo}. + */ +public final class UCUtils { + + // Utility class - no instances + private UCUtils() {} + + /** + * Extracts Unity Catalog table information from Spark catalog table metadata. + * + * @param catalogTable Spark catalog table metadata + * @param spark SparkSession for resolving Unity Catalog configurations + * @return table info if table is UC-managed, empty otherwise + * @throws IllegalArgumentException if table is UC-managed but configuration is invalid + */ + public static Optional<UCTableInfo> extractTableInfo( + CatalogTable catalogTable, SparkSession spark) { + requireNonNull(catalogTable, "catalogTable is null"); + requireNonNull(spark, "spark is null"); + + if (!CatalogTableUtils.isUnityCatalogManagedTable(catalogTable)) { + return Optional.empty(); + } + + String tableId = extractUCTableId(catalogTable); + String tablePath = extractTablePath(catalogTable); + + // Get catalog name - require explicit catalog in identifier + scala.Option<String> catalogOption = catalogTable.identifier().catalog(); + if (catalogOption.isEmpty()) { + throw new IllegalArgumentException( + "Unable to determine Unity Catalog for table " + + catalogTable.identifier() + + ": catalog name is missing. Use a fully-qualified table name with an explicit " + + "catalog (e.g., catalog.schema.table)."); + } + String catalogName = catalogOption.get(); + + // Get UC endpoint and token from Spark configs + scala.collection.immutable.Map<String, UCCatalogConfig> ucConfigs = + UCCommitCoordinatorBuilder$.MODULE$.getCatalogConfigMap(spark); + + scala.Option<UCCatalogConfig> configOpt = ucConfigs.get(catalogName); + + if (configOpt.isEmpty()) { + throw new IllegalArgumentException( + "Cannot create UC client for table " + + catalogTable.identifier() + + ": Unity Catalog configuration not found for catalog '" + + catalogName + + "'."); + } + + UCCatalogConfig config = configOpt.get(); + String ucUri = config.uri(); + String ucToken = config.token(); + + return Optional.of(new UCTableInfo(tableId, tablePath, ucUri, ucToken)); + } + + private static String extractUCTableId(CatalogTable catalogTable) { + Map<String, String> storageProperties = + scala.jdk.javaapi.CollectionConverters.asJava(catalogTable.storage().properties()); + + // TODO: UC constants should be consolidated in a shared location (future PR) + String ucTableId = storageProperties.get(UCCommitCoordinatorClient.UC_TABLE_ID_KEY); + if (ucTableId == null || ucTableId.isEmpty()) { + throw new IllegalArgumentException( + "Cannot extract ucTableId from table " + catalogTable.identifier()); + } + return ucTableId; + } + + private static String extractTablePath(CatalogTable catalogTable) { + if (catalogTable.location() == null) { + throw new IllegalArgumentException( + "Cannot extract table path: location is null for table " + catalogTable.identifier()); + } + return catalogTable.location().toString(); + } +} diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UnityCatalogAdapter.java b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UnityCatalogAdapter.java new file mode 100644 index 00000000000..74ded81efe8 --- /dev/null +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UnityCatalogAdapter.java @@ -0,0 +1,110 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.spark.snapshot.unitycatalog; + +import static java.util.Objects.requireNonNull; + +import io.delta.kernel.spark.snapshot.ManagedCatalogAdapter; +import io.delta.storage.commit.GetCommitsResponse; +import java.util.Optional; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.catalog.CatalogTable; + +/** + * Unity Catalog implementation of {@link ManagedCatalogAdapter}. + * + *

<p>This adapter is responsible only for fetching commit metadata from Unity Catalog's commit + * coordinator API. It does not contain any Delta/Kernel snapshot building logic; that + * responsibility belongs to the snapshot manager layer. + * + *
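+ * <p>Intended creation flow (a sketch; the commit-fetching methods still throw in this
+ * wireframe):
+ *
+ * <pre>{@code
+ * Optional<ManagedCatalogAdapter> adapter =
+ *     UnityCatalogAdapter.fromCatalog(catalogTable, spark);
+ * }</pre>
+ *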
<p>
Methods are stubbed in this wireframe PR and will be implemented in a follow-up once UC + * operations are enabled. + */ +public final class UnityCatalogAdapter implements ManagedCatalogAdapter { + + private final String tableId; + private final String tablePath; + private final String endpoint; + private final String token; + + public UnityCatalogAdapter(String tableId, String tablePath, String endpoint, String token) { + this.tableId = requireNonNull(tableId, "tableId is null"); + this.tablePath = requireNonNull(tablePath, "tablePath is null"); + this.endpoint = requireNonNull(endpoint, "endpoint is null"); + this.token = requireNonNull(token, "token is null"); + } + + /** + * Creates adapter from Spark catalog table. + * + *

<p>Extracts UC connection info from Spark metadata via {@link UCUtils} and creates the adapter. + * + * @param catalogTable the catalog table metadata + * @param spark the active SparkSession + * @return Optional containing the adapter if this is a UC-managed table, empty otherwise + */ + public static Optional<ManagedCatalogAdapter> fromCatalog( + CatalogTable catalogTable, SparkSession spark) { + requireNonNull(catalogTable, "catalogTable is null"); + requireNonNull(spark, "spark is null"); + + return UCUtils.extractTableInfo(catalogTable, spark) + .map(UnityCatalogAdapter::fromTableInfo); + } + + /** Creates adapter from extracted table info (no Spark dependency). */ + public static ManagedCatalogAdapter fromTableInfo(UCTableInfo info) { + requireNonNull(info, "info is null"); + return new UnityCatalogAdapter( + info.getTableId(), info.getTablePath(), info.getUcUri(), info.getUcToken()); + } + + @Override + public String getTableId() { + return tableId; + } + + @Override + public String getTablePath() { + return tablePath; + } + + /** Returns the UC endpoint URL. */ + public String getEndpoint() { + return endpoint; + } + + /** Returns the UC authentication token. */ + public String getToken() { + return token; + } + + @Override + public GetCommitsResponse getCommits(long startVersion, Optional<Long> endVersion) { + requireNonNull(endVersion, "endVersion is null"); + throw new UnsupportedOperationException("UC getCommits not implemented yet"); + } + + @Override + public long getLatestRatifiedVersion() { + throw new UnsupportedOperationException("UC getLatestRatifiedVersion not implemented yet"); + } + + @Override + public void close() { + // no-op in wireframe; will close UCClient in implementation + } +} diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/snapshot/unitycatalog/UCTableInfoTest.java b/kernel-spark/src/test/java/io/delta/kernel/spark/snapshot/unitycatalog/UCTableInfoTest.java new file mode 100644 index 00000000000..4e30caf91b9 --- /dev/null +++ b/kernel-spark/src/test/java/io/delta/kernel/spark/snapshot/unitycatalog/UCTableInfoTest.java @@ -0,0 +1,40 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.spark.snapshot.unitycatalog; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; + +/** Tests for {@link UCTableInfo}. 
*/ +class UCTableInfoTest { + + @Test + void testConstructor_ValidInputs_StoresAllFields() { + // Use distinctive values that would fail if implementation had hardcoded defaults + String tableId = "uc_tbl_7f3a9b2c-e8d1-4f6a"; + String tablePath = "abfss://container@acct.dfs.core.windows.net/delta/v2"; + String ucUri = "https://uc-server.example.net/api/2.1/uc"; + String ucToken = "dapi_Kx9mN$2pQr#7vWz"; + + UCTableInfo info = new UCTableInfo(tableId, tablePath, ucUri, ucToken); + + assertEquals(tableId, info.getTableId(), "Table ID should be stored correctly"); + assertEquals(tablePath, info.getTablePath(), "Table path should be stored correctly"); + assertEquals(ucUri, info.getUcUri(), "UC URI should be stored correctly"); + assertEquals(ucToken, info.getUcToken(), "UC token should be stored correctly"); + } +} diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/utils/CatalogTableUtilsTest.java b/kernel-spark/src/test/java/io/delta/kernel/spark/utils/CatalogTableUtilsTest.java index 24e6593890a..86a52ced315 100644 --- a/kernel-spark/src/test/java/io/delta/kernel/spark/utils/CatalogTableUtilsTest.java +++ b/kernel-spark/src/test/java/io/delta/kernel/spark/utils/CatalogTableUtilsTest.java @@ -16,14 +16,15 @@ package io.delta.kernel.spark.utils; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import org.apache.spark.sql.catalyst.catalog.CatalogTable; import org.junit.jupiter.api.Test; +import scala.Option; /** Tests for {@link CatalogTableUtils}. */ class CatalogTableUtilsTest { @@ -97,22 +98,6 @@ void testIsUnityCatalogManaged_PreviewFlagMissingId_ReturnsFalse() { "Preview flag without ID should not be considered Unity managed"); } - @Test - void testIsCatalogManaged_NullTable_ThrowsException() { - assertThrows( - NullPointerException.class, - () -> CatalogTableUtils.isCatalogManaged(null), - "Null table should throw NullPointerException"); - } - - @Test - void testIsUnityCatalogManaged_NullTable_ThrowsException() { - assertThrows( - NullPointerException.class, - () -> CatalogTableUtils.isUnityCatalogManagedTable(null), - "Null table should throw NullPointerException"); - } - @Test void testIsCatalogManaged_NullStorage_ReturnsFalse() { CatalogTable table = catalogTableWithNullStorage(Collections.emptyMap()); @@ -151,15 +136,36 @@ void testIsUnityCatalogManaged_NullStorageProperties_ReturnsFalse() { private static CatalogTable catalogTable( Map properties, Map storageProperties) { - return CatalogTableTestUtils$.MODULE$.catalogTableWithProperties(properties, storageProperties); + return CatalogTableTestUtils$.MODULE$.createCatalogTable( + "tbl" /* tableName */, + Option.empty() /* catalogName */, + properties, + storageProperties, + Option.empty() /* locationUri */, + false /* nullStorage */, + false /* nullStorageProperties */); } private static CatalogTable catalogTableWithNullStorage(Map properties) { - return CatalogTableTestUtils$.MODULE$.catalogTableWithNullStorage(properties); + return CatalogTableTestUtils$.MODULE$.createCatalogTable( + "tbl" /* tableName */, + Option.empty() /* catalogName */, + properties, + new HashMap<>() /* storageProperties */, + Option.empty() /* locationUri */, + true /* nullStorage */, + false /* nullStorageProperties */); } private static CatalogTable 
catalogTableWithNullStorageProperties( Map properties) { - return CatalogTableTestUtils$.MODULE$.catalogTableWithNullStorageProperties(properties); + return CatalogTableTestUtils$.MODULE$.createCatalogTable( + "tbl" /* tableName */, + Option.empty() /* catalogName */, + properties, + new HashMap<>() /* storageProperties */, + Option.empty() /* locationUri */, + false /* nullStorage */, + true /* nullStorageProperties */); } } diff --git a/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/DeltaSnapshotManagerFactorySuite.scala b/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/DeltaSnapshotManagerFactorySuite.scala new file mode 100644 index 00000000000..b94788ae826 --- /dev/null +++ b/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/DeltaSnapshotManagerFactorySuite.scala @@ -0,0 +1,71 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.spark.snapshot + +import java.net.URI + +import io.delta.kernel.spark.utils.CatalogTableTestUtils + +import org.apache.hadoop.conf.Configuration +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.test.SharedSparkSession + +/** Tests for [[DeltaSnapshotManagerFactory]]. */ +class DeltaSnapshotManagerFactorySuite extends SparkFunSuite with SharedSparkSession { + + private def createTestCatalogTable() = { + CatalogTableTestUtils.createCatalogTable(locationUri = Some(new URI("s3://test/path"))) + } + + test("fromPath throws NullPointerException for null tablePath") { + assertThrows[NullPointerException] { + DeltaSnapshotManagerFactory.fromPath(null, new Configuration()) + } + } + + test("fromPath throws NullPointerException for null hadoopConf") { + assertThrows[NullPointerException] { + DeltaSnapshotManagerFactory.fromPath("/tmp/test", null) + } + } + + test("fromCatalogTable throws NullPointerException for null catalogTable") { + assertThrows[NullPointerException] { + DeltaSnapshotManagerFactory.fromCatalogTable(null, spark, new Configuration()) + } + } + + test("fromCatalogTable throws NullPointerException for null spark") { + val table = createTestCatalogTable() + assertThrows[NullPointerException] { + DeltaSnapshotManagerFactory.fromCatalogTable(table, null, new Configuration()) + } + } + + test("fromCatalogTable throws NullPointerException for null hadoopConf") { + val table = createTestCatalogTable() + assertThrows[NullPointerException] { + DeltaSnapshotManagerFactory.fromCatalogTable(table, spark, null) + } + } + + test("fromCatalogTable throws UnsupportedOperationException (skeleton)") { + val table = createTestCatalogTable() + assertThrows[UnsupportedOperationException] { + DeltaSnapshotManagerFactory.fromCatalogTable(table, spark, new Configuration()) + } + } +} diff --git a/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCUtilsSuite.scala b/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCUtilsSuite.scala new file mode 100644 index 00000000000..fa29ca500ff --- 
/dev/null +++ b/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCUtilsSuite.scala @@ -0,0 +1,225 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.spark.snapshot.unitycatalog + +import java.net.URI +import java.util.{HashMap => JHashMap} + +import io.delta.kernel.internal.tablefeatures.TableFeatures +import io.delta.kernel.spark.utils.CatalogTableTestUtils +import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.test.SharedSparkSession + +/** + * Unit tests for [[UCUtils]]. + * + * Tests use distinctive, high-entropy values that would fail if the implementation + * had hardcoded defaults instead of actually extracting values from the inputs. + */ +class UCUtilsSuite extends SparkFunSuite with SharedSparkSession { + + // Use the same constants as CatalogTableUtils to ensure consistency + private val FEATURE_CATALOG_MANAGED = + TableFeatures.SET_TABLE_FEATURE_SUPPORTED_PREFIX + + TableFeatures.CATALOG_MANAGED_RW_FEATURE.featureName() + private val FEATURE_SUPPORTED = TableFeatures.SET_TABLE_FEATURE_SUPPORTED_VALUE + private val UC_TABLE_ID_KEY = UCCommitCoordinatorClient.UC_TABLE_ID_KEY + private val UC_CATALOG_CONNECTOR = "io.unitycatalog.spark.UCSingleCatalog" + + // Distinctive values that would fail if hardcoded + private val TABLE_ID_ALPHA = "uc_8f2b3c9a-d1e7-4a6f-b8c2" + private val TABLE_PATH_ALPHA = "abfss://delta-store@prod.dfs.core.windows.net/warehouse/tbl_v3" + private val UC_URI_ALPHA = "https://uc-server-westus2.example.net/api/2.1/unity-catalog" + private val UC_TOKEN_ALPHA = "dapi_Xk7mP$9qRs#2vWz_prod" + private val CATALOG_ALPHA = "uc_catalog_westus2_prod" + + // ==================== Helper Methods ==================== + + private def makeNonUCTable(): CatalogTable = { + CatalogTableTestUtils.createCatalogTable(locationUri = Some(new URI(TABLE_PATH_ALPHA))) + } + + private def makeUCTable( + tableId: String = TABLE_ID_ALPHA, + tablePath: String = TABLE_PATH_ALPHA, + catalogName: Option[String] = None): CatalogTable = { + val storageProps = new JHashMap[String, String]() + storageProps.put(FEATURE_CATALOG_MANAGED, FEATURE_SUPPORTED) + storageProps.put(UC_TABLE_ID_KEY, tableId) + + CatalogTableTestUtils.createCatalogTable( + catalogName = catalogName, + storageProperties = storageProps, + locationUri = Some(new URI(tablePath))) + } + + private def withUCCatalogConfig( + catalogName: String, + uri: String, + token: String)(testCode: => Unit): Unit = { + val configs = Seq( + s"spark.sql.catalog.$catalogName" -> UC_CATALOG_CONNECTOR, + s"spark.sql.catalog.$catalogName.uri" -> uri, + s"spark.sql.catalog.$catalogName.token" -> token) + val originalValues = configs.map { case (key, _) => key -> spark.conf.getOption(key) }.toMap + + try { + configs.foreach { case (key, value) => spark.conf.set(key, value) } + testCode 
+ } finally { + configs.foreach { case (key, _) => + originalValues.get(key).flatten match { + case Some(v) => spark.conf.set(key, v) + case None => spark.conf.unset(key) + } + } + } + } + + // ==================== Tests ==================== + + test("returns empty for non-UC table") { + val table = makeNonUCTable() + val result = UCUtils.extractTableInfo(table, spark) + assert(result.isEmpty, "Non-UC table should return empty Optional") + } + + test("returns empty when UC table ID present but feature flag missing") { + val storageProps = new JHashMap[String, String]() + storageProps.put(UC_TABLE_ID_KEY, "orphan_id_9x7y5z") + // No FEATURE_CATALOG_MANAGED - simulates corrupted/partial metadata + + val table = CatalogTableTestUtils.createCatalogTable( + storageProperties = storageProps, + locationUri = Some(new URI("gs://other-bucket/path"))) + val result = UCUtils.extractTableInfo(table, spark) + assert(result.isEmpty, "Missing feature flag should return empty") + } + + test("throws IllegalArgumentException for UC table with empty table ID") { + val storageProps = new JHashMap[String, String]() + storageProps.put(FEATURE_CATALOG_MANAGED, FEATURE_SUPPORTED) + storageProps.put(UC_TABLE_ID_KEY, "") + + val table = CatalogTableTestUtils.createCatalogTable( + storageProperties = storageProps, + locationUri = Some(new URI("s3://empty-id-bucket/path"))) + val exception = intercept[IllegalArgumentException] { + UCUtils.extractTableInfo(table, spark) + } + assert(exception.getMessage.contains("Cannot extract ucTableId")) + } + + test("throws exception for UC table without location") { + val storageProps = new JHashMap[String, String]() + storageProps.put(FEATURE_CATALOG_MANAGED, FEATURE_SUPPORTED) + storageProps.put(UC_TABLE_ID_KEY, "no_location_tbl_id_3k9m") + + val table = CatalogTableTestUtils.createCatalogTable(storageProperties = storageProps) + // Spark throws AnalysisException when location is missing + val exception = intercept[Exception] { + UCUtils.extractTableInfo(table, spark) + } + assert(exception.getMessage.contains("locationUri") || + exception.getMessage.contains("location")) + } + + test("throws IllegalArgumentException when no matching catalog configuration") { + val table = makeUCTable(catalogName = Some("nonexistent_catalog_xyz")) + + val exception = intercept[IllegalArgumentException] { + UCUtils.extractTableInfo(table, spark) + } + assert(exception.getMessage.contains("Unity Catalog configuration not found") || + exception.getMessage.contains("Cannot create UC client")) + } + + test("extracts table info when UC catalog is properly configured") { + val table = makeUCTable(catalogName = Some(CATALOG_ALPHA)) + + withUCCatalogConfig(CATALOG_ALPHA, UC_URI_ALPHA, UC_TOKEN_ALPHA) { + val result = UCUtils.extractTableInfo(table, spark) + + assert(result.isPresent, "Should return table info") + val info = result.get() + // Each assertion uses the specific expected value - would fail if hardcoded + assert(info.getTableId == TABLE_ID_ALPHA, s"Table ID mismatch: got ${info.getTableId}") + assert( + info.getTablePath == TABLE_PATH_ALPHA, + s"Table path mismatch: got ${info.getTablePath}") + assert(info.getUcUri == UC_URI_ALPHA, s"UC URI mismatch: got ${info.getUcUri}") + assert(info.getUcToken == UC_TOKEN_ALPHA, s"UC token mismatch: got ${info.getUcToken}") + } + } + + test("selects correct catalog when multiple catalogs configured") { + // Use completely different values for each catalog to prove selection works + val catalogBeta = "uc_catalog_eastus_staging" + val ucUriBeta = 
"https://uc-server-eastus.example.net/api/2.1/uc" + val ucTokenBeta = "dapi_Yz3nQ$8wRt#1vXa_staging" + val tableIdBeta = "uc_tbl_staging_4d7e2f1a" + val tablePathBeta = "s3://staging-bucket-us-east/delta/tables/v2" + + val catalogGamma = "uc_catalog_euwest_dev" + val ucUriGamma = "https://uc-server-euwest.example.net/api/2.1/uc" + val ucTokenGamma = "dapi_Jk5pL$3mNq#9vBc_dev" + + // Table is in catalogBeta + val table = makeUCTable( + tableId = tableIdBeta, + tablePath = tablePathBeta, + catalogName = Some(catalogBeta)) + + val configs = Seq( + // catalogGamma config (should NOT be used) + s"spark.sql.catalog.$catalogGamma" -> UC_CATALOG_CONNECTOR, + s"spark.sql.catalog.$catalogGamma.uri" -> ucUriGamma, + s"spark.sql.catalog.$catalogGamma.token" -> ucTokenGamma, + // catalogBeta config (should be used) + s"spark.sql.catalog.$catalogBeta" -> UC_CATALOG_CONNECTOR, + s"spark.sql.catalog.$catalogBeta.uri" -> ucUriBeta, + s"spark.sql.catalog.$catalogBeta.token" -> ucTokenBeta) + val originalValues = configs.map { case (key, _) => key -> spark.conf.getOption(key) }.toMap + + try { + configs.foreach { case (key, value) => spark.conf.set(key, value) } + + val result = UCUtils.extractTableInfo(table, spark) + assert(result.isPresent, "Should return table info") + + val info = result.get() + // Verify it selected catalogBeta's config, not catalogGamma's + assert( + info.getUcUri == ucUriBeta, + s"Should use catalogBeta's URI, got: ${info.getUcUri}") + assert(info.getUcToken == ucTokenBeta, s"Should use catalogBeta's token, got: ${info.getUcToken}") + assert(info.getTableId == tableIdBeta, s"Should extract tableIdBeta, got: ${info.getTableId}") + assert( + info.getTablePath == tablePathBeta, + s"Should extract tablePathBeta, got: ${info.getTablePath}") + } finally { + configs.foreach { case (key, _) => + originalValues.get(key).flatten match { + case Some(v) => spark.conf.set(key, v) + case None => spark.conf.unset(key) + } + } + } + } +} diff --git a/kernel-spark/src/test/scala/io/delta/kernel/spark/utils/CatalogTableTestUtils.scala b/kernel-spark/src/test/scala/io/delta/kernel/spark/utils/CatalogTableTestUtils.scala index 4f80f346bb8..ea98efb54b5 100644 --- a/kernel-spark/src/test/scala/io/delta/kernel/spark/utils/CatalogTableTestUtils.scala +++ b/kernel-spark/src/test/scala/io/delta/kernel/spark/utils/CatalogTableTestUtils.scala @@ -15,8 +15,6 @@ */ package io.delta.kernel.spark.utils -import scala.collection.immutable.Seq - import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.types.StructType @@ -30,58 +28,52 @@ import org.apache.spark.sql.types.StructType */ object CatalogTableTestUtils { - def catalogTableWithProperties( - properties: java.util.Map[String, String], - storageProperties: java.util.Map[String, String]): CatalogTable = { + /** + * Creates a [[CatalogTable]] with configurable options. 
+ * + * @param tableName table name (default: "tbl") + * @param catalogName optional catalog name for the identifier + * @param properties table properties (default: empty) + * @param storageProperties storage properties (default: empty) + * @param locationUri optional storage location URI + * @param nullStorage if true, sets storage to null (for edge case testing) + * @param nullStorageProperties if true, sets storage properties to null + */ + def createCatalogTable( + tableName: String = "tbl", + catalogName: Option[String] = None, + properties: java.util.Map[String, String] = new java.util.HashMap[String, String](), + storageProperties: java.util.Map[String, String] = new java.util.HashMap[String, String](), + locationUri: Option[java.net.URI] = None, + nullStorage: Boolean = false, + nullStorageProperties: Boolean = false): CatalogTable = { + val scalaProps = ScalaUtils.toScalaMap(properties) - val scalaStorageProps = ScalaUtils.toScalaMap(storageProperties) + val scalaStorageProps = + if (nullStorageProperties) null else ScalaUtils.toScalaMap(storageProperties) - CatalogTable( - identifier = TableIdentifier("tbl"), - tableType = CatalogTableType.MANAGED, - storage = CatalogStorageFormat( - locationUri = None, + val identifier = catalogName match { + case Some(catalog) => + TableIdentifier(tableName, Some("default") /* database */, Some(catalog) /* catalog */ ) + case None => TableIdentifier(tableName) + } + + val storage = if (nullStorage) { + null + } else { + CatalogStorageFormat( + locationUri = locationUri, inputFormat = None, outputFormat = None, serde = None, compressed = false, - properties = scalaStorageProps), - schema = new StructType(), - provider = None, - partitionColumnNames = Seq.empty, - bucketSpec = None, - properties = scalaProps) - } - - def catalogTableWithNullStorage( - properties: java.util.Map[String, String]): CatalogTable = { - val scalaProps = ScalaUtils.toScalaMap(properties) - - CatalogTable( - identifier = TableIdentifier("tbl"), - tableType = CatalogTableType.MANAGED, - storage = null, - schema = new StructType(), - provider = None, - partitionColumnNames = Seq.empty, - bucketSpec = None, - properties = scalaProps) - } - - def catalogTableWithNullStorageProperties( - properties: java.util.Map[String, String]): CatalogTable = { - val scalaProps = ScalaUtils.toScalaMap(properties) + properties = scalaStorageProps) + } CatalogTable( - identifier = TableIdentifier("tbl"), + identifier = identifier, tableType = CatalogTableType.MANAGED, - storage = CatalogStorageFormat( - locationUri = None, - inputFormat = None, - outputFormat = None, - serde = None, - compressed = false, - properties = null), + storage = storage, schema = new StructType(), provider = None, partitionColumnNames = Seq.empty, diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala index 3faab141bf1..377214d45a4 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala @@ -229,6 +229,16 @@ object UCCommitCoordinatorBuilder .toList } + /** + * Returns catalog configurations as a Map for O(1) lookup by catalog name. + * Wraps [[getCatalogConfigs]] results in [[UCCatalogConfig]] for better readability. 
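+ *
+ * A usage sketch (illustrative catalog name):
+ * {{{
+ *   val configs = getCatalogConfigMap(spark)
+ *   configs.get("my_uc_catalog").foreach { c =>
+ *     val client = UCTokenBasedRestClientFactory.createUCClient(c.uri, c.token)
+ *   }
+ * }}}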
+ */ + private[delta] def getCatalogConfigMap(spark: SparkSession): Map[String, UCCatalogConfig] = { + getCatalogConfigs(spark).map { + case (name, uri, token) => name -> UCCatalogConfig(name, uri, token) + }.toMap + } + private def safeClose(ucClient: UCClient, uri: String): Unit = { try { ucClient.close() @@ -252,3 +262,9 @@ object UCTokenBasedRestClientFactory extends UCClientFactory { override def createUCClient(uri: String, token: String): UCClient = new UCTokenBasedRestClient(uri, token) } + +/** + * Holder for Unity Catalog configuration extracted from Spark configs. + * Used by [[UCCommitCoordinatorBuilder.getCatalogConfigMap]]. + */ +case class UCCatalogConfig(catalogName: String, uri: String, token: String)