diff --git a/build.sbt b/build.sbt index f5381b19344..aec010214a6 100644 --- a/build.sbt +++ b/build.sbt @@ -462,6 +462,7 @@ lazy val sparkV1Filtered = (project in file("spark-v1-filtered")) lazy val sparkV2 = (project in file("kernel-spark")) .dependsOn(sparkV1Filtered) .dependsOn(kernelDefaults) + .dependsOn(kernelUnityCatalog % "compile->compile;test->test") .dependsOn(goldenTables % "test") .settings( name := "delta-spark-v2", diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/catalog/SparkTable.java b/kernel-spark/src/main/java/io/delta/kernel/spark/catalog/SparkTable.java index c9a28dfb88a..943efe47bf2 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/catalog/SparkTable.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/catalog/SparkTable.java @@ -21,7 +21,7 @@ import io.delta.kernel.Snapshot; import io.delta.kernel.spark.read.SparkScanBuilder; import io.delta.kernel.spark.snapshot.DeltaSnapshotManager; -import io.delta.kernel.spark.snapshot.PathBasedSnapshotManager; +import io.delta.kernel.spark.snapshot.DeltaSnapshotManagerFactory; import io.delta.kernel.spark.utils.SchemaUtils; import java.util.*; import org.apache.hadoop.conf.Configuration; @@ -141,7 +141,11 @@ private SparkTable( this.hadoopConf = SparkSession.active().sessionState().newHadoopConfWithOptions(toScalaMap(options)); - this.snapshotManager = new PathBasedSnapshotManager(tablePath, hadoopConf); + this.snapshotManager = + catalogTable.isPresent() + ? DeltaSnapshotManagerFactory.fromCatalogTable( + catalogTable.get(), SparkSession.active(), hadoopConf) + : DeltaSnapshotManagerFactory.fromPath(tablePath, hadoopConf); // Load the initial snapshot through the manager this.initialSnapshot = snapshotManager.loadLatestSnapshot(); this.schema = SchemaUtils.convertKernelSchemaToSparkSchema(initialSnapshot.getSchema()); diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/CatalogManagedSnapshotManager.java b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/CatalogManagedSnapshotManager.java new file mode 100644 index 00000000000..93655bbc872 --- /dev/null +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/CatalogManagedSnapshotManager.java @@ -0,0 +1,358 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.delta.kernel.spark.snapshot; + +import static io.delta.kernel.internal.util.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +import io.delta.kernel.CommitRange; +import io.delta.kernel.CommitRangeBuilder; +import io.delta.kernel.Snapshot; +import io.delta.kernel.SnapshotBuilder; +import io.delta.kernel.TableManager; +import io.delta.kernel.defaults.engine.DefaultEngine; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.DeltaHistoryManager; +import io.delta.kernel.internal.SnapshotImpl; +import io.delta.kernel.internal.files.ParsedCatalogCommitData; +import io.delta.kernel.internal.files.ParsedLogData; +import io.delta.kernel.spark.exception.VersionNotFoundException; +import io.delta.storage.commit.Commit; +import io.delta.storage.commit.GetCommitsResponse; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.spark.annotation.Experimental; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of DeltaSnapshotManager for catalog-managed tables (e.g., UC). + * + *

This snapshot manager owns all Delta/Kernel logic for building snapshots and commit ranges + * from catalog-provided commit metadata. It uses a {@link ManagedCatalogAdapter} to fetch commits + * from the catalog, then applies Kernel's TableManager APIs to construct the appropriate Delta + * objects. + * + *
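A condensed, illustrative sketch of that flow (mirroring loadSnapshotInternal and convertToKernelLogData further down in this file; catalogAdapter, tablePath, and kernelEngine are this class's fields, and the real method additionally handles explicit version/timestamp arguments and normalizes a -1 "nothing ratified yet" catalog version to 0):

GetCommitsResponse response = catalogAdapter.getCommits(0, Optional.empty());
// Sort the ratified commits and expose them to Kernel as catalog commit log data.
List<ParsedLogData> logData =
    response.getCommits().stream()
        .sorted(Comparator.comparingLong(Commit::getVersion))
        .map(c -> ParsedCatalogCommitData.forFileStatus(
            io.delta.kernel.utils.FileStatus.of(
                c.getFileStatus().getPath().toString(),
                c.getFileStatus().getLen(),
                c.getFileStatus().getModificationTime())))
        .collect(Collectors.toList());
// Let Kernel's TableManager assemble the snapshot from the catalog-provided commits.
Snapshot snapshot =
    TableManager.loadSnapshot(tablePath)
        .withLogData(logData)
        .withMaxCatalogVersion(response.getLatestTableVersion())
        .build(kernelEngine);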

This manager is catalog-agnostic - it works with any adapter that implements {@link + * ManagedCatalogAdapter} (UC, Glue, Polaris, etc.). The adapter handles catalog-specific + * communication while this manager handles Delta semantics. + */ +@Experimental +public class CatalogManagedSnapshotManager implements DeltaSnapshotManager, AutoCloseable { + + private static final Logger logger = LoggerFactory.getLogger(CatalogManagedSnapshotManager.class); + + private final ManagedCatalogAdapter catalogAdapter; + private final String tableId; + private final String tablePath; + private final Configuration hadoopConf; + private final Engine kernelEngine; + + public CatalogManagedSnapshotManager( + ManagedCatalogAdapter catalogAdapter, + String tableId, + String tablePath, + Configuration hadoopConf) { + this.catalogAdapter = requireNonNull(catalogAdapter, "catalogAdapter is null"); + this.tableId = requireNonNull(tableId, "tableId is null"); + this.tablePath = requireNonNull(tablePath, "tablePath is null"); + this.hadoopConf = requireNonNull(hadoopConf, "hadoopConf is null"); + + this.kernelEngine = DefaultEngine.create(this.hadoopConf); + logger.info( + "Created CatalogManagedSnapshotManager for table {} at path {}", tableId, tablePath); + } + + /** Loads the latest snapshot of the catalog-managed Delta table. */ + @Override + public Snapshot loadLatestSnapshot() { + return loadSnapshotInternal(Optional.empty(), Optional.empty()); + } + + /** + * Loads a specific version of the catalog-managed Delta table. + * + * @param version the version to load (must be >= 0) + * @return the snapshot at the specified version + */ + @Override + public Snapshot loadSnapshotAt(long version) { + checkArgument(version >= 0, "version must be non-negative"); + return loadSnapshotInternal(Optional.of(version), Optional.empty()); + } + + /** + * Finds the active commit at a specific timestamp. + * + *

For catalog-managed tables, this method retrieves ratified commits from the catalog and uses + * {@link DeltaHistoryManager#getActiveCommitAtTimestamp} to find the commit that was active at + * the specified timestamp. + * + * @param timestampMillis the timestamp in milliseconds since epoch (UTC) + * @param canReturnLastCommit if true, returns the last commit if the timestamp is after all + * commits; if false, throws an exception + * @param mustBeRecreatable if true, only considers commits that can be fully recreated from + * available log files; if false, considers all commits + * @param canReturnEarliestCommit if true, returns the earliest commit if the timestamp is before + * all commits; if false, throws an exception + * @return the commit that was active at the specified timestamp + */ + @Override + public DeltaHistoryManager.Commit getActiveCommitAtTime( + long timestampMillis, + boolean canReturnLastCommit, + boolean mustBeRecreatable, + boolean canReturnEarliestCommit) { + // Load the latest snapshot for timestamp resolution + SnapshotImpl latestSnapshot = (SnapshotImpl) loadLatestSnapshot(); + + // Extract catalog commits from the snapshot's log segment (avoids redundant catalog call) + List catalogCommits = + latestSnapshot.getLogSegment().getAllCatalogCommits(); + + return DeltaHistoryManager.getActiveCommitAtTimestamp( + kernelEngine, + latestSnapshot, + latestSnapshot.getLogPath(), + timestampMillis, + mustBeRecreatable, + canReturnLastCommit, + canReturnEarliestCommit, + catalogCommits); + } + + /** + * Checks if a specific version exists and is accessible. + * + *

For catalog-managed tables, versions are assumed to be contiguous (enforced by the catalog + * coordinator). This method performs a lightweight check by verifying the version is within the + * valid range [0, latestRatifiedVersion]. + * + * @param version the version to check + * @param mustBeRecreatable if true, requires that the version can be fully recreated from + * available log files. For catalog-managed tables, all versions are recreatable since the + * catalog maintains the complete commit history. + * @param allowOutOfRange if true, allows versions greater than the latest version without + * throwing an exception; if false, throws exception for out-of-range versions + * @throws VersionNotFoundException if the version is not available + */ + @Override + public void checkVersionExists(long version, boolean mustBeRecreatable, boolean allowOutOfRange) + throws VersionNotFoundException { + checkArgument(version >= 0, "version must be non-negative"); + + // For catalog-managed tables, the earliest recreatable version is 0 since the catalog + // maintains the complete commit history + long earliestVersion = 0; + long latestVersion = catalogAdapter.getLatestRatifiedVersion(); + + if (version < earliestVersion || ((version > latestVersion) && !allowOutOfRange)) { + throw new VersionNotFoundException(version, earliestVersion, latestVersion); + } + } + + /** + * Gets a range of table changes between versions. + * + * @param engine the Kernel engine for I/O operations + * @param startVersion the start version (inclusive) + * @param endVersion optional end version (inclusive); if empty, uses latest + * @return the commit range between the boundaries + */ + @Override + public CommitRange getTableChanges(Engine engine, long startVersion, Optional endVersion) { + requireNonNull(engine, "engine is null"); + checkArgument(startVersion >= 0, "startVersion must be non-negative"); + endVersion.ifPresent(v -> checkArgument(v >= 0, "endVersion must be non-negative")); + + return loadCommitRangeInternal( + engine, Optional.of(startVersion), Optional.empty(), endVersion, Optional.empty()); + } + + /** + * Closes the catalog adapter and releases resources. + * + *

This method should be called when the snapshot manager is no longer needed. Prefer using + * try-with-resources to ensure proper cleanup. + */ + @Override + public void close() { + try { + catalogAdapter.close(); + logger.info("Closed CatalogManagedSnapshotManager for table {}", tableId); + } catch (Exception e) { + logger.warn("Error closing catalog-managed client for table {}", tableId, e); + } + } + + // ========== Internal implementation methods ========== + + /** + * Internal method to load a snapshot at a specific version or timestamp. + * + *

This method fetches commits from the catalog adapter, converts them to Kernel's + * ParsedLogData format, and uses TableManager to build the snapshot. + */ + private Snapshot loadSnapshotInternal(Optional versionOpt, Optional timestampOpt) { + checkArgument( + !versionOpt.isPresent() || !timestampOpt.isPresent(), + "Cannot provide both version and timestamp"); + + logger.info( + "[{}] Loading Snapshot at {}", + tableId, + getVersionOrTimestampString(versionOpt, timestampOpt)); + + // Fetch commits from catalog + GetCommitsResponse response = catalogAdapter.getCommits(0, versionOpt); + long catalogVersion = getCatalogVersion(response.getLatestTableVersion()); + + // Validate version if specified + versionOpt.ifPresent(v -> validateVersionExists(v, catalogVersion)); + + // Convert to Kernel ParsedLogData + List logData = convertToKernelLogData(response.getCommits()); + + // Build snapshot using TableManager + SnapshotBuilder snapshotBuilder = TableManager.loadSnapshot(tablePath); + + if (versionOpt.isPresent()) { + snapshotBuilder = snapshotBuilder.atVersion(versionOpt.get()); + } + + if (timestampOpt.isPresent()) { + // For timestamp queries, first build the latest snapshot for resolution + Snapshot latestSnapshot = + snapshotBuilder + .withLogData(logData) + .withMaxCatalogVersion(catalogVersion) + .build(kernelEngine); + snapshotBuilder = + TableManager.loadSnapshot(tablePath).atTimestamp(timestampOpt.get(), latestSnapshot); + } + + return snapshotBuilder + .withLogData(logData) + .withMaxCatalogVersion(catalogVersion) + .build(kernelEngine); + } + + /** Internal method to load a commit range with version or timestamp boundaries. */ + private CommitRange loadCommitRangeInternal( + Engine engine, + Optional startVersionOpt, + Optional startTimestampOpt, + Optional endVersionOpt, + Optional endTimestampOpt) { + checkArgument( + !startVersionOpt.isPresent() || !startTimestampOpt.isPresent(), + "Cannot provide both start version and start timestamp"); + checkArgument( + !endVersionOpt.isPresent() || !endTimestampOpt.isPresent(), + "Cannot provide both end version and end timestamp"); + + // For timestamp-based boundaries, don't filter by endVersion when fetching commits + Optional endVersionForQuery = + endVersionOpt.filter(v -> !startTimestampOpt.isPresent() && !endTimestampOpt.isPresent()); + + // Fetch commits from catalog + GetCommitsResponse response = catalogAdapter.getCommits(0, endVersionForQuery); + long catalogVersion = getCatalogVersion(response.getLatestTableVersion()); + + // Validate version boundaries + startVersionOpt.ifPresent(v -> validateVersionExists(v, catalogVersion)); + endVersionOpt.ifPresent(v -> validateVersionExists(v, catalogVersion)); + + // Convert to Kernel ParsedLogData + List logData = convertToKernelLogData(response.getCommits()); + + // Build commit range using TableManager + CommitRangeBuilder builder = TableManager.loadCommitRange(tablePath); + + if (startVersionOpt.isPresent()) { + builder = + builder.withStartBoundary( + CommitRangeBuilder.CommitBoundary.atVersion(startVersionOpt.get())); + } + if (startTimestampOpt.isPresent()) { + Snapshot latestSnapshot = loadLatestSnapshot(); + builder = + builder.withStartBoundary( + CommitRangeBuilder.CommitBoundary.atTimestamp( + startTimestampOpt.get(), latestSnapshot)); + } + if (endVersionOpt.isPresent()) { + builder = + builder.withEndBoundary(CommitRangeBuilder.CommitBoundary.atVersion(endVersionOpt.get())); + } + if (endTimestampOpt.isPresent()) { + Snapshot latestSnapshot = loadLatestSnapshot(); + builder = 
+ builder.withEndBoundary( + CommitRangeBuilder.CommitBoundary.atTimestamp(endTimestampOpt.get(), latestSnapshot)); + } + + return builder.withLogData(logData).build(engine); + } + + /** Converts catalog commits to Kernel's ParsedLogData format. */ + private List convertToKernelLogData(List commits) { + return commits.stream() + .sorted(Comparator.comparingLong(Commit::getVersion)) + .map( + commit -> + ParsedCatalogCommitData.forFileStatus( + hadoopFileStatusToKernelFileStatus(commit.getFileStatus()))) + .collect(Collectors.toList()); + } + + /** Converts Hadoop FileStatus to Kernel FileStatus. */ + private static io.delta.kernel.utils.FileStatus hadoopFileStatusToKernelFileStatus( + org.apache.hadoop.fs.FileStatus hadoopFS) { + return io.delta.kernel.utils.FileStatus.of( + hadoopFS.getPath().toString(), hadoopFS.getLen(), hadoopFS.getModificationTime()); + } + + /** Gets the true catalog version, handling the -1 case for newly created tables. */ + private long getCatalogVersion(long rawVersion) { + // UC returns -1 when only 0.json exists but hasn't been registered with UC + return rawVersion == -1 ? 0 : rawVersion; + } + + /** Validates that the requested version exists. */ + private void validateVersionExists(long version, long maxVersion) { + if (version > maxVersion) { + throw new IllegalArgumentException( + String.format( + "[%s] Cannot load version %d as the latest version ratified by catalog is %d", + tableId, version, maxVersion)); + } + } + + private String getVersionOrTimestampString( + Optional versionOpt, Optional timestampOpt) { + if (versionOpt.isPresent()) { + return "version=" + versionOpt.get(); + } else if (timestampOpt.isPresent()) { + return "timestamp=" + timestampOpt.get(); + } else { + return "latest"; + } + } +} diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/DeltaSnapshotManagerFactory.java b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/DeltaSnapshotManagerFactory.java new file mode 100644 index 00000000000..73c46edfcee --- /dev/null +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/DeltaSnapshotManagerFactory.java @@ -0,0 +1,112 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.spark.snapshot; + +import static java.util.Objects.requireNonNull; + +import io.delta.kernel.spark.snapshot.unitycatalog.UnityCatalogAdapter; +import java.util.Optional; +import org.apache.hadoop.conf.Configuration; +import org.apache.spark.annotation.Experimental; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.catalog.CatalogTable; + +/** + * Factory for creating {@link DeltaSnapshotManager} instances. + * + *

+ * <p>This factory provides two creation methods:
+ *
+ * <ul>
+ *   <li>{@link #fromPath} for filesystem Delta tables addressed by path
+ *   <li>{@link #fromCatalogTable} for tables resolved through a Spark catalog
+ * </ul>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * // For path-based tables
+ * DeltaSnapshotManager manager = DeltaSnapshotManagerFactory.fromPath(
+ *     tablePath,
+ *     hadoopConf
+ * );
+ *
+ * // For catalog tables
+ * DeltaSnapshotManager manager = DeltaSnapshotManagerFactory.fromCatalogTable(
+ *     catalogTable,
+ *     spark,
+ *     hadoopConf
+ * );
+ * }</pre>
+ */ +@Experimental +public final class DeltaSnapshotManagerFactory { + + // Utility class - no instances + private DeltaSnapshotManagerFactory() {} + + /** + * Creates a path-based snapshot manager for filesystem Delta tables. + * + *

Use this when no catalog metadata is available or when you want to work directly with a + * filesystem path. + * + * @param tablePath filesystem path to the Delta table root + * @param hadoopConf Hadoop configuration for the Delta Kernel engine + * @return PathBasedSnapshotManager instance + * @throws NullPointerException if tablePath or hadoopConf is null + */ + public static DeltaSnapshotManager fromPath(String tablePath, Configuration hadoopConf) { + requireNonNull(tablePath, "tablePath is null"); + requireNonNull(hadoopConf, "hadoopConf is null"); + return new PathBasedSnapshotManager(tablePath, hadoopConf); + } + + /** + * Creates a snapshot manager from catalog table metadata. + * + *

Automatically selects {@link CatalogManagedSnapshotManager} for Unity Catalog managed + * tables, or falls back to {@link PathBasedSnapshotManager} for regular tables. + * + * @param catalogTable Spark catalog table metadata + * @param spark SparkSession for resolving Unity Catalog configurations + * @param hadoopConf Hadoop configuration for the Delta Kernel engine + * @return appropriate snapshot manager implementation + * @throws NullPointerException if catalogTable, spark, or hadoopConf is null + * @throws IllegalArgumentException if catalogTable is UC-managed but configuration is invalid + */ + public static DeltaSnapshotManager fromCatalogTable( + CatalogTable catalogTable, SparkSession spark, Configuration hadoopConf) { + requireNonNull(catalogTable, "catalogTable is null"); + requireNonNull(spark, "spark is null"); + requireNonNull(hadoopConf, "hadoopConf is null"); + + Optional adapterOpt = + UnityCatalogAdapter.fromCatalog(catalogTable, spark); + + if (adapterOpt.isPresent()) { + ManagedCatalogAdapter adapter = adapterOpt.get(); + return new CatalogManagedSnapshotManager( + adapter, adapter.getTableId(), adapter.getTablePath(), hadoopConf); + } + + // Fallback to path-based snapshot manager + String tablePath = catalogTable.location().toString(); + return new PathBasedSnapshotManager(tablePath, hadoopConf); + } +} diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/ManagedCatalogAdapter.java b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/ManagedCatalogAdapter.java new file mode 100644 index 00000000000..05743a3fe5d --- /dev/null +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/ManagedCatalogAdapter.java @@ -0,0 +1,81 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.spark.snapshot; + +import io.delta.storage.commit.GetCommitsResponse; +import java.util.Optional; + +/** + * Adapter interface for catalog-managed Delta tables. + * + *

This is a thin, protocol-aligned interface that adapters implement to fetch commit metadata + * from a catalog's commit coordinator API. Adapters are responsible only for communication with the + * catalog - they don't know about Delta snapshots or Kernel internals. + * + *

The {@link CatalogManagedSnapshotManager} uses this interface to retrieve commits and then + * builds Delta snapshots and commit ranges using Kernel's TableManager APIs. + * + *
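To make the contract concrete, a minimal sketch of what an adapter for another commit-coordinating catalog might look like (ExampleCatalogAdapter and its fields are illustrative only and not part of this change; getLatestRatifiedVersion is inherited from the default method below):

// Illustrative skeleton only: translate the catalog's commit-coordinator API into
// GetCommitsResponse; no Delta/Kernel snapshot logic belongs here.
final class ExampleCatalogAdapter implements ManagedCatalogAdapter {
  private final String tableId;   // identifier assigned by the catalog
  private final String tablePath; // Delta table root in storage

  ExampleCatalogAdapter(String tableId, String tablePath) {
    this.tableId = tableId;
    this.tablePath = tablePath;
  }

  @Override
  public String getTableId() {
    return tableId;
  }

  @Override
  public String getTablePath() {
    return tablePath;
  }

  @Override
  public GetCommitsResponse getCommits(long startVersion, Optional<Long> endVersion) {
    // Call the catalog's commit-coordinator API and translate its reply into a
    // GetCommitsResponse (ratified commit files plus the latest ratified version).
    throw new UnsupportedOperationException("wire the catalog client in here");
  }

  @Override
  public void close() {
    // Release any catalog client resources held by this adapter.
  }
}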

Implementations should be catalog-specific (e.g., UnityCatalogAdapter, GlueCatalogAdapter) but + * share this common interface so the snapshot manager can work with any catalog. + */ +public interface ManagedCatalogAdapter extends AutoCloseable { + + /** + * Returns the unique identifier for this table in the catalog. + * + * @return the catalog-assigned table identifier + */ + String getTableId(); + + /** + * Returns the storage path for this table. + * + * @return the filesystem path to the Delta table root + */ + String getTablePath(); + + /** + * Retrieves commits from the catalog's commit coordinator. + * + *

This is the primary method that adapters must implement. It calls the catalog's API to get + * the list of ratified commits within the specified version range. + * + * @param startVersion the starting version (inclusive), typically 0 for initial load + * @param endVersion optional ending version (inclusive); if empty, returns up to latest + * @return response containing the list of commits and the latest ratified table version + */ + GetCommitsResponse getCommits(long startVersion, Optional endVersion); + + /** + * Returns the latest ratified table version from the catalog. + * + *

For catalog-managed tables, this is the highest version that has been successfully ratified + * by the catalog coordinator. Returns -1 if the catalog hasn't registered any commits yet (which + * can happen when version 0 exists but hasn't been ratified). + * + *

Default implementation calls {@link #getCommits} with no end version and extracts the latest + * version from the response. Implementations may override for efficiency if the catalog provides + * a dedicated API. + * + * @return the latest version ratified by the catalog, or -1 if none registered + */ + default long getLatestRatifiedVersion() { + return getCommits(0, Optional.empty()).getLatestTableVersion(); + } + + @Override + void close(); +} diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCTableInfo.java b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCTableInfo.java new file mode 100644 index 00000000000..821f79656e6 --- /dev/null +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCTableInfo.java @@ -0,0 +1,54 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.spark.snapshot.unitycatalog; + +import static java.util.Objects.requireNonNull; + +/** + * Table information for Unity Catalog managed tables. + * + *

This POJO encapsulates all the information needed to interact with a Unity Catalog table + * without requiring Spark dependencies. + */ +public final class UCTableInfo { + private final String tableId; + private final String tablePath; + private final String ucUri; + private final String ucToken; + + public UCTableInfo(String tableId, String tablePath, String ucUri, String ucToken) { + this.tableId = requireNonNull(tableId, "tableId is null"); + this.tablePath = requireNonNull(tablePath, "tablePath is null"); + this.ucUri = requireNonNull(ucUri, "ucUri is null"); + this.ucToken = requireNonNull(ucToken, "ucToken is null"); + } + + public String getTableId() { + return tableId; + } + + public String getTablePath() { + return tablePath; + } + + public String getUcUri() { + return ucUri; + } + + public String getUcToken() { + return ucToken; + } +} diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCUtils.java b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCUtils.java new file mode 100644 index 00000000000..5b89fdf5f8f --- /dev/null +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCUtils.java @@ -0,0 +1,113 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.spark.snapshot.unitycatalog; + +import static java.util.Objects.requireNonNull; + +import io.delta.kernel.spark.utils.CatalogTableUtils; +import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient; +import java.util.Map; +import java.util.Optional; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.catalog.CatalogTable; +import org.apache.spark.sql.delta.coordinatedcommits.UCCatalogConfig; +import org.apache.spark.sql.delta.coordinatedcommits.UCCommitCoordinatorBuilder$; + +/** + * Utility class for extracting Unity Catalog table information from Spark catalog metadata. + * + *
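For context, the Spark-side configuration this class resolves follows the spark.sql.catalog.&lt;name&gt; convention exercised by UCUtilsSuite below; a hypothetical session wired for a catalog named "my_uc" might look like this (values are placeholders):

SparkSession spark = SparkSession.builder()
    .master("local[1]")
    .appName("uc-example")
    // Catalog connector plus the endpoint/token that extractTableInfo resolves
    // into UCTableInfo's ucUri / ucToken fields.
    .config("spark.sql.catalog.my_uc", "io.unitycatalog.spark.UCSingleCatalog")
    .config("spark.sql.catalog.my_uc.uri", "https://uc.example.com/api/2.1/unity-catalog")
    .config("spark.sql.catalog.my_uc.token", "<personal-access-token>")
    .getOrCreate();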

This class isolates Spark dependencies, allowing {@link UCManagedSnapshotManager} to be + * created without Spark if table info is provided directly via {@link UCTableInfo}. + */ +public final class UCUtils { + + // Utility class - no instances + private UCUtils() {} + + /** + * Extracts Unity Catalog table information from Spark catalog table metadata. + * + * @param catalogTable Spark catalog table metadata + * @param spark SparkSession for resolving Unity Catalog configurations + * @return table info if table is UC-managed, empty otherwise + * @throws IllegalArgumentException if table is UC-managed but configuration is invalid + */ + public static Optional extractTableInfo( + CatalogTable catalogTable, SparkSession spark) { + requireNonNull(catalogTable, "catalogTable is null"); + requireNonNull(spark, "spark is null"); + + if (!CatalogTableUtils.isUnityCatalogManagedTable(catalogTable)) { + return Optional.empty(); + } + + String tableId = extractUCTableId(catalogTable); + String tablePath = extractTablePath(catalogTable); + + // Get catalog name - require explicit catalog in identifier + scala.Option catalogOption = catalogTable.identifier().catalog(); + if (catalogOption.isEmpty()) { + throw new IllegalArgumentException( + "Unable to determine Unity Catalog for table " + + catalogTable.identifier() + + ": catalog name is missing. Use a fully-qualified table name with an explicit " + + "catalog (e.g., catalog.schema.table)."); + } + String catalogName = catalogOption.get(); + + // Get UC endpoint and token from Spark configs + scala.collection.immutable.Map ucConfigs = + UCCommitCoordinatorBuilder$.MODULE$.getCatalogConfigMap(spark); + + scala.Option configOpt = ucConfigs.get(catalogName); + + if (configOpt.isEmpty()) { + throw new IllegalArgumentException( + "Cannot create UC client for table " + + catalogTable.identifier() + + ": Unity Catalog configuration not found for catalog '" + + catalogName + + "'."); + } + + UCCatalogConfig config = configOpt.get(); + String ucUri = config.uri(); + String ucToken = config.token(); + + return Optional.of(new UCTableInfo(tableId, tablePath, ucUri, ucToken)); + } + + private static String extractUCTableId(CatalogTable catalogTable) { + Map storageProperties = + scala.jdk.javaapi.CollectionConverters.asJava(catalogTable.storage().properties()); + + // TODO: UC constants should be consolidated in a shared location (future PR) + String ucTableId = storageProperties.get(UCCommitCoordinatorClient.UC_TABLE_ID_KEY); + if (ucTableId == null || ucTableId.isEmpty()) { + throw new IllegalArgumentException( + "Cannot extract ucTableId from table " + catalogTable.identifier()); + } + return ucTableId; + } + + private static String extractTablePath(CatalogTable catalogTable) { + if (catalogTable.location() == null) { + throw new IllegalArgumentException( + "Cannot extract table path: location is null for table " + catalogTable.identifier()); + } + return catalogTable.location().toString(); + } +} diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UnityCatalogAdapter.java b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UnityCatalogAdapter.java new file mode 100644 index 00000000000..9f51255253c --- /dev/null +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UnityCatalogAdapter.java @@ -0,0 +1,129 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.spark.snapshot.unitycatalog; + +import static java.util.Objects.requireNonNull; + +import io.delta.kernel.spark.snapshot.ManagedCatalogAdapter; +import io.delta.storage.commit.GetCommitsResponse; +import io.delta.storage.commit.uccommitcoordinator.UCClient; +import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorException; +import io.delta.storage.commit.uccommitcoordinator.UCTokenBasedRestClient; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Optional; +import org.apache.hadoop.fs.Path; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.catalog.CatalogTable; + +/** + * Unity Catalog implementation of {@link ManagedCatalogAdapter}. + * + *

This adapter is responsible only for fetching commit metadata from Unity Catalog's commit + * coordinator API. It does not contain any Delta/Kernel snapshot building logic - that + * responsibility belongs to the {@link + * io.delta.kernel.spark.snapshot.CatalogManagedSnapshotManager} layer. + */ +public final class UnityCatalogAdapter implements ManagedCatalogAdapter { + + private final String tableId; + private final String tablePath; + private final UCClient ucClient; + + public UnityCatalogAdapter(String tableId, String tablePath, UCClient ucClient) { + this.tableId = requireNonNull(tableId, "tableId is null"); + this.tablePath = requireNonNull(tablePath, "tablePath is null"); + this.ucClient = requireNonNull(ucClient, "ucClient is null"); + } + + /** + * Creates adapter from Spark catalog table. + * + *

Extracts UC connection info from Spark metadata and delegates to {@link + * #fromConnectionInfo}. + * + * @param catalogTable Spark catalog table metadata + * @param spark SparkSession for resolving Unity Catalog configurations + * @return adapter if table is UC-managed, empty otherwise + * @throws IllegalArgumentException if table is UC-managed but configuration is invalid + */ + public static Optional fromCatalog( + CatalogTable catalogTable, SparkSession spark) { + requireNonNull(catalogTable, "catalogTable is null"); + requireNonNull(spark, "spark is null"); + + return SparkUnityCatalogUtils.extractConnectionInfo(catalogTable, spark) + .map(UnityCatalogAdapter::fromConnectionInfo); + } + + /** + * Creates adapter from connection info (no Spark dependency). + * + *

This method allows creating a UC adapter without Spark dependencies if you have connection + * information directly. + * + * @param info Unity Catalog connection information + * @return adapter instance + */ + public static ManagedCatalogAdapter fromConnectionInfo(UnityCatalogConnectionInfo info) { + requireNonNull(info, "info is null"); + UCClient client = new UCTokenBasedRestClient(info.getEndpoint(), info.getToken()); + return new UnityCatalogAdapter(info.getTableId(), info.getTablePath(), client); + } + + @Override + public String getTableId() { + return tableId; + } + + @Override + public String getTablePath() { + return tablePath; + } + + @Override + public GetCommitsResponse getCommits(long startVersion, Optional endVersion) { + requireNonNull(endVersion, "endVersion is null"); + try { + return ucClient.getCommits( + tableId, + new Path(tablePath).toUri(), + startVersion == 0 ? Optional.empty() : Optional.of(startVersion), + endVersion); + } catch (IOException e) { + throw new UncheckedIOException(e); + } catch (UCCommitCoordinatorException e) { + throw new RuntimeException(e); + } + } + + @Override + public long getLatestRatifiedVersion() { + GetCommitsResponse response = getCommits(0, Optional.empty()); + long maxRatified = response.getLatestTableVersion(); + // UC returns -1 when only 0.json exists (CREATE not yet registered with UC) + return maxRatified == -1 ? 0 : maxRatified; + } + + @Override + public void close() { + try { + ucClient.close(); + } catch (Exception e) { + // Swallow close errors to avoid disrupting caller cleanup + } + } +} diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/snapshot/unitycatalog/UCTableInfoTest.java b/kernel-spark/src/test/java/io/delta/kernel/spark/snapshot/unitycatalog/UCTableInfoTest.java new file mode 100644 index 00000000000..4e30caf91b9 --- /dev/null +++ b/kernel-spark/src/test/java/io/delta/kernel/spark/snapshot/unitycatalog/UCTableInfoTest.java @@ -0,0 +1,40 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.spark.snapshot.unitycatalog; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; + +/** Tests for {@link UCTableInfo}. 
*/ +class UCTableInfoTest { + + @Test + void testConstructor_ValidInputs_StoresAllFields() { + // Use distinctive values that would fail if implementation had hardcoded defaults + String tableId = "uc_tbl_7f3a9b2c-e8d1-4f6a"; + String tablePath = "abfss://container@acct.dfs.core.windows.net/delta/v2"; + String ucUri = "https://uc-server.example.net/api/2.1/uc"; + String ucToken = "dapi_Kx9mN$2pQr#7vWz"; + + UCTableInfo info = new UCTableInfo(tableId, tablePath, ucUri, ucToken); + + assertEquals(tableId, info.getTableId(), "Table ID should be stored correctly"); + assertEquals(tablePath, info.getTablePath(), "Table path should be stored correctly"); + assertEquals(ucUri, info.getUcUri(), "UC URI should be stored correctly"); + assertEquals(ucToken, info.getUcToken(), "UC token should be stored correctly"); + } +} diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/utils/CatalogTableUtilsTest.java b/kernel-spark/src/test/java/io/delta/kernel/spark/utils/CatalogTableUtilsTest.java index 24e6593890a..86a52ced315 100644 --- a/kernel-spark/src/test/java/io/delta/kernel/spark/utils/CatalogTableUtilsTest.java +++ b/kernel-spark/src/test/java/io/delta/kernel/spark/utils/CatalogTableUtilsTest.java @@ -16,14 +16,15 @@ package io.delta.kernel.spark.utils; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import org.apache.spark.sql.catalyst.catalog.CatalogTable; import org.junit.jupiter.api.Test; +import scala.Option; /** Tests for {@link CatalogTableUtils}. */ class CatalogTableUtilsTest { @@ -97,22 +98,6 @@ void testIsUnityCatalogManaged_PreviewFlagMissingId_ReturnsFalse() { "Preview flag without ID should not be considered Unity managed"); } - @Test - void testIsCatalogManaged_NullTable_ThrowsException() { - assertThrows( - NullPointerException.class, - () -> CatalogTableUtils.isCatalogManaged(null), - "Null table should throw NullPointerException"); - } - - @Test - void testIsUnityCatalogManaged_NullTable_ThrowsException() { - assertThrows( - NullPointerException.class, - () -> CatalogTableUtils.isUnityCatalogManagedTable(null), - "Null table should throw NullPointerException"); - } - @Test void testIsCatalogManaged_NullStorage_ReturnsFalse() { CatalogTable table = catalogTableWithNullStorage(Collections.emptyMap()); @@ -151,15 +136,36 @@ void testIsUnityCatalogManaged_NullStorageProperties_ReturnsFalse() { private static CatalogTable catalogTable( Map properties, Map storageProperties) { - return CatalogTableTestUtils$.MODULE$.catalogTableWithProperties(properties, storageProperties); + return CatalogTableTestUtils$.MODULE$.createCatalogTable( + "tbl" /* tableName */, + Option.empty() /* catalogName */, + properties, + storageProperties, + Option.empty() /* locationUri */, + false /* nullStorage */, + false /* nullStorageProperties */); } private static CatalogTable catalogTableWithNullStorage(Map properties) { - return CatalogTableTestUtils$.MODULE$.catalogTableWithNullStorage(properties); + return CatalogTableTestUtils$.MODULE$.createCatalogTable( + "tbl" /* tableName */, + Option.empty() /* catalogName */, + properties, + new HashMap<>() /* storageProperties */, + Option.empty() /* locationUri */, + true /* nullStorage */, + false /* nullStorageProperties */); } private static CatalogTable 
catalogTableWithNullStorageProperties( Map properties) { - return CatalogTableTestUtils$.MODULE$.catalogTableWithNullStorageProperties(properties); + return CatalogTableTestUtils$.MODULE$.createCatalogTable( + "tbl" /* tableName */, + Option.empty() /* catalogName */, + properties, + new HashMap<>() /* storageProperties */, + Option.empty() /* locationUri */, + false /* nullStorage */, + true /* nullStorageProperties */); } } diff --git a/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/CatalogManagedSnapshotManagerSuite.scala b/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/CatalogManagedSnapshotManagerSuite.scala new file mode 100644 index 00000000000..918b9e8ae9c --- /dev/null +++ b/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/CatalogManagedSnapshotManagerSuite.scala @@ -0,0 +1,372 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.spark.snapshot + +import java.util.Optional + +import io.delta.kernel.spark.exception.VersionNotFoundException +import io.delta.kernel.spark.snapshot.unitycatalog.UnityCatalogAdapter +import io.delta.kernel.unitycatalog.{InMemoryUCClient, UCCatalogManagedTestUtils} + +import org.apache.hadoop.conf.Configuration +import org.scalatest.funsuite.AnyFunSuite + +/** Tests for [[CatalogManagedSnapshotManager]]. 
*/ +class CatalogManagedSnapshotManagerSuite extends AnyFunSuite with UCCatalogManagedTestUtils { + + private val testUcTableId = "testUcTableId" + + test("constructor throws on null hadoopConf") { + val adapter = new UnityCatalogAdapter( + testUcTableId, + "/tmp/path", + new InMemoryUCClient("ucMetastoreId")) + + assertThrows[NullPointerException] { + new CatalogManagedSnapshotManager(adapter, testUcTableId, "/tmp/path", null) + } + } + + test("constructor throws on null catalogAdapter") { + assertThrows[NullPointerException] { + new CatalogManagedSnapshotManager(null, testUcTableId, "/tmp/path", new Configuration()) + } + } + + test("constructor throws on null tableId") { + val adapter = new UnityCatalogAdapter( + testUcTableId, + "/tmp/path", + new InMemoryUCClient("ucMetastoreId")) + + assertThrows[NullPointerException] { + new CatalogManagedSnapshotManager(adapter, null, "/tmp/path", new Configuration()) + } + } + + test("constructor throws on null tablePath") { + val adapter = new UnityCatalogAdapter( + testUcTableId, + "/tmp/path", + new InMemoryUCClient("ucMetastoreId")) + + assertThrows[NullPointerException] { + new CatalogManagedSnapshotManager(adapter, testUcTableId, null, new Configuration()) + } + } + + test("loadLatestSnapshot returns snapshot at max ratified version") { + withUCClientAndTestTable { (ucClient, tablePath, maxRatifiedVersion) => + val adapter = new UnityCatalogAdapter(testUcTableId, tablePath, ucClient) + val manager = new CatalogManagedSnapshotManager( + adapter, + testUcTableId, + tablePath, + new Configuration()) + + try { + val snapshot = manager.loadLatestSnapshot() + + assert(snapshot != null, "Snapshot should not be null") + assert(snapshot.getVersion == maxRatifiedVersion, "Should load max ratified version") + } finally { + manager.close() + } + } + } + + test("loadSnapshotAt loads specified version") { + withUCClientAndTestTable { (ucClient, tablePath, maxRatifiedVersion) => + val adapter = new UnityCatalogAdapter(testUcTableId, tablePath, ucClient) + val manager = new CatalogManagedSnapshotManager( + adapter, + testUcTableId, + tablePath, + new Configuration()) + + try { + val snapshot = manager.loadSnapshotAt(1L) + + assert(snapshot != null, "Snapshot should not be null") + assert(snapshot.getVersion == 1L, "Should load version 1") + } finally { + manager.close() + } + } + } + + test("loadSnapshotAt throws on negative version") { + withUCClientAndTestTable { (ucClient, tablePath, _) => + val adapter = new UnityCatalogAdapter(testUcTableId, tablePath, ucClient) + val manager = new CatalogManagedSnapshotManager( + adapter, + testUcTableId, + tablePath, + new Configuration()) + + try { + assertThrows[IllegalArgumentException] { + manager.loadSnapshotAt(-1L) + } + } finally { + manager.close() + } + } + } + + test("checkVersionExists validates version range") { + withUCClientAndTestTable { (ucClient, tablePath, maxRatifiedVersion) => + val adapter = new UnityCatalogAdapter(testUcTableId, tablePath, ucClient) + val manager = new CatalogManagedSnapshotManager( + adapter, + testUcTableId, + tablePath, + new Configuration()) + + try { + // Versions 0, 1, 2 should exist + manager.checkVersionExists( + 0L, + /* mustBeRecreatable = */ true, + /* allowOutOfRange = */ false) + manager.checkVersionExists( + 1L, + /* mustBeRecreatable = */ true, + /* allowOutOfRange = */ false) + manager.checkVersionExists( + maxRatifiedVersion, + /* mustBeRecreatable = */ true, + /* allowOutOfRange = */ false) + + // Version beyond latest should throw + 
assertThrows[VersionNotFoundException] { + manager.checkVersionExists( + maxRatifiedVersion + 1, + /* mustBeRecreatable = */ true, + /* allowOutOfRange = */ false) + } + } finally { + manager.close() + } + } + } + + test("checkVersionExists allows out of range when specified") { + withUCClientAndTestTable { (ucClient, tablePath, maxRatifiedVersion) => + val adapter = new UnityCatalogAdapter(testUcTableId, tablePath, ucClient) + val manager = new CatalogManagedSnapshotManager( + adapter, + testUcTableId, + tablePath, + new Configuration()) + + try { + // Should not throw when allowOutOfRange = true + manager.checkVersionExists( + maxRatifiedVersion + 10, + /* mustBeRecreatable = */ true, + /* allowOutOfRange = */ true) + } finally { + manager.close() + } + } + } + + test("checkVersionExists throws on negative version") { + withUCClientAndTestTable { (ucClient, tablePath, _) => + val adapter = new UnityCatalogAdapter(testUcTableId, tablePath, ucClient) + val manager = new CatalogManagedSnapshotManager( + adapter, + testUcTableId, + tablePath, + new Configuration()) + + try { + assertThrows[IllegalArgumentException] { + manager.checkVersionExists( + -1L, + /* mustBeRecreatable = */ true, + /* allowOutOfRange = */ false) + } + } finally { + manager.close() + } + } + } + + test("getTableChanges returns commit range") { + withUCClientAndTestTable { (ucClient, tablePath, _) => + val adapter = new UnityCatalogAdapter(testUcTableId, tablePath, ucClient) + val manager = new CatalogManagedSnapshotManager( + adapter, + testUcTableId, + tablePath, + new Configuration()) + + try { + val commitRange = manager.getTableChanges( + defaultEngine, + /* startVersion = */ 1L, + /* endVersion = */ Optional.of(2L)) + + assert(commitRange != null, "CommitRange should not be null") + } finally { + manager.close() + } + } + } + + test("close releases resources") { + withUCClientAndTestTable { (ucClient, tablePath, _) => + val adapter = new UnityCatalogAdapter(testUcTableId, tablePath, ucClient) + val manager = new CatalogManagedSnapshotManager( + adapter, + testUcTableId, + tablePath, + new Configuration()) + + // Should not throw + manager.close() + } + } + + // Time-travel tests for getActiveCommitAtTime + + test("getActiveCommitAtTime returns commit at exact timestamp") { + withUCClientAndTestTable { (ucClient, tablePath, _) => + val adapter = new UnityCatalogAdapter(testUcTableId, tablePath, ucClient) + val manager = new CatalogManagedSnapshotManager( + adapter, + testUcTableId, + tablePath, + new Configuration()) + + try { + // Timestamp at v1 should return version 1 + val commit = manager.getActiveCommitAtTime( + v1Ts, + /* canReturnLastCommit = */ false, + /* mustBeRecreatable = */ true, + /* canReturnEarliestCommit = */ false) + + assert(commit != null, "Commit should not be null") + assert(commit.getVersion == 1L, s"Expected version 1, got ${commit.getVersion}") + } finally { + manager.close() + } + } + } + + test("getActiveCommitAtTime returns latest when canReturnLastCommit is true") { + withUCClientAndTestTable { (ucClient, tablePath, maxRatifiedVersion) => + val adapter = new UnityCatalogAdapter(testUcTableId, tablePath, ucClient) + val manager = new CatalogManagedSnapshotManager( + adapter, + testUcTableId, + tablePath, + new Configuration()) + + try { + // Timestamp after all commits with canReturnLastCommit=true + val futureTs = v2Ts + 1000000L + val commit = manager.getActiveCommitAtTime( + futureTs, + /* canReturnLastCommit = */ true, + /* mustBeRecreatable = */ true, + /* canReturnEarliestCommit = 
*/ false) + + assert(commit != null, "Commit should not be null") + assert(commit.getVersion == maxRatifiedVersion, + s"Expected version $maxRatifiedVersion, got ${commit.getVersion}") + } finally { + manager.close() + } + } + } + + test("getActiveCommitAtTime returns earliest when canReturnEarliestCommit is true") { + withUCClientAndTestTable { (ucClient, tablePath, _) => + val adapter = new UnityCatalogAdapter(testUcTableId, tablePath, ucClient) + val manager = new CatalogManagedSnapshotManager( + adapter, + testUcTableId, + tablePath, + new Configuration()) + + try { + // Timestamp before all commits with canReturnEarliestCommit=true + val pastTs = v0Ts - 1000000L + val commit = manager.getActiveCommitAtTime( + pastTs, + /* canReturnLastCommit = */ false, + /* mustBeRecreatable = */ true, + /* canReturnEarliestCommit = */ true) + + assert(commit != null, "Commit should not be null") + // Should return the earliest available version + assert(commit.getVersion >= 0L, s"Expected version >= 0, got ${commit.getVersion}") + } finally { + manager.close() + } + } + } + + test("getActiveCommitAtTime returns commit between versions") { + withUCClientAndTestTable { (ucClient, tablePath, _) => + val adapter = new UnityCatalogAdapter(testUcTableId, tablePath, ucClient) + val manager = new CatalogManagedSnapshotManager( + adapter, + testUcTableId, + tablePath, + new Configuration()) + + try { + // Timestamp between v1 and v2 should return v1 + val betweenTs = (v1Ts + v2Ts) / 2 + val commit = manager.getActiveCommitAtTime( + betweenTs, + /* canReturnLastCommit = */ false, + /* mustBeRecreatable = */ true, + /* canReturnEarliestCommit = */ false) + + assert(commit != null, "Commit should not be null") + assert(commit.getVersion == 1L, s"Expected version 1, got ${commit.getVersion}") + } finally { + manager.close() + } + } + } + + test("loadSnapshotAt loads version 0") { + withUCClientAndTestTable { (ucClient, tablePath, _) => + val adapter = new UnityCatalogAdapter(testUcTableId, tablePath, ucClient) + val manager = new CatalogManagedSnapshotManager( + adapter, + testUcTableId, + tablePath, + new Configuration()) + + try { + val snapshot = manager.loadSnapshotAt(0L) + + assert(snapshot != null, "Snapshot should not be null") + assert(snapshot.getVersion == 0L, "Should load version 0") + } finally { + manager.close() + } + } + } +} diff --git a/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/DeltaSnapshotManagerFactorySuite.scala b/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/DeltaSnapshotManagerFactorySuite.scala new file mode 100644 index 00000000000..aeb70216f15 --- /dev/null +++ b/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/DeltaSnapshotManagerFactorySuite.scala @@ -0,0 +1,126 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.delta.kernel.spark.snapshot + +import java.net.URI + +import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient +import org.apache.hadoop.conf.Configuration +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} +import org.scalatest.funsuite.AnyFunSuite + +class DeltaSnapshotManagerFactorySuite extends AnyFunSuite { + + private def nonUcTable(location: String): CatalogTable = { + CatalogTable( + identifier = TableIdentifier("tbl", Some("default")), + tableType = CatalogTableType.EXTERNAL, + storage = CatalogStorageFormat.empty.copy(locationUri = Some(new URI(location))), + schema = new org.apache.spark.sql.types.StructType(), + provider = Some("delta")) + } + + private def ucTable(location: String, tableId: String): CatalogTable = { + CatalogTable( + identifier = TableIdentifier("tbl", Some("uc"), Some("main")), + tableType = CatalogTableType.EXTERNAL, + storage = CatalogStorageFormat.empty.copy( + locationUri = Some(new URI(location)), + properties = Map( + UCCommitCoordinatorClient.UC_TABLE_ID_KEY -> tableId, + "delta.feature.catalogManaged" -> "supported")), + schema = new org.apache.spark.sql.types.StructType(), + provider = Some("delta")) + } + + test("fromPath returns path-based manager") { + val mgr = DeltaSnapshotManagerFactory.fromPath("/tmp/test", new Configuration()) + assert(mgr.isInstanceOf[PathBasedSnapshotManager]) + } + + test("fromCatalogTable falls back to path-based for non-UC tables") { + val spark = SparkSession.builder().master("local[1]").appName("factory-non-uc").getOrCreate() + try { + val table = nonUcTable("file:/tmp/non-uc") + val mgr = DeltaSnapshotManagerFactory.fromCatalogTable(table, spark, new Configuration()) + assert(mgr.isInstanceOf[PathBasedSnapshotManager]) + } finally { + spark.stop() + } + } + + test("fromCatalogTable throws when UC table is missing UC config") { + val spark = SparkSession.builder().master("local[1]").appName("factory-uc-missing-config").getOrCreate() + try { + val table = ucTable("file:/tmp/uc", tableId = "abc123") + assertThrows[IllegalArgumentException] { + DeltaSnapshotManagerFactory.fromCatalogTable(table, spark, new Configuration()) + } + } finally { + spark.stop() + } + } + + // Null parameter validation tests + + test("fromPath throws on null tablePath") { + assertThrows[NullPointerException] { + DeltaSnapshotManagerFactory.fromPath(null, new Configuration()) + } + } + + test("fromPath throws on null hadoopConf") { + assertThrows[NullPointerException] { + DeltaSnapshotManagerFactory.fromPath("/tmp/test", null) + } + } + + test("fromCatalogTable throws on null catalogTable") { + val spark = SparkSession.builder().master("local[1]").appName("factory-null-table").getOrCreate() + try { + assertThrows[NullPointerException] { + DeltaSnapshotManagerFactory.fromCatalogTable(null, spark, new Configuration()) + } + } finally { + spark.stop() + } + } + + test("fromCatalogTable throws on null spark") { + val table = nonUcTable("file:/tmp/test") + assertThrows[NullPointerException] { + DeltaSnapshotManagerFactory.fromCatalogTable(table, null, new Configuration()) + } + } + + test("fromCatalogTable throws on null hadoopConf") { + val spark = SparkSession.builder().master("local[1]").appName("factory-null-conf").getOrCreate() + try { + val table = nonUcTable("file:/tmp/test") + assertThrows[NullPointerException] { + 
DeltaSnapshotManagerFactory.fromCatalogTable(table, spark, null) + } + } finally { + spark.stop() + } + } + + // NOTE: Testing fromCatalogTable returning CatalogManagedSnapshotManager for valid UC tables + // requires full SparkSession integration with UC catalog configuration (UCSingleCatalog, + // endpoint, token). This is covered by integration tests rather than unit tests. +} diff --git a/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCUtilsSuite.scala b/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCUtilsSuite.scala new file mode 100644 index 00000000000..fa29ca500ff --- /dev/null +++ b/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCUtilsSuite.scala @@ -0,0 +1,225 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.spark.snapshot.unitycatalog + +import java.net.URI +import java.util.{HashMap => JHashMap} + +import io.delta.kernel.internal.tablefeatures.TableFeatures +import io.delta.kernel.spark.utils.CatalogTableTestUtils +import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.test.SharedSparkSession + +/** + * Unit tests for [[UCUtils]]. + * + * Tests use distinctive, high-entropy values that would fail if the implementation + * had hardcoded defaults instead of actually extracting values from the inputs. 
diff --git a/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCUtilsSuite.scala b/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCUtilsSuite.scala
new file mode 100644
index 00000000000..fa29ca500ff
--- /dev/null
+++ b/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCUtilsSuite.scala
@@ -0,0 +1,225 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.spark.snapshot.unitycatalog
+
+import java.net.URI
+import java.util.{HashMap => JHashMap}
+
+import io.delta.kernel.internal.tablefeatures.TableFeatures
+import io.delta.kernel.spark.utils.CatalogTableTestUtils
+import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Unit tests for [[UCUtils]].
+ *
+ * Tests use distinctive, high-entropy values that would fail if the implementation
+ * had hardcoded defaults instead of actually extracting values from the inputs.
+ */
+class UCUtilsSuite extends SparkFunSuite with SharedSparkSession {
+
+  // Use the same constants as CatalogTableUtils to ensure consistency
+  private val FEATURE_CATALOG_MANAGED =
+    TableFeatures.SET_TABLE_FEATURE_SUPPORTED_PREFIX +
+      TableFeatures.CATALOG_MANAGED_RW_FEATURE.featureName()
+  private val FEATURE_SUPPORTED = TableFeatures.SET_TABLE_FEATURE_SUPPORTED_VALUE
+  private val UC_TABLE_ID_KEY = UCCommitCoordinatorClient.UC_TABLE_ID_KEY
+  private val UC_CATALOG_CONNECTOR = "io.unitycatalog.spark.UCSingleCatalog"
+
+  // Distinctive values that would fail if hardcoded
+  private val TABLE_ID_ALPHA = "uc_8f2b3c9a-d1e7-4a6f-b8c2"
+  private val TABLE_PATH_ALPHA = "abfss://delta-store@prod.dfs.core.windows.net/warehouse/tbl_v3"
+  private val UC_URI_ALPHA = "https://uc-server-westus2.example.net/api/2.1/unity-catalog"
+  private val UC_TOKEN_ALPHA = "dapi_Xk7mP$9qRs#2vWz_prod"
+  private val CATALOG_ALPHA = "uc_catalog_westus2_prod"
+
+  // ==================== Helper Methods ====================
+
+  private def makeNonUCTable(): CatalogTable = {
+    CatalogTableTestUtils.createCatalogTable(locationUri = Some(new URI(TABLE_PATH_ALPHA)))
+  }
+
+  private def makeUCTable(
+      tableId: String = TABLE_ID_ALPHA,
+      tablePath: String = TABLE_PATH_ALPHA,
+      catalogName: Option[String] = None): CatalogTable = {
+    val storageProps = new JHashMap[String, String]()
+    storageProps.put(FEATURE_CATALOG_MANAGED, FEATURE_SUPPORTED)
+    storageProps.put(UC_TABLE_ID_KEY, tableId)
+
+    CatalogTableTestUtils.createCatalogTable(
+      catalogName = catalogName,
+      storageProperties = storageProps,
+      locationUri = Some(new URI(tablePath)))
+  }
+
+  private def withUCCatalogConfig(
+      catalogName: String,
+      uri: String,
+      token: String)(testCode: => Unit): Unit = {
+    val configs = Seq(
+      s"spark.sql.catalog.$catalogName" -> UC_CATALOG_CONNECTOR,
+      s"spark.sql.catalog.$catalogName.uri" -> uri,
+      s"spark.sql.catalog.$catalogName.token" -> token)
+    val originalValues = configs.map { case (key, _) => key -> spark.conf.getOption(key) }.toMap
+
+    try {
+      configs.foreach { case (key, value) => spark.conf.set(key, value) }
+      testCode
+    } finally {
+      configs.foreach { case (key, _) =>
+        originalValues.get(key).flatten match {
+          case Some(v) => spark.conf.set(key, v)
+          case None => spark.conf.unset(key)
+        }
+      }
+    }
+  }
+
+  // ==================== Tests ====================
+
+  test("returns empty for non-UC table") {
+    val table = makeNonUCTable()
+    val result = UCUtils.extractTableInfo(table, spark)
+    assert(result.isEmpty, "Non-UC table should return empty Optional")
+  }
+
+  test("returns empty when UC table ID present but feature flag missing") {
+    val storageProps = new JHashMap[String, String]()
+    storageProps.put(UC_TABLE_ID_KEY, "orphan_id_9x7y5z")
+    // No FEATURE_CATALOG_MANAGED - simulates corrupted/partial metadata
+
+    val table = CatalogTableTestUtils.createCatalogTable(
+      storageProperties = storageProps,
+      locationUri = Some(new URI("gs://other-bucket/path")))
+    val result = UCUtils.extractTableInfo(table, spark)
+    assert(result.isEmpty, "Missing feature flag should return empty")
+  }
+
+  test("throws IllegalArgumentException for UC table with empty table ID") {
+    val storageProps = new JHashMap[String, String]()
+    storageProps.put(FEATURE_CATALOG_MANAGED, FEATURE_SUPPORTED)
+    storageProps.put(UC_TABLE_ID_KEY, "")
+
+    val table = CatalogTableTestUtils.createCatalogTable(
+      storageProperties = storageProps,
+      locationUri = Some(new URI("s3://empty-id-bucket/path")))
+    val exception = intercept[IllegalArgumentException] {
+      UCUtils.extractTableInfo(table, spark)
+    }
+    assert(exception.getMessage.contains("Cannot extract ucTableId"))
+  }
+
+  test("throws exception for UC table without location") {
+    val storageProps = new JHashMap[String, String]()
+    storageProps.put(FEATURE_CATALOG_MANAGED, FEATURE_SUPPORTED)
+    storageProps.put(UC_TABLE_ID_KEY, "no_location_tbl_id_3k9m")
+
+    val table = CatalogTableTestUtils.createCatalogTable(storageProperties = storageProps)
+    // Spark throws AnalysisException when location is missing
+    val exception = intercept[Exception] {
+      UCUtils.extractTableInfo(table, spark)
+    }
+    assert(exception.getMessage.contains("locationUri") ||
+      exception.getMessage.contains("location"))
+  }
+
+  test("throws IllegalArgumentException when no matching catalog configuration") {
+    val table = makeUCTable(catalogName = Some("nonexistent_catalog_xyz"))
+
+    val exception = intercept[IllegalArgumentException] {
+      UCUtils.extractTableInfo(table, spark)
+    }
+    assert(exception.getMessage.contains("Unity Catalog configuration not found") ||
+      exception.getMessage.contains("Cannot create UC client"))
+  }
+
+  test("extracts table info when UC catalog is properly configured") {
+    val table = makeUCTable(catalogName = Some(CATALOG_ALPHA))
+
+    withUCCatalogConfig(CATALOG_ALPHA, UC_URI_ALPHA, UC_TOKEN_ALPHA) {
+      val result = UCUtils.extractTableInfo(table, spark)
+
+      assert(result.isPresent, "Should return table info")
+      val info = result.get()
+      // Each assertion uses the specific expected value - would fail if hardcoded
+      assert(info.getTableId == TABLE_ID_ALPHA, s"Table ID mismatch: got ${info.getTableId}")
+      assert(
+        info.getTablePath == TABLE_PATH_ALPHA,
+        s"Table path mismatch: got ${info.getTablePath}")
+      assert(info.getUcUri == UC_URI_ALPHA, s"UC URI mismatch: got ${info.getUcUri}")
+      assert(info.getUcToken == UC_TOKEN_ALPHA, s"UC token mismatch: got ${info.getUcToken}")
+    }
+  }
+
+  test("selects correct catalog when multiple catalogs configured") {
+    // Use completely different values for each catalog to prove selection works
+    val catalogBeta = "uc_catalog_eastus_staging"
+    val ucUriBeta = "https://uc-server-eastus.example.net/api/2.1/uc"
+    val ucTokenBeta = "dapi_Yz3nQ$8wRt#1vXa_staging"
+    val tableIdBeta = "uc_tbl_staging_4d7e2f1a"
+    val tablePathBeta = "s3://staging-bucket-us-east/delta/tables/v2"
+
+    val catalogGamma = "uc_catalog_euwest_dev"
+    val ucUriGamma = "https://uc-server-euwest.example.net/api/2.1/uc"
+    val ucTokenGamma = "dapi_Jk5pL$3mNq#9vBc_dev"
+
+    // Table is in catalogBeta
+    val table = makeUCTable(
+      tableId = tableIdBeta,
+      tablePath = tablePathBeta,
+      catalogName = Some(catalogBeta))
+
+    val configs = Seq(
+      // catalogGamma config (should NOT be used)
+      s"spark.sql.catalog.$catalogGamma" -> UC_CATALOG_CONNECTOR,
+      s"spark.sql.catalog.$catalogGamma.uri" -> ucUriGamma,
+      s"spark.sql.catalog.$catalogGamma.token" -> ucTokenGamma,
+      // catalogBeta config (should be used)
+      s"spark.sql.catalog.$catalogBeta" -> UC_CATALOG_CONNECTOR,
+      s"spark.sql.catalog.$catalogBeta.uri" -> ucUriBeta,
+      s"spark.sql.catalog.$catalogBeta.token" -> ucTokenBeta)
+    val originalValues = configs.map { case (key, _) => key -> spark.conf.getOption(key) }.toMap
+
+    try {
+      configs.foreach { case (key, value) => spark.conf.set(key, value) }
+
+      val result = UCUtils.extractTableInfo(table, spark)
+      assert(result.isPresent, "Should return table info")
+
+      val info = result.get()
+      // Verify it selected catalogBeta's config, not catalogGamma's
+      assert(
+        info.getUcUri == ucUriBeta,
+        s"Should use catalogBeta's URI, got: ${info.getUcUri}")
+      assert(info.getUcToken == ucTokenBeta, s"Should use catalogBeta's token, got: ${info.getUcToken}")
+      assert(info.getTableId == tableIdBeta, s"Should extract tableIdBeta, got: ${info.getTableId}")
+      assert(
+        info.getTablePath == tablePathBeta,
+        s"Should extract tablePathBeta, got: ${info.getTablePath}")
+    } finally {
+      configs.foreach { case (key, _) =>
+        originalValues.get(key).flatten match {
+          case Some(v) => spark.conf.set(key, v)
+          case None => spark.conf.unset(key)
+        }
+      }
+    }
+  }
+}
diff --git a/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UnityCatalogAdapterSuite.scala b/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UnityCatalogAdapterSuite.scala
new file mode 100644
index 00000000000..fc86322db8a
--- /dev/null
+++ b/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UnityCatalogAdapterSuite.scala
@@ -0,0 +1,281 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.spark.snapshot.unitycatalog
+
+import java.io.IOException
+import java.net.URI
+import java.util.Optional
+
+import scala.jdk.CollectionConverters._
+
+import io.delta.kernel.internal.util.FileNames
+import io.delta.storage.commit.{Commit, GetCommitsResponse}
+import io.delta.storage.commit.actions.{AbstractMetadata, AbstractProtocol}
+import io.delta.storage.commit.uccommitcoordinator.{CommitLimitReachedException, UCClient}
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.scalatest.funsuite.AnyFunSuite
+
+class UnityCatalogAdapterSuite extends AnyFunSuite {
+
+  private val tableId = "testTableId"
+  private val tablePath = "/tmp/test-table"
+  private val tableUri = new URI(tablePath)
+
+  private def hadoopStatus(version: Long): FileStatus = {
+    val stagedPath = new Path(FileNames.stagedCommitFile(s"$tablePath/_delta_log", version))
+    new FileStatus(/* length = */ 10L, /* isDir = */ false, /* blockReplication = */ 1,
+      /* blockSize = */ 1, /* modificationTime = */ version, stagedPath)
+  }
+
+  private def commit(version: Long): Commit = new Commit(version, hadoopStatus(version), version)
+
+  private class StubUCClient(
+      response: => GetCommitsResponse,
+      throwOnGet: Option[Throwable] = None) extends UCClient {
+    override def getMetastoreId(): String = "meta"
+
+    override def commit(
+        tableId: String,
+        tableUri: URI,
+        commit: Optional[Commit],
+        lastKnownBackfilledVersion: Optional[java.lang.Long],
+        disown: Boolean,
+        newMetadata: Optional[AbstractMetadata],
+        newProtocol: Optional[AbstractProtocol]): Unit = {
+      throw new UnsupportedOperationException("not used in tests")
+    }
+
+    override def getCommits(
+        tableId: String,
+        tableUri: URI,
+        startVersion: Optional[java.lang.Long],
+        endVersion: Optional[java.lang.Long]): GetCommitsResponse = {
+      throwOnGet.foreach {
+        case io: IOException => throw io
+        case other: Throwable => throw other
+      }
+      response
+    }
+
+    override def close(): Unit = {}
+  }
+
test("getCommits returns response from UCClient") { + val commits = List(2L, 0L, 1L).map(commit) + val adapter = new UnityCatalogAdapter(tableId, tablePath, new StubUCClient( + new GetCommitsResponse(commits.asJava, /* latest */ 2L))) + + val response = adapter.getCommits(0, Optional.empty()) + val versions = response.getCommits.asScala.map(_.getVersion) + assert(versions.toSet == Set(0L, 1L, 2L)) + assert(response.getLatestTableVersion == 2L) + } + + test("getLatestRatifiedVersion returns latest version from response") { + val commits = List(0L, 1L, 2L).map(commit) + val adapter = new UnityCatalogAdapter(tableId, tablePath, new StubUCClient( + new GetCommitsResponse(commits.asJava, /* latest */ 2L))) + + assert(adapter.getLatestRatifiedVersion() == 2L) + } + + test("getLatestRatifiedVersion maps -1 to 0") { + val adapter = new UnityCatalogAdapter(tableId, tablePath, new StubUCClient( + new GetCommitsResponse(List.empty[Commit].asJava, /* latest */ -1L))) + + assert(adapter.getLatestRatifiedVersion() == 0L) + } + + test("getCommits wraps IO failures") { + val adapter = new UnityCatalogAdapter(tableId, tablePath, new StubUCClient( + response = null, + throwOnGet = Some(new IOException("boom")))) + + assertThrows[java.io.UncheckedIOException] { + adapter.getCommits(0, Optional.empty()) + } + } + + test("getCommits wraps UC coordinator failures") { + val adapter = new UnityCatalogAdapter(tableId, tablePath, new StubUCClient( + response = null, + throwOnGet = Some(new CommitLimitReachedException("boom")))) + + assertThrows[RuntimeException] { + adapter.getCommits(0, Optional.empty()) + } + } + + test("getCommits passes endVersion to UCClient") { + var capturedEndVersion: Optional[java.lang.Long] = null + val commits = List(0L, 1L, 2L).map(commit) + + val capturingClient = new UCClient { + override def getMetastoreId(): String = "meta" + override def commit( + tableId: String, + tableUri: URI, + commit: Optional[Commit], + lastKnownBackfilledVersion: Optional[java.lang.Long], + disown: Boolean, + newMetadata: Optional[AbstractMetadata], + newProtocol: Optional[AbstractProtocol]): Unit = { + throw new UnsupportedOperationException("not used") + } + override def getCommits( + tableId: String, + tableUri: URI, + startVersion: Optional[java.lang.Long], + endVersion: Optional[java.lang.Long]): GetCommitsResponse = { + capturedEndVersion = endVersion + new GetCommitsResponse(commits.asJava, /* latest */ 2L) + } + override def close(): Unit = {} + } + + val adapter = new UnityCatalogAdapter(tableId, tablePath, capturingClient) + + // Test with specific endVersion + adapter.getCommits(0, Optional.of(java.lang.Long.valueOf(1L))) + assert(capturedEndVersion.isPresent) + assert(capturedEndVersion.get() == 1L) + + // Test with empty endVersion + adapter.getCommits(0, Optional.empty()) + assert(!capturedEndVersion.isPresent) + } + + test("getCommits filters commits up to endVersion") { + val allCommits = List(0L, 1L, 2L, 3L).map(commit) + + // Client returns filtered commits when endVersion specified + def clientWithEndVersionFilter(endOpt: Optional[java.lang.Long]): UCClient = new UCClient { + override def getMetastoreId(): String = "meta" + override def commit( + tableId: String, + tableUri: URI, + commit: Optional[Commit], + lastKnownBackfilledVersion: Optional[java.lang.Long], + disown: Boolean, + newMetadata: Optional[AbstractMetadata], + newProtocol: Optional[AbstractProtocol]): Unit = { + throw new UnsupportedOperationException("not used") + } + override def getCommits( + tableId: String, + tableUri: 
URI, + startVersion: Optional[java.lang.Long], + endVersion: Optional[java.lang.Long]): GetCommitsResponse = { + val filtered = if (endVersion.isPresent) { + allCommits.filter(_.getVersion <= endVersion.get()) + } else { + allCommits + } + new GetCommitsResponse(filtered.asJava, /* latest */ 3L) + } + override def close(): Unit = {} + } + + // Request commits up to version 1 + val adapter = new UnityCatalogAdapter(tableId, tablePath, clientWithEndVersionFilter(Optional.of(1L))) + val response = adapter.getCommits(0, Optional.of(java.lang.Long.valueOf(1L))) + val versions = response.getCommits.asScala.map(_.getVersion) + assert(versions == Seq(0L, 1L)) + } + + test("getTableId and getTablePath return constructor values") { + val adapter = new UnityCatalogAdapter(tableId, tablePath, new StubUCClient( + new GetCommitsResponse(List.empty[Commit].asJava, /* latest */ -1L))) + + assert(adapter.getTableId == tableId) + assert(adapter.getTablePath == tablePath) + } + + test("close calls UCClient close") { + var closeCalled = false + val client = new UCClient { + override def getMetastoreId(): String = "meta" + override def commit( + tableId: String, + tableUri: URI, + commit: Optional[Commit], + lastKnownBackfilledVersion: Optional[java.lang.Long], + disown: Boolean, + newMetadata: Optional[AbstractMetadata], + newProtocol: Optional[AbstractProtocol]): Unit = {} + override def getCommits( + tableId: String, + tableUri: URI, + startVersion: Optional[java.lang.Long], + endVersion: Optional[java.lang.Long]): GetCommitsResponse = { + new GetCommitsResponse(List.empty[Commit].asJava, -1L) + } + override def close(): Unit = { closeCalled = true } + } + + val adapter = new UnityCatalogAdapter(tableId, tablePath, client) + adapter.close() + assert(closeCalled) + } + + test("close swallows exceptions from UCClient") { + val client = new UCClient { + override def getMetastoreId(): String = "meta" + override def commit( + tableId: String, + tableUri: URI, + commit: Optional[Commit], + lastKnownBackfilledVersion: Optional[java.lang.Long], + disown: Boolean, + newMetadata: Optional[AbstractMetadata], + newProtocol: Optional[AbstractProtocol]): Unit = {} + override def getCommits( + tableId: String, + tableUri: URI, + startVersion: Optional[java.lang.Long], + endVersion: Optional[java.lang.Long]): GetCommitsResponse = { + new GetCommitsResponse(List.empty[Commit].asJava, -1L) + } + override def close(): Unit = { throw new RuntimeException("close failed") } + } + + val adapter = new UnityCatalogAdapter(tableId, tablePath, client) + // Should not throw + adapter.close() + } + + // Factory method tests + + test("fromConnectionInfo throws on null input") { + assertThrows[NullPointerException] { + UnityCatalogAdapter.fromConnectionInfo(null) + } + } + + test("fromConnectionInfo creates adapter with correct tableId and tablePath") { + val info = new UnityCatalogConnectionInfo( + /* tableId = */ "test-table-id-123", + /* tablePath = */ "/path/to/delta/table", + /* endpoint = */ "https://example.net/api", + /* token = */ "test-token") + + val adapter = UnityCatalogAdapter.fromConnectionInfo(info) + .asInstanceOf[UnityCatalogAdapter] + + assert(adapter.getTableId == "test-table-id-123") + assert(adapter.getTablePath == "/path/to/delta/table") + } +} diff --git a/kernel-spark/src/test/scala/io/delta/kernel/spark/utils/CatalogTableTestUtils.scala b/kernel-spark/src/test/scala/io/delta/kernel/spark/utils/CatalogTableTestUtils.scala index 4f80f346bb8..ea98efb54b5 100644 --- 
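To make the lifecycle these stubs exercise concrete, a rough sketch of how an adapter would typically be created and used outside the tests. The connection values are placeholders; in the real flow they would presumably come from UCUtils.extractTableInfo rather than being hard-coded, and only the calls already shown in the suite above are used.

import java.util.Optional

// Placeholder connection values; not real identifiers or endpoints.
val info = new UnityCatalogConnectionInfo(
  /* tableId = */ "<uc-table-id>",
  /* tablePath = */ "s3://bucket/warehouse/tbl",
  /* endpoint = */ "https://uc.example.com/api/2.1/unity-catalog",
  /* token = */ "<token>")

val adapter = UnityCatalogAdapter.fromConnectionInfo(info).asInstanceOf[UnityCatalogAdapter]
try {
  // Ask UC for the latest ratified version, then fetch the ratified commits up to it.
  val latest = adapter.getLatestRatifiedVersion()
  val response = adapter.getCommits(0, Optional.of(java.lang.Long.valueOf(latest)))
  val ratifiedCommits = response.getCommits
} finally {
  adapter.close()
}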
diff --git a/kernel-spark/src/test/scala/io/delta/kernel/spark/utils/CatalogTableTestUtils.scala b/kernel-spark/src/test/scala/io/delta/kernel/spark/utils/CatalogTableTestUtils.scala
index 4f80f346bb8..ea98efb54b5 100644
--- a/kernel-spark/src/test/scala/io/delta/kernel/spark/utils/CatalogTableTestUtils.scala
+++ b/kernel-spark/src/test/scala/io/delta/kernel/spark/utils/CatalogTableTestUtils.scala
@@ -15,8 +15,6 @@
  */
 package io.delta.kernel.spark.utils
 
-import scala.collection.immutable.Seq
-
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.types.StructType
@@ -30,58 +28,52 @@ import org.apache.spark.sql.types.StructType
  */
 object CatalogTableTestUtils {
 
-  def catalogTableWithProperties(
-      properties: java.util.Map[String, String],
-      storageProperties: java.util.Map[String, String]): CatalogTable = {
+  /**
+   * Creates a [[CatalogTable]] with configurable options.
+   *
+   * @param tableName table name (default: "tbl")
+   * @param catalogName optional catalog name for the identifier
+   * @param properties table properties (default: empty)
+   * @param storageProperties storage properties (default: empty)
+   * @param locationUri optional storage location URI
+   * @param nullStorage if true, sets storage to null (for edge case testing)
+   * @param nullStorageProperties if true, sets storage properties to null
+   */
+  def createCatalogTable(
+      tableName: String = "tbl",
+      catalogName: Option[String] = None,
+      properties: java.util.Map[String, String] = new java.util.HashMap[String, String](),
+      storageProperties: java.util.Map[String, String] = new java.util.HashMap[String, String](),
+      locationUri: Option[java.net.URI] = None,
+      nullStorage: Boolean = false,
+      nullStorageProperties: Boolean = false): CatalogTable = {
+
     val scalaProps = ScalaUtils.toScalaMap(properties)
-    val scalaStorageProps = ScalaUtils.toScalaMap(storageProperties)
+    val scalaStorageProps =
+      if (nullStorageProperties) null else ScalaUtils.toScalaMap(storageProperties)
 
-    CatalogTable(
-      identifier = TableIdentifier("tbl"),
-      tableType = CatalogTableType.MANAGED,
-      storage = CatalogStorageFormat(
-        locationUri = None,
+    val identifier = catalogName match {
+      case Some(catalog) =>
+        TableIdentifier(tableName, Some("default") /* database */, Some(catalog) /* catalog */ )
+      case None => TableIdentifier(tableName)
+    }
+
+    val storage = if (nullStorage) {
+      null
+    } else {
+      CatalogStorageFormat(
+        locationUri = locationUri,
         inputFormat = None,
         outputFormat = None,
         serde = None,
         compressed = false,
-        properties = scalaStorageProps),
-      schema = new StructType(),
-      provider = None,
-      partitionColumnNames = Seq.empty,
-      bucketSpec = None,
-      properties = scalaProps)
-  }
-
-  def catalogTableWithNullStorage(
-      properties: java.util.Map[String, String]): CatalogTable = {
-    val scalaProps = ScalaUtils.toScalaMap(properties)
-
-    CatalogTable(
-      identifier = TableIdentifier("tbl"),
-      tableType = CatalogTableType.MANAGED,
-      storage = null,
-      schema = new StructType(),
-      provider = None,
-      partitionColumnNames = Seq.empty,
-      bucketSpec = None,
-      properties = scalaProps)
-  }
-
-  def catalogTableWithNullStorageProperties(
-      properties: java.util.Map[String, String]): CatalogTable = {
-    val scalaProps = ScalaUtils.toScalaMap(properties)
+        properties = scalaStorageProps)
+    }
 
     CatalogTable(
-      identifier = TableIdentifier("tbl"),
+      identifier = identifier,
       tableType = CatalogTableType.MANAGED,
-      storage = CatalogStorageFormat(
-        locationUri = None,
-        inputFormat = None,
-        outputFormat = None,
-        serde = None,
-        compressed = false,
-        properties = null),
+      storage = storage,
       schema = new StructType(),
       provider = None,
       partitionColumnNames = Seq.empty,
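The consolidated createCatalogTable builder replaces the three specialized helpers removed above. A quick sketch of how the old call shapes map onto it; props and storageProps stand in for the java.util.Map arguments the old helpers took.

// catalogTableWithProperties(props, storageProps) becomes:
CatalogTableTestUtils.createCatalogTable(
  properties = props,
  storageProperties = storageProps)

// catalogTableWithNullStorage(props) becomes:
CatalogTableTestUtils.createCatalogTable(properties = props, nullStorage = true)

// catalogTableWithNullStorageProperties(props) becomes:
CatalogTableTestUtils.createCatalogTable(properties = props, nullStorageProperties = true)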
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala
index 3faab141bf1..377214d45a4 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala
@@ -229,6 +229,16 @@ object UCCommitCoordinatorBuilder
       .toList
   }
 
+  /**
+   * Returns catalog configurations as a Map for O(1) lookup by catalog name.
+   * Wraps [[getCatalogConfigs]] results in [[UCCatalogConfig]] for better readability.
+   */
+  private[delta] def getCatalogConfigMap(spark: SparkSession): Map[String, UCCatalogConfig] = {
+    getCatalogConfigs(spark).map {
+      case (name, uri, token) => name -> UCCatalogConfig(name, uri, token)
+    }.toMap
+  }
+
   private def safeClose(ucClient: UCClient, uri: String): Unit = {
     try {
       ucClient.close()
@@ -252,3 +262,9 @@ object UCCommitCoordinatorBuilder
     override def createUCClient(uri: String, token: String): UCClient =
       new UCTokenBasedRestClient(uri, token)
   }
+
+/**
+ * Holder for Unity Catalog configuration extracted from Spark configs.
+ * Used by [[UCCommitCoordinatorBuilder.getCatalogConfigMap]].
+ */
+case class UCCatalogConfig(catalogName: String, uri: String, token: String)
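For illustration, a small sketch of the lookup getCatalogConfigMap is meant to enable. The catalog name and the SparkSession value are placeholders, and creating a client from the result reuses the UCTokenBasedRestClientFactory shown in the surrounding context above.

// Build the name -> config map once, then look up a specific catalog in O(1).
val configs: Map[String, UCCatalogConfig] =
  UCCommitCoordinatorBuilder.getCatalogConfigMap(spark)

configs.get("uc_catalog_westus2_prod") match {
  case Some(UCCatalogConfig(_, uri, token)) =>
    // Lookup succeeded; build a UC client for this catalog.
    val ucClient = UCTokenBasedRestClientFactory.createUCClient(uri, token)
  case None =>
    // No Unity Catalog configuration registered under that catalog name.
}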