diff --git a/build.sbt b/build.sbt
index 357d34fb40d..cba6f615c01 100644
--- a/build.sbt
+++ b/build.sbt
@@ -462,6 +462,7 @@ lazy val sparkV1Filtered = (project in file("spark-v1-filtered"))
lazy val sparkV2 = (project in file("kernel-spark"))
.dependsOn(sparkV1Filtered)
.dependsOn(kernelDefaults)
+ .dependsOn(kernelUnityCatalog)
.dependsOn(goldenTables % "test")
.settings(
name := "delta-spark-v2",
diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/catalog/SparkTable.java b/kernel-spark/src/main/java/io/delta/kernel/spark/catalog/SparkTable.java
index bef04d4f1f7..afe8abd98ec 100644
--- a/kernel-spark/src/main/java/io/delta/kernel/spark/catalog/SparkTable.java
+++ b/kernel-spark/src/main/java/io/delta/kernel/spark/catalog/SparkTable.java
@@ -21,7 +21,7 @@
import io.delta.kernel.Snapshot;
import io.delta.kernel.spark.read.SparkScanBuilder;
import io.delta.kernel.spark.snapshot.DeltaSnapshotManager;
-import io.delta.kernel.spark.snapshot.PathBasedSnapshotManager;
+import io.delta.kernel.spark.snapshot.DeltaSnapshotManagerFactory;
import io.delta.kernel.spark.utils.SchemaUtils;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
@@ -138,9 +138,11 @@ private SparkTable(
merged.putAll(userOptions);
this.options = Collections.unmodifiableMap(merged);
- this.hadoopConf =
- SparkSession.active().sessionState().newHadoopConfWithOptions(toScalaMap(options));
- this.snapshotManager = new PathBasedSnapshotManager(tablePath, hadoopConf);
+ SparkSession spark = SparkSession.active();
+ this.hadoopConf = spark.sessionState().newHadoopConfWithOptions(toScalaMap(options));
+ // Use factory to create appropriate snapshot manager (catalog-managed vs path-based)
+ this.snapshotManager =
+ DeltaSnapshotManagerFactory.create(tablePath, catalogTable, spark, hadoopConf);
// Load the initial snapshot through the manager
this.initialSnapshot = snapshotManager.loadLatestSnapshot();
this.schema = SchemaUtils.convertKernelSchemaToSparkSchema(initialSnapshot.getSchema());
diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/CatalogManagedSnapshotManager.java b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/CatalogManagedSnapshotManager.java
new file mode 100644
index 00000000000..9ef166cdc42
--- /dev/null
+++ b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/CatalogManagedSnapshotManager.java
@@ -0,0 +1,160 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.spark.snapshot;
+
+import static io.delta.kernel.internal.util.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+import io.delta.kernel.CommitRange;
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.exceptions.KernelException;
+import io.delta.kernel.internal.DeltaHistoryManager;
+import io.delta.kernel.spark.exception.VersionNotFoundException;
+import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.annotation.Experimental;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of DeltaSnapshotManager for catalog-managed tables (e.g., UC).
+ *
+ * <p>This snapshot manager is agnostic to the underlying catalog implementation. It delegates to a
+ * {@link ManagedCommitClient}, keeping catalog-specific wiring out of the manager itself.
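+ *
+ * <p>Illustrative usage sketch (assumes an already-constructed {@link ManagedCommitClient}
+ * {@code commitClient} and a Hadoop {@code Configuration} {@code hadoopConf}):
+ *
+ * <pre>{@code
+ * try (CatalogManagedSnapshotManager manager =
+ * new CatalogManagedSnapshotManager(commitClient, hadoopConf)) {
+ * Snapshot latest = manager.loadLatestSnapshot();
+ * }
+ * }</pre>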
+ */
+@Experimental
+public class CatalogManagedSnapshotManager implements DeltaSnapshotManager, AutoCloseable {
+
+ private static final Logger logger = LoggerFactory.getLogger(CatalogManagedSnapshotManager.class);
+
+ private final ManagedCommitClient commitClient;
+ private final Engine kernelEngine;
+
+ public CatalogManagedSnapshotManager(ManagedCommitClient commitClient, Configuration hadoopConf) {
+ this.commitClient = requireNonNull(commitClient, "commitClient is null");
+ requireNonNull(hadoopConf, "hadoopConf is null");
+
+ this.kernelEngine = DefaultEngine.create(hadoopConf);
+ logger.info(
+ "Created CatalogManagedSnapshotManager for table {} at path {}",
+ commitClient.getTableId(),
+ commitClient.getTablePath());
+ }
+
+ /** Loads the latest snapshot of the catalog-managed Delta table. */
+ @Override
+ public Snapshot loadLatestSnapshot() {
+ return commitClient.loadSnapshot(kernelEngine, Optional.empty(), Optional.empty());
+ }
+
+ /**
+ * Loads a specific version of the catalog-managed Delta table.
+ *
+ * @param version the version to load (must be >= 0)
+ * @return the snapshot at the specified version
+ */
+ @Override
+ public Snapshot loadSnapshotAt(long version) {
+ checkArgument(version >= 0, "version must be non-negative");
+ return commitClient.loadSnapshot(kernelEngine, Optional.of(version), Optional.empty());
+ }
+
+ /**
+ * Finds the active commit at a specific timestamp.
+ *
+ * <p>Note: This operation is not yet supported for Unity Catalog managed tables
+ * because it requires filesystem-based commit history which is not accessible for catalog-managed
+ * tables. Unity Catalog coordinates commits differently than traditional Delta tables.
+ *
+ * @throws UnsupportedOperationException always - not yet implemented for catalog-managed tables
+ */
+ @Override
+ public DeltaHistoryManager.Commit getActiveCommitAtTime(
+ long timestampMillis,
+ boolean canReturnLastCommit,
+ boolean mustBeRecreatable,
+ boolean canReturnEarliestCommit) {
+ throw new UnsupportedOperationException(
+ "getActiveCommitAtTime not yet implemented for catalog-managed tables. "
+ + "This operation requires filesystem-based commit history which may not be "
+ + "available for catalog-managed tables.");
+ }
+
+ /**
+ * Checks if a specific version exists and is accessible.
+ *
+ * <p>Performance Note: For Unity Catalog managed tables, version checking
+ * requires loading the full snapshot including all file metadata. This is less efficient than
+ * filesystem-based checks which can verify log file existence without reading contents.
+ *
+ * <p>TODO (Next PR): Add lightweight version checking API to
+ * UCCatalogManagedClient to avoid loading full snapshots for existence checks.
+ *
+ * @throws VersionNotFoundException if the version is not available
+ */
+ @Override
+ public void checkVersionExists(long version, boolean mustBeRecreatable, boolean allowOutOfRange)
+ throws VersionNotFoundException {
+ checkArgument(version >= 0, "version must be non-negative");
+
+ try {
+ // Attempt to load the snapshot at the specified version
+ // Note: This loads the full snapshot - see performance note above
+ loadSnapshotAt(version);
+ } catch (KernelException e) {
+ // Specific Kernel exceptions indicate version doesn't exist or isn't accessible
+ // Let other exceptions (network failures, auth errors, etc.) propagate to caller
+ long latestVersion = loadLatestSnapshot().getVersion();
+ throw new VersionNotFoundException(version, 0, latestVersion);
+ }
+ }
+
+ /**
+ * Gets a range of table changes between versions.
+ *
+ *
+ * <p>Note: This operation delegates to the managed commit client.
+ */
+ @Override
+ public CommitRange getTableChanges(Engine engine, long startVersion, Optional<Long> endVersion) {
+ requireNonNull(engine, "engine is null");
+ checkArgument(startVersion >= 0, "startVersion must be non-negative");
+ endVersion.ifPresent(v -> checkArgument(v >= 0, "endVersion must be non-negative"));
+
+ return commitClient.loadCommitRange(
+ engine, Optional.of(startVersion), Optional.empty(), endVersion, Optional.empty());
+ }
+
+ /**
+ * Closes the underlying commit client and releases resources.
+ *
+ * <p>This method should be called when the snapshot manager is no longer needed. Prefer using
+ * try-with-resources to ensure proper cleanup.
+ */
+ @Override
+ public void close() {
+ try {
+ commitClient.close();
+ logger.info("Closed CatalogManagedSnapshotManager for table {}", commitClient.getTableId());
+ } catch (Exception e) {
+ logger.warn(
+ "Error closing catalog-managed client for table {}", commitClient.getTableId(), e);
+ }
+ }
+}
diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/DeltaSnapshotManagerFactory.java b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/DeltaSnapshotManagerFactory.java
new file mode 100644
index 00000000000..bb86b7fc5ea
--- /dev/null
+++ b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/DeltaSnapshotManagerFactory.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.spark.snapshot;
+
+import static java.util.Objects.requireNonNull;
+
+import io.delta.kernel.spark.snapshot.unitycatalog.UnityCatalogManagedCommitClient;
+import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+
+/**
+ * Factory for creating {@link DeltaSnapshotManager} instances.
+ *
+ * <p>This factory determines the appropriate snapshot manager implementation based on table
+ * characteristics and automatically handles the selection between:
+ *
+ * <ul>
+ * <li>{@link CatalogManagedSnapshotManager} - for Unity Catalog managed tables (CCv2)
+ * <li>{@link PathBasedSnapshotManager} - for regular filesystem-based Delta tables
+ * </ul>
+ *
+ * <p>The factory encapsulates the decision logic so that callers (e.g., {@code SparkTable}) don't
+ * need to know about specific manager implementations.
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * DeltaSnapshotManager manager = DeltaSnapshotManagerFactory.create(
+ * tablePath,
+ * Optional.of(catalogTable),
+ * spark,
+ * hadoopConf
+ * );
+ * Snapshot snapshot = manager.loadLatestSnapshot();
+ * }</pre>
+ */
+@Experimental
+public final class DeltaSnapshotManagerFactory {
+
+ // Utility class - no instances
+ private DeltaSnapshotManagerFactory() {}
+
+ /**
+ * Creates the appropriate snapshot manager for a Delta table.
+ *
+ * <p>Selection logic:
+ *
+ * <ul>
+ * <li>If {@code catalogTable} is present and UC-managed → {@link CatalogManagedSnapshotManager}
+ * <li>Otherwise → {@link PathBasedSnapshotManager}
+ * </ul>
+ *
+ * @param tablePath filesystem path to the Delta table root
+ * @param catalogTable optional Spark catalog table metadata
+ * @param spark SparkSession for resolving Unity Catalog configurations
+ * @param hadoopConf Hadoop configuration for the Delta Kernel engine
+ * @return appropriate snapshot manager implementation
+ * @throws NullPointerException if tablePath, spark, or hadoopConf is null
+ * @throws IllegalArgumentException if catalogTable is UC-managed but configuration is invalid
+ */
+ public static DeltaSnapshotManager create(
+ String tablePath,
+ Optional<CatalogTable> catalogTable,
+ SparkSession spark,
+ Configuration hadoopConf) {
+
+ requireNonNull(tablePath, "tablePath is null");
+ requireNonNull(catalogTable, "catalogTable is null");
+ requireNonNull(spark, "spark is null");
+ requireNonNull(hadoopConf, "hadoopConf is null");
+
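+ // fromCatalog returns a client only for UC-managed tables; otherwise fall through to path-based.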
+ if (catalogTable.isPresent()) {
+ Optional<ManagedCommitClient> clientOpt =
+ UnityCatalogManagedCommitClient.fromCatalog(catalogTable.get(), spark);
+ if (clientOpt.isPresent()) {
+ return new CatalogManagedSnapshotManager(clientOpt.get(), hadoopConf);
+ }
+ }
+
+ // Default to path-based snapshot manager
+ return new PathBasedSnapshotManager(tablePath, hadoopConf);
+ }
+}
diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/ManagedCommitClient.java b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/ManagedCommitClient.java
new file mode 100644
index 00000000000..ad2088b79ff
--- /dev/null
+++ b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/ManagedCommitClient.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.spark.snapshot;
+
+import io.delta.kernel.CommitRange;
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.engine.Engine;
+import java.util.Optional;
+
+/**
+ * Catalog-managed commit client that knows how to load snapshots and commit ranges for a specific
+ * table.
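+ *
+ * <p>Illustrative call sketch (assumes an {@link Engine} instance {@code engine} and a concrete
+ * implementation {@code client}):
+ *
+ * <pre>{@code
+ * // No version or timestamp pinning: resolves the latest snapshot
+ * Snapshot latest = client.loadSnapshot(engine, Optional.empty(), Optional.empty());
+ * }</pre>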
+ */
+public interface ManagedCommitClient extends AutoCloseable {
+
+ /** @return catalog-managed table identifier (for logging/telemetry). */
+ String getTableId();
+
+ /** @return physical table path used by Delta Kernel. */
+ String getTablePath();
+
+ Snapshot loadSnapshot(Engine engine, Optional<Long> versionOpt, Optional<Long> timestampOpt);
+
+ CommitRange loadCommitRange(
+ Engine engine,
+ Optional<Long> startVersionOpt,
+ Optional<Long> startTimestampOpt,
+ Optional<Long> endVersionOpt,
+ Optional<Long> endTimestampOpt);
+
+ @Override
+ void close();
+}
diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UnityCatalogManagedCommitClient.java b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UnityCatalogManagedCommitClient.java
new file mode 100644
index 00000000000..5f3c1b55003
--- /dev/null
+++ b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UnityCatalogManagedCommitClient.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.spark.snapshot.unitycatalog;
+
+import static java.util.Objects.requireNonNull;
+
+import io.delta.kernel.CommitRange;
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.spark.snapshot.ManagedCommitClient;
+import io.delta.kernel.spark.utils.CatalogTableUtils;
+import io.delta.kernel.unitycatalog.UCCatalogManagedClient;
+import io.delta.storage.commit.uccommitcoordinator.UCClient;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+import org.apache.spark.sql.delta.coordinatedcommits.UCCommitCoordinatorBuilder$;
+import org.apache.spark.sql.delta.coordinatedcommits.UCTokenBasedRestClientFactory$;
+
+/** UC-backed implementation of {@link ManagedCommitClient}. */
+public final class UnityCatalogManagedCommitClient implements ManagedCommitClient {
+
+ private final String tableId;
+ private final String tablePath;
+ private final UCClient ucClient;
+ private final UCCatalogManagedClient ucManagedClient;
+
+ public UnityCatalogManagedCommitClient(String tableId, String tablePath, UCClient ucClient) {
+ this.tableId = requireNonNull(tableId, "tableId is null");
+ this.tablePath = requireNonNull(tablePath, "tablePath is null");
+ this.ucClient = requireNonNull(ucClient, "ucClient is null");
+ this.ucManagedClient = new UCCatalogManagedClient(ucClient);
+ }
+
+ /**
+ * Builds a UC-backed {@link ManagedCommitClient} for a UC-managed table, or returns {@link
+ * Optional#empty()} if the table is not managed by Unity Catalog.
+ *
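+ * <p>Illustrative (assumes a UC-managed {@code CatalogTable} and an active {@code SparkSession}):
+ *
+ * <pre>{@code
+ * Optional<ManagedCommitClient> client =
+ * UnityCatalogManagedCommitClient.fromCatalog(catalogTable, spark);
+ * }</pre>
+ *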
+ * @throws IllegalArgumentException if the table lacks UC identifiers or catalog config is missing
+ */
+ public static Optional<ManagedCommitClient> fromCatalog(
+ CatalogTable catalogTable, SparkSession spark) {
+ requireNonNull(catalogTable, "catalogTable is null");
+ requireNonNull(spark, "spark is null");
+
+ if (!CatalogTableUtils.isUnityCatalogManagedTable(catalogTable)) {
+ return Optional.empty();
+ }
+
+ String tableId = extractUCTableId(catalogTable);
+ String tablePath = extractTablePath(catalogTable);
+ UCClient client = createUCClient(catalogTable, spark);
+ return Optional.of(new UnityCatalogManagedCommitClient(tableId, tablePath, client));
+ }
+
+ @Override
+ public String getTableId() {
+ return tableId;
+ }
+
+ @Override
+ public String getTablePath() {
+ return tablePath;
+ }
+
+ @Override
+ public Snapshot loadSnapshot(
+ Engine engine, Optional<Long> versionOpt, Optional<Long> timestampOpt) {
+ return ucManagedClient.loadSnapshot(engine, tableId, tablePath, versionOpt, timestampOpt);
+ }
+
+ @Override
+ public CommitRange loadCommitRange(
+ Engine engine,
+ Optional<Long> startVersionOpt,
+ Optional<Long> startTimestampOpt,
+ Optional<Long> endVersionOpt,
+ Optional<Long> endTimestampOpt) {
+ return ucManagedClient.loadCommitRange(
+ engine,
+ tableId,
+ tablePath,
+ startVersionOpt,
+ startTimestampOpt,
+ endVersionOpt,
+ endTimestampOpt);
+ }
+
+ @Override
+ public void close() {
+ try {
+ ucClient.close();
+ } catch (Exception e) {
+ // Swallow close errors to avoid disrupting caller cleanup
+ }
+ }
+
+ private static String extractUCTableId(CatalogTable catalogTable) {
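+ // The UC table id is recorded in the table's storage properties under UC_TABLE_ID_KEY.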
+ Map<String, String> storageProperties =
+ scala.collection.JavaConverters.mapAsJavaMap(catalogTable.storage().properties());
+
+ String ucTableId =
+ storageProperties.get(
+ io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient.UC_TABLE_ID_KEY);
+ if (ucTableId == null || ucTableId.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Cannot extract ucTableId from table " + catalogTable.identifier());
+ }
+ return ucTableId;
+ }
+
+ private static String extractTablePath(CatalogTable catalogTable) {
+ if (catalogTable.location() == null) {
+ throw new IllegalArgumentException(
+ "Cannot extract table path: location is null for table " + catalogTable.identifier());
+ }
+ return catalogTable.location().toString();
+ }
+
+ private static UCClient createUCClient(CatalogTable catalogTable, SparkSession spark) {
+ scala.Option<String> catalogOption = catalogTable.identifier().catalog();
+ String catalogName =
+ catalogOption.isDefined()
+ ? catalogOption.get()
+ : spark.sessionState().catalogManager().currentCatalog().name();
+
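+ // getCatalogConfigs returns a (catalogName, uri, token) tuple per configured UC catalog.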
+ scala.collection.immutable.List<scala.Tuple3<String, String, String>> scalaConfigs =
+ UCCommitCoordinatorBuilder$.MODULE$.getCatalogConfigs(spark);
+
+ Optional<scala.Tuple3<String, String, String>> configTuple =
+ scala.jdk.javaapi.CollectionConverters.asJava(scalaConfigs).stream()
+ .filter(tuple -> tuple._1().equals(catalogName))
+ .findFirst();
+
+ if (!configTuple.isPresent()) {
+ throw new IllegalArgumentException(
+ "Cannot create UC client: Unity Catalog configuration not found for catalog '"
+ + catalogName
+ + "'.");
+ }
+
+ scala.Tuple3<String, String, String> config = configTuple.get();
+ String uri = config._2();
+ String token = config._3();
+
+ return UCTokenBasedRestClientFactory$.MODULE$.createUCClient(uri, token);
+ }
+}
diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/snapshot/CatalogManagedSnapshotManagerTest.java b/kernel-spark/src/test/java/io/delta/kernel/spark/snapshot/CatalogManagedSnapshotManagerTest.java
new file mode 100644
index 00000000000..a2913160109
--- /dev/null
+++ b/kernel-spark/src/test/java/io/delta/kernel/spark/snapshot/CatalogManagedSnapshotManagerTest.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.spark.snapshot;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Test;
+
+/** Tests for {@link CatalogManagedSnapshotManager}. */
+class CatalogManagedSnapshotManagerTest {
+
+ @Test
+ void testConstructor_NullHadoopConf_ThrowsException() {
+ assertThrows(
+ NullPointerException.class,
+ () -> new CatalogManagedSnapshotManager(new NoOpClient(), null),
+ "Null hadoopConf should throw NullPointerException");
+ }
+
+ @Test
+ void testConstructor_NullClient_ThrowsException() {
+ assertThrows(
+ NullPointerException.class,
+ () -> new CatalogManagedSnapshotManager(null, new Configuration()),
+ "Null commitClient should throw NullPointerException");
+ }
+
+ private static final class NoOpClient implements ManagedCommitClient {
+ @Override
+ public String getTableId() {
+ return "dummy";
+ }
+
+ @Override
+ public String getTablePath() {
+ return "/tmp/dummy";
+ }
+
+ @Override
+ public io.delta.kernel.Snapshot loadSnapshot(
+ io.delta.kernel.engine.Engine engine,
+ java.util.Optional<Long> versionOpt,
+ java.util.Optional<Long> timestampOpt) {
+ throw new UnsupportedOperationException("noop");
+ }
+
+ @Override
+ public io.delta.kernel.CommitRange loadCommitRange(
+ io.delta.kernel.engine.Engine engine,
+ java.util.Optional<Long> startVersionOpt,
+ java.util.Optional<Long> startTimestampOpt,
+ java.util.Optional<Long> endVersionOpt,
+ java.util.Optional<Long> endTimestampOpt) {
+ throw new UnsupportedOperationException("noop");
+ }
+
+ @Override
+ public void close() {}
+ }
+}
diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/snapshot/DeltaSnapshotManagerFactoryTest.java b/kernel-spark/src/test/java/io/delta/kernel/spark/snapshot/DeltaSnapshotManagerFactoryTest.java
new file mode 100644
index 00000000000..61c80acac08
--- /dev/null
+++ b/kernel-spark/src/test/java/io/delta/kernel/spark/snapshot/DeltaSnapshotManagerFactoryTest.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.spark.snapshot;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Test;
+
+/** Tests for {@link DeltaSnapshotManagerFactory}. */
+class DeltaSnapshotManagerFactoryTest {
+
+ @Test
+ void testCreate_NullTablePath_ThrowsException() {
+ assertThrows(
+ NullPointerException.class,
+ () -> DeltaSnapshotManagerFactory.create(null, Optional.empty(), null, new Configuration()),
+ "Null tablePath should throw NullPointerException");
+ }
+
+ @Test
+ void testCreate_NullCatalogTable_ThrowsException() {
+ assertThrows(
+ NullPointerException.class,
+ () -> DeltaSnapshotManagerFactory.create("/tmp/test", null, null, new Configuration()),
+ "Null catalogTable should throw NullPointerException");
+ }
+
+ @Test
+ void testCreate_NullSpark_ThrowsException() {
+ assertThrows(
+ NullPointerException.class,
+ () ->
+ DeltaSnapshotManagerFactory.create(
+ "/tmp/test", Optional.empty(), null, new Configuration()),
+ "Null spark should throw NullPointerException");
+ }
+
+ @Test
+ void testCreate_NullHadoopConf_ThrowsException() {
+ assertThrows(
+ NullPointerException.class,
+ () -> DeltaSnapshotManagerFactory.create("/tmp/test", Optional.empty(), null, null),
+ "Null hadoopConf should throw NullPointerException");
+ }
+
+ // Note: Factory behavior tests (which manager type is created) require integration test setup.
+ // The following tests cannot be implemented as unit tests because the factory requires
+ // a non-null SparkSession parameter:
+ //
+ // - testCreate_NonCatalogManagedTable_ReturnsPathBasedManager: Verify non-UC tables use PathBased
+ // - testCreate_EmptyCatalogTable_ReturnsPathBasedManager: Verify empty catalogTable uses
+ // PathBased
+ // - testCreate_UCManagedTable_ReturnsCatalogManagedManager: Verify UC tables use CatalogManaged
+ //
+ // While PathBasedSnapshotManager doesn't technically need SparkSession, the factory API requires
+ // it to maintain a clean, consistent interface (always available in production via SparkTable).
+ // SparkSession cannot be mocked effectively for these tests.
+ //
+ // Note: Testing CatalogManagedSnapshotManager creation requires integration tests with a
+ // real SparkSession and Unity Catalog configuration. This is because:
+ // 1. UnityCatalogManagedCommitClient.fromCatalog validates the UC table and extracts metadata
+ // 2. It requires a configured UC catalog (spark.sql.catalog.*.uri/token)
+ // 3. Unit tests cannot easily mock SparkSession's catalog manager
+ // See CatalogManagedSnapshotManagerTest for the null-argument unit tests.
+}