From f915cd58b79cae72cdcc74fe00551268abb0deac Mon Sep 17 00:00:00 2001 From: Timothy Wang Date: Wed, 12 Nov 2025 12:15:38 -0800 Subject: [PATCH 01/15] update methods and testS Signed-off-by: Timothy Wang --- .../kernel/spark/utils/delta-uc.code-workspace | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 kernel-spark/src/main/java/io/delta/kernel/spark/utils/delta-uc.code-workspace diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/utils/delta-uc.code-workspace b/kernel-spark/src/main/java/io/delta/kernel/spark/utils/delta-uc.code-workspace new file mode 100644 index 00000000000..21426cccaaa --- /dev/null +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/utils/delta-uc.code-workspace @@ -0,0 +1,15 @@ +{ + "folders": [ + { + "path": "../../../../../../../../.." + }, + { + "path": "../../../../../../../../../../unitycatalog" + } + ], + "settings": { + "files.watcherExclude": { + "**/target": true + } + } +} \ No newline at end of file From 2781acfa43845713720f98b3bef43a122810c3ba Mon Sep 17 00:00:00 2001 From: Timothy Wang Date: Wed, 12 Nov 2025 12:16:29 -0800 Subject: [PATCH 02/15] delete workspace Signed-off-by: Timothy Wang --- .../kernel/spark/utils/delta-uc.code-workspace | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/utils/delta-uc.code-workspace b/kernel-spark/src/main/java/io/delta/kernel/spark/utils/delta-uc.code-workspace index 21426cccaaa..ebd79b0a59d 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/utils/delta-uc.code-workspace +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/utils/delta-uc.code-workspace @@ -1,15 +1,7 @@ { - "folders": [ - { - "path": "../../../../../../../../.." - }, - { - "path": "../../../../../../../../../../unitycatalog" - } - ], - "settings": { - "files.watcherExclude": { - "**/target": true - } - } + "settings": { + "files.watcherExclude": { + "**/target": true + } + } } \ No newline at end of file From ff4b2e566ffb75341be0e441a1e1a8770768da8a Mon Sep 17 00:00:00 2001 From: Timothy Wang Date: Wed, 12 Nov 2025 12:16:52 -0800 Subject: [PATCH 03/15] delete Signed-off-by: Timothy Wang --- .../io/delta/kernel/spark/utils/delta-uc.code-workspace | 7 ------- 1 file changed, 7 deletions(-) delete mode 100644 kernel-spark/src/main/java/io/delta/kernel/spark/utils/delta-uc.code-workspace diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/utils/delta-uc.code-workspace b/kernel-spark/src/main/java/io/delta/kernel/spark/utils/delta-uc.code-workspace deleted file mode 100644 index ebd79b0a59d..00000000000 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/utils/delta-uc.code-workspace +++ /dev/null @@ -1,7 +0,0 @@ -{ - "settings": { - "files.watcherExclude": { - "**/target": true - } - } -} \ No newline at end of file From 9a6785bb1dcb78a56260b09eb217f9b3a3ad7610 Mon Sep 17 00:00:00 2001 From: TimothyW553 Date: Thu, 11 Dec 2025 00:42:02 +0000 Subject: [PATCH 04/15] implementation --- .../UCManagedTableSnapshotManager.java | 64 +++++++++++++++---- 1 file changed, 53 insertions(+), 11 deletions(-) diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManager.java b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManager.java index 6787728fd65..d9dc5688266 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManager.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManager.java @@ -21,8 +21,11 @@ import io.delta.kernel.Snapshot; import io.delta.kernel.engine.Engine; import io.delta.kernel.internal.DeltaHistoryManager; +import io.delta.kernel.internal.SnapshotImpl; +import io.delta.kernel.spark.exception.VersionNotFoundException; import io.delta.kernel.spark.snapshot.DeltaSnapshotManager; import io.delta.kernel.unitycatalog.UCCatalogManagedClient; +import java.util.ArrayList; import java.util.Optional; /** @@ -57,35 +60,74 @@ public UCManagedTableSnapshotManager( @Override public Snapshot loadLatestSnapshot() { - throw new UnsupportedOperationException( - "UCManagedTableSnapshotManager.loadLatestSnapshot is not yet implemented"); + return ucCatalogManagedClient.loadSnapshot( + engine, + tableId, + tablePath, + Optional.empty() /* versionOpt */, + Optional.empty() /* timestampOpt */); } @Override public Snapshot loadSnapshotAt(long version) { - throw new UnsupportedOperationException( - "UCManagedTableSnapshotManager.loadSnapshotAt is not yet implemented"); + return ucCatalogManagedClient.loadSnapshot( + engine, tableId, tablePath, Optional.of(version), Optional.empty() /* timestampOpt */); } + /** + * Finds the active commit at a specific timestamp. + * + *

For UC-managed tables, this loads the latest snapshot and uses {@link + * DeltaHistoryManager#getActiveCommitAtTimestamp} to resolve the timestamp to a commit. + */ @Override public DeltaHistoryManager.Commit getActiveCommitAtTime( long timestampMillis, boolean canReturnLastCommit, boolean mustBeRecreatable, boolean canReturnEarliestCommit) { - throw new UnsupportedOperationException( - "UCManagedTableSnapshotManager.getActiveCommitAtTime is not yet implemented"); + SnapshotImpl snapshot = (SnapshotImpl) loadLatestSnapshot(); + return DeltaHistoryManager.getActiveCommitAtTimestamp( + engine, + snapshot, + snapshot.getLogPath(), + timestampMillis, + mustBeRecreatable, + canReturnLastCommit, + canReturnEarliestCommit, + new ArrayList<>() /* catalogCommits */); } + /** + * Checks if a specific version exists and is accessible. + * + *

For UC-managed tables, all ratified commits are available, so the earliest version is + * typically 0. This method validates that the requested version is within the valid range. + */ @Override - public void checkVersionExists(long version, boolean mustBeRecreatable, boolean allowOutOfRange) { - throw new UnsupportedOperationException( - "UCManagedTableSnapshotManager.checkVersionExists is not yet implemented"); + public void checkVersionExists(long version, boolean mustBeRecreatable, boolean allowOutOfRange) + throws VersionNotFoundException { + // Load latest to get the current version bounds + Snapshot latestSnapshot = loadLatestSnapshot(); + long latestVersion = latestSnapshot.getVersion(); + + // For UC tables, earliest recreatable version is 0 (all ratified commits are available) + long earliestVersion = 0; + + if (version < earliestVersion || ((version > latestVersion) && !allowOutOfRange)) { + throw new VersionNotFoundException(version, earliestVersion, latestVersion); + } } @Override public CommitRange getTableChanges(Engine engine, long startVersion, Optional endVersion) { - throw new UnsupportedOperationException( - "UCManagedTableSnapshotManager.getTableChanges is not yet implemented"); + return ucCatalogManagedClient.loadCommitRange( + engine, + tableId, + tablePath, + Optional.of(startVersion) /* startVersionOpt */, + Optional.empty() /* startTimestampOpt */, + endVersion /* endVersionOpt */, + Optional.empty() /* endTimestampOpt */); } } From e5d89e5a8486bdbf727a7a01f9f10f780ae52c27 Mon Sep 17 00:00:00 2001 From: TimothyW553 Date: Thu, 11 Dec 2025 04:25:12 +0000 Subject: [PATCH 05/15] Add catalogCommits to getActiveCommitAtTime --- .../unitycatalog/UCManagedTableSnapshotManager.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManager.java b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManager.java index d9dc5688266..75546600ea6 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManager.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManager.java @@ -22,10 +22,11 @@ import io.delta.kernel.engine.Engine; import io.delta.kernel.internal.DeltaHistoryManager; import io.delta.kernel.internal.SnapshotImpl; +import io.delta.kernel.internal.data.ParsedCatalogCommitData; import io.delta.kernel.spark.exception.VersionNotFoundException; import io.delta.kernel.spark.snapshot.DeltaSnapshotManager; import io.delta.kernel.unitycatalog.UCCatalogManagedClient; -import java.util.ArrayList; +import java.util.List; import java.util.Optional; /** @@ -87,6 +88,7 @@ public DeltaHistoryManager.Commit getActiveCommitAtTime( boolean mustBeRecreatable, boolean canReturnEarliestCommit) { SnapshotImpl snapshot = (SnapshotImpl) loadLatestSnapshot(); + List catalogCommits = snapshot.getLogSegment().getAllCatalogCommits(); return DeltaHistoryManager.getActiveCommitAtTimestamp( engine, snapshot, @@ -95,7 +97,7 @@ public DeltaHistoryManager.Commit getActiveCommitAtTime( mustBeRecreatable, canReturnLastCommit, canReturnEarliestCommit, - new ArrayList<>() /* catalogCommits */); + catalogCommits); } /** From f7fca7e54a32ea801f390bf14e6ba528ea3f5ab3 Mon Sep 17 00:00:00 2001 From: TimothyW553 Date: Thu, 11 Dec 2025 05:52:45 +0000 Subject: [PATCH 06/15] fix import --- .../snapshot/unitycatalog/UCManagedTableSnapshotManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManager.java b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManager.java index 75546600ea6..69fa18b44ce 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManager.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManager.java @@ -22,7 +22,7 @@ import io.delta.kernel.engine.Engine; import io.delta.kernel.internal.DeltaHistoryManager; import io.delta.kernel.internal.SnapshotImpl; -import io.delta.kernel.internal.data.ParsedCatalogCommitData; +import io.delta.kernel.internal.files.ParsedCatalogCommitData; import io.delta.kernel.spark.exception.VersionNotFoundException; import io.delta.kernel.spark.snapshot.DeltaSnapshotManager; import io.delta.kernel.unitycatalog.UCCatalogManagedClient; From c270aadc00855bb6d7bfa88efc346bd9885471fd Mon Sep 17 00:00:00 2001 From: TimothyW553 Date: Thu, 11 Dec 2025 06:16:43 +0000 Subject: [PATCH 07/15] integration tests --- .../UCManagedTableSnapshotManagerTest.java | 100 ++++ .../UCManagedTableSnapshotManagerSuite.scala | 532 ++++++++++++++++++ 2 files changed, 632 insertions(+) create mode 100644 kernel-spark/src/test/java/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManagerTest.java create mode 100644 kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManagerSuite.scala diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManagerTest.java b/kernel-spark/src/test/java/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManagerTest.java new file mode 100644 index 00000000000..b01f4b3e411 --- /dev/null +++ b/kernel-spark/src/test/java/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManagerTest.java @@ -0,0 +1,100 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.spark.snapshot.unitycatalog; + +import static org.junit.jupiter.api.Assertions.*; + +import io.delta.kernel.defaults.engine.DefaultEngine; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.unitycatalog.InMemoryUCClient; +import io.delta.kernel.unitycatalog.UCCatalogManagedClient; +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.Test; + +/** + * Unit tests for {@link UCManagedTableSnapshotManager}. + * + *

These tests focus on constructor validation and basic instantiation. Integration tests for + * actual functionality are in {@code UCManagedTableSnapshotManagerSuite}. + */ +public class UCManagedTableSnapshotManagerTest { + + private static final String TEST_TABLE_ID = "test_uc_table_id"; + private static final String TEST_TABLE_PATH = "/test/path/to/table"; + private static final String TEST_UC_URI = "https://uc.example.com"; + private static final String TEST_UC_TOKEN = "test_token"; + + private UCTableInfo createTestTableInfo() { + return new UCTableInfo(TEST_TABLE_ID, TEST_TABLE_PATH, TEST_UC_URI, TEST_UC_TOKEN); + } + + private UCCatalogManagedClient createTestClient() { + InMemoryUCClient ucClient = new InMemoryUCClient("test_metastore"); + return new UCCatalogManagedClient(ucClient); + } + + private Engine createTestEngine() { + return DefaultEngine.create(new Configuration()); + } + + @Test + public void testConstructor_NullClient_ThrowsNPE() { + UCTableInfo tableInfo = createTestTableInfo(); + Engine engine = createTestEngine(); + + NullPointerException exception = + assertThrows( + NullPointerException.class, + () -> new UCManagedTableSnapshotManager(null, tableInfo, engine)); + assertEquals("ucCatalogManagedClient is null", exception.getMessage()); + } + + @Test + public void testConstructor_NullTableInfo_ThrowsNPE() { + UCCatalogManagedClient client = createTestClient(); + Engine engine = createTestEngine(); + + NullPointerException exception = + assertThrows( + NullPointerException.class, + () -> new UCManagedTableSnapshotManager(client, null, engine)); + assertEquals("tableInfo is null", exception.getMessage()); + } + + @Test + public void testConstructor_NullEngine_ThrowsNPE() { + UCCatalogManagedClient client = createTestClient(); + UCTableInfo tableInfo = createTestTableInfo(); + + NullPointerException exception = + assertThrows( + NullPointerException.class, + () -> new UCManagedTableSnapshotManager(client, tableInfo, null)); + assertEquals("engine is null", exception.getMessage()); + } + + @Test + public void testConstructor_ValidInputs_CreatesManager() { + UCCatalogManagedClient client = createTestClient(); + UCTableInfo tableInfo = createTestTableInfo(); + Engine engine = createTestEngine(); + + UCManagedTableSnapshotManager manager = + new UCManagedTableSnapshotManager(client, tableInfo, engine); + + assertNotNull(manager); + } +} diff --git a/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManagerSuite.scala b/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManagerSuite.scala new file mode 100644 index 00000000000..7dd595fde74 --- /dev/null +++ b/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManagerSuite.scala @@ -0,0 +1,532 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.spark.snapshot.unitycatalog + +import java.util.Optional + +import io.delta.kernel.{CommitActions, CommitRangeBuilder, Operation, ScanBuilder} +import io.delta.kernel.{CommitRange, Snapshot} +import io.delta.kernel.data.ColumnarBatch +import io.delta.kernel.engine.Engine +import io.delta.kernel.internal.DeltaLogActionUtils +import io.delta.kernel.spark.exception.VersionNotFoundException +import io.delta.kernel.statistics.SnapshotStatistics +import io.delta.kernel.transaction.UpdateTableTransactionBuilder +import io.delta.kernel.types.StructType +import io.delta.kernel.unitycatalog.InMemoryUCClient +import io.delta.kernel.unitycatalog.UCCatalogManagedClient +import io.delta.kernel.utils.CloseableIterator + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.test.SharedSparkSession + +/** + * Integration tests for [[UCManagedTableSnapshotManager]]. + * + * These tests use mock implementations of [[UCCatalogManagedClient]], [[Snapshot]], and + * [[CommitRange]] to verify the behavior of all [[DeltaSnapshotManager]] interface methods. + */ +class UCManagedTableSnapshotManagerSuite extends SparkFunSuite with SharedSparkSession { + + // Test constants + private val TEST_TABLE_ID = "test_uc_table_id" + private val TEST_TABLE_PATH = "/test/path/to/table" + private val TEST_UC_URI = "https://uc.example.com" + private val TEST_UC_TOKEN = "test_token" + private val TEST_LATEST_VERSION = 10L + + // ==================== Mock Classes ==================== + + /** + * Mock Snapshot implementation that returns a configured version. + */ + class MockSnapshot(version: Long, path: String = TEST_TABLE_PATH) extends Snapshot { + override def getPath: String = path + override def getVersion: Long = version + override def getPartitionColumnNames: java.util.List[String] = java.util.Collections.emptyList() + override def getTimestamp(engine: Engine): Long = System.currentTimeMillis() + override def getSchema: StructType = new StructType() + override def getDomainMetadata(domain: String): Optional[String] = Optional.empty() + override def getTableProperties: java.util.Map[String, String] = + java.util.Collections.emptyMap() + override def getStatistics: SnapshotStatistics = + throw new UnsupportedOperationException("Not implemented in mock") + override def getScanBuilder: ScanBuilder = + throw new UnsupportedOperationException("Not implemented in mock") + override def buildUpdateTableTransaction( + engineInfo: String, + operation: Operation): UpdateTableTransactionBuilder = + throw new UnsupportedOperationException("Not implemented in mock") + override def publish(engine: Engine): Unit = {} + override def writeChecksum(engine: Engine, mode: Snapshot.ChecksumWriteMode): Unit = {} + } + + /** + * Mock CommitRange implementation that returns configured start/end versions. + */ + class MockCommitRange(startVersion: Long, endVersion: Long) extends CommitRange { + override def getStartVersion: Long = startVersion + override def getEndVersion: Long = endVersion + override def getQueryStartBoundary: CommitRangeBuilder.CommitBoundary = + CommitRangeBuilder.CommitBoundary.atVersion(startVersion) + override def getQueryEndBoundary: Optional[CommitRangeBuilder.CommitBoundary] = + Optional.of(CommitRangeBuilder.CommitBoundary.atVersion(endVersion)) + override def getActions( + engine: Engine, + startSnapshot: Snapshot, + actionSet: java.util.Set[DeltaLogActionUtils.DeltaAction]) + : CloseableIterator[ColumnarBatch] = + throw new UnsupportedOperationException("Not implemented in mock") + override def getCommitActions( + engine: Engine, + startSnapshot: Snapshot, + actionSet: java.util.Set[DeltaLogActionUtils.DeltaAction]) + : CloseableIterator[CommitActions] = + throw new UnsupportedOperationException("Not implemented in mock") + } + + /** + * Test UCCatalogManagedClient that returns controlled snapshots and commit ranges. + * + * Key design: The mock returns DIFFERENT values based on input to ensure tests catch + * incorrect parameter passing. If version is empty, returns latestVersion; if version is + * specified, returns that exact version. This means tests will fail if: + * - loadLatestSnapshot passes a specific version instead of empty + * - loadSnapshotAt passes empty instead of the specific version + * - loadSnapshotAt passes a wrong version number + */ + class TestUCCatalogManagedClient(latestVersion: Long) + extends UCCatalogManagedClient(new InMemoryUCClient("test-metastore")) { + + // Track calls for verification - allows asserting exact parameters + var lastLoadSnapshotVersionOpt: Optional[java.lang.Long] = _ + var lastLoadSnapshotTimestampOpt: Optional[java.lang.Long] = _ + var lastCommitRangeStartVersion: Optional[java.lang.Long] = _ + var lastCommitRangeEndVersion: Optional[java.lang.Long] = _ + var loadSnapshotCallCount: Int = 0 + var loadCommitRangeCallCount: Int = 0 + + override def loadSnapshot( + engine: Engine, + ucTableId: String, + tablePath: String, + versionOpt: Optional[java.lang.Long], + timestampOpt: Optional[java.lang.Long]): Snapshot = { + loadSnapshotCallCount += 1 + lastLoadSnapshotVersionOpt = versionOpt + lastLoadSnapshotTimestampOpt = timestampOpt + + // Return different values based on input to catch incorrect parameter passing + val version = if (versionOpt.isPresent) { + versionOpt.get().longValue() + } else { + latestVersion + } + new MockSnapshot(version, tablePath) + } + + override def loadCommitRange( + engine: Engine, + ucTableId: String, + tablePath: String, + startVersionOpt: Optional[java.lang.Long], + startTimestampOpt: Optional[java.lang.Long], + endVersionOpt: Optional[java.lang.Long], + endTimestampOpt: Optional[java.lang.Long]): CommitRange = { + loadCommitRangeCallCount += 1 + lastCommitRangeStartVersion = startVersionOpt + lastCommitRangeEndVersion = endVersionOpt + + val startVersion = startVersionOpt.orElse(0L).longValue() + val endVersion = endVersionOpt.orElse(latestVersion).longValue() + new MockCommitRange(startVersion, endVersion) + } + } + + // ==================== Helper Methods ==================== + + private def createTestTableInfo(): UCTableInfo = { + new UCTableInfo(TEST_TABLE_ID, TEST_TABLE_PATH, TEST_UC_URI, TEST_UC_TOKEN) + } + + private def createTestEngine(): Engine = { + io.delta.kernel.defaults.engine.DefaultEngine.create( + spark.sessionState.newHadoopConf()) + } + + private def createManager( + client: UCCatalogManagedClient, + engine: Engine = createTestEngine()): UCManagedTableSnapshotManager = { + new UCManagedTableSnapshotManager(client, createTestTableInfo(), engine) + } + + // ==================== loadLatestSnapshot Tests ==================== + + test("loadLatestSnapshot returns snapshot at latest version") { + val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) + val manager = createManager(client) + + val snapshot = manager.loadLatestSnapshot() + + // The mock returns latestVersion only when versionOpt is empty. + // If impl passed Optional.of(someOtherVersion), we'd get that version back. + assert(snapshot.getVersion == TEST_LATEST_VERSION) + } + + test("loadLatestSnapshot calls client with empty version and timestamp") { + val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) + val manager = createManager(client) + + manager.loadLatestSnapshot() + + // Verify exact parameters - this catches bugs where impl passes wrong values + assert( + !client.lastLoadSnapshotVersionOpt.isPresent, + "loadLatestSnapshot should pass empty versionOpt, not a specific version") + assert( + !client.lastLoadSnapshotTimestampOpt.isPresent, + "loadLatestSnapshot should pass empty timestampOpt") + assert(client.loadSnapshotCallCount == 1, "loadSnapshot should be called exactly once") + } + + test("loadLatestSnapshot would return wrong version if wrong parameters passed") { + // This test verifies our mock behavior is correct - it returns different values + // based on input, which is crucial for valid testing + val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) + + // When version is specified, mock returns that version (not latest) + val snapshotAt5 = client.loadSnapshot( + createTestEngine(), + TEST_TABLE_ID, + TEST_TABLE_PATH, + Optional.of(5L: java.lang.Long), + Optional.empty()) + assert(snapshotAt5.getVersion == 5L, "Mock should return requested version") + + // When version is empty, mock returns latest + val snapshotLatest = client.loadSnapshot( + createTestEngine(), + TEST_TABLE_ID, + TEST_TABLE_PATH, + Optional.empty(), + Optional.empty()) + assert(snapshotLatest.getVersion == TEST_LATEST_VERSION, "Mock should return latest when empty") + } + + // ==================== loadSnapshotAt Tests ==================== + + test("loadSnapshotAt returns snapshot at specific version") { + val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) + val manager = createManager(client) + + val snapshot = manager.loadSnapshotAt(5L) + + assert(snapshot.getVersion == 5L) + } + + test("loadSnapshotAt at version zero returns correct snapshot") { + val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) + val manager = createManager(client) + + val snapshot = manager.loadSnapshotAt(0L) + + assert(snapshot.getVersion == 0L) + } + + test("loadSnapshotAt passes version correctly to client") { + val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) + val manager = createManager(client) + + manager.loadSnapshotAt(7L) + + assert(client.lastLoadSnapshotVersionOpt.isPresent) + assert(client.lastLoadSnapshotVersionOpt.get() == 7L) + assert(!client.lastLoadSnapshotTimestampOpt.isPresent) + } + + // ==================== checkVersionExists Tests ==================== + + test("checkVersionExists: version within valid range does not throw") { + val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) + val manager = createManager(client) + + // Should not throw - version 5 is within [0, 10] + manager.checkVersionExists(5L, true /* mustBeRecreatable */, false /* allowOutOfRange */ ) + + // Verify the implementation actually called loadLatestSnapshot to get bounds + assert( + client.loadSnapshotCallCount == 1, + "checkVersionExists should call loadLatestSnapshot to determine version bounds") + } + + test("checkVersionExists: version at boundary zero does not throw") { + val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) + val manager = createManager(client) + + // Version 0 is the earliest version for UC tables (always 0) + manager.checkVersionExists(0L, true /* mustBeRecreatable */, false /* allowOutOfRange */ ) + + // Verify the check was actually performed + assert(client.loadSnapshotCallCount == 1) + } + + test("checkVersionExists: version at boundary latest does not throw") { + val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) + val manager = createManager(client) + + // Version 10 (TEST_LATEST_VERSION) should be valid + manager.checkVersionExists( + TEST_LATEST_VERSION, + true /* mustBeRecreatable */, + false /* allowOutOfRange */ ) + + // Verify the check was actually performed + assert(client.loadSnapshotCallCount == 1) + } + + test("checkVersionExists: version below zero throws VersionNotFoundException") { + val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) + val manager = createManager(client) + + val exception = intercept[VersionNotFoundException] { + manager.checkVersionExists(-1L, true /* mustBeRecreatable */, false /* allowOutOfRange */ ) + } + assert(exception.getUserVersion == -1L) + assert(exception.getEarliest == 0L) + assert(exception.getLatest == TEST_LATEST_VERSION) + } + + test("checkVersionExists: version above latest with allowOutOfRange=false throws") { + val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) + val manager = createManager(client) + + val exception = intercept[VersionNotFoundException] { + manager.checkVersionExists(15L, true /* mustBeRecreatable */, false /* allowOutOfRange */ ) + } + assert(exception.getUserVersion == 15L) + assert(exception.getEarliest == 0L) + assert(exception.getLatest == TEST_LATEST_VERSION) + } + + test("checkVersionExists: version above latest with allowOutOfRange=true does not throw") { + val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) + val manager = createManager(client) + + // Should not throw + manager.checkVersionExists(15L, true /* mustBeRecreatable */, true /* allowOutOfRange */ ) + } + + // ==================== getTableChanges Tests ==================== + + test("getTableChanges with explicit end version returns correct commit range") { + val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) + val engine = createTestEngine() + val manager = createManager(client, engine) + + val commitRange = manager.getTableChanges(engine, 3L, Optional.of(7L)) + + // Mock returns the exact start/end versions passed, so this verifies correct parameter passing + assert( + commitRange.getStartVersion == 3L, + "Start version should match what was passed to getTableChanges") + assert( + commitRange.getEndVersion == 7L, + "End version should match what was passed to getTableChanges") + assert(client.loadCommitRangeCallCount == 1, "loadCommitRange should be called exactly once") + } + + test("getTableChanges without end version returns range to latest") { + val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) + val engine = createTestEngine() + val manager = createManager(client, engine) + + val commitRange = manager.getTableChanges(engine, 5L, Optional.empty()) + + // When endVersion is empty, mock returns latestVersion + assert(commitRange.getStartVersion == 5L) + assert( + commitRange.getEndVersion == TEST_LATEST_VERSION, + "When endVersion is empty, should extend to latest") + } + + test("getTableChanges passes parameters correctly to loadCommitRange") { + val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) + val engine = createTestEngine() + val manager = createManager(client, engine) + + manager.getTableChanges(engine, 2L, Optional.of(8L)) + + // Verify exact parameters passed to underlying client + assert(client.lastCommitRangeStartVersion.isPresent, "startVersionOpt should be present") + assert( + client.lastCommitRangeStartVersion.get() == 2L, + s"startVersion should be 2, got ${client.lastCommitRangeStartVersion.get()}") + assert( + client.lastCommitRangeEndVersion.isPresent, + "endVersionOpt should be present when explicit end provided") + assert( + client.lastCommitRangeEndVersion.get() == 8L, + s"endVersion should be 8, got ${client.lastCommitRangeEndVersion.get()}") + } + + test("getTableChanges with empty end version passes empty to loadCommitRange") { + val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) + val engine = createTestEngine() + val manager = createManager(client, engine) + + manager.getTableChanges(engine, 1L, Optional.empty()) + + assert(client.lastCommitRangeStartVersion.isPresent) + assert(client.lastCommitRangeStartVersion.get() == 1L) + assert( + !client.lastCommitRangeEndVersion.isPresent, + "When endVersion is Optional.empty, should pass empty to client") + } + + // ==================== Additional Edge Case Tests (from adversarial review) ==================== + + test( + "checkVersionExists: mustBeRecreatable flag is accepted but behavior unchanged for UC tables") { + // For UC-managed tables, all ratified commits are available (earliest = 0) + // mustBeRecreatable doesn't change behavior since UC guarantees all versions are recreatable + val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) + val manager = createManager(client) + + // Both should succeed - mustBeRecreatable doesn't affect UC tables + manager.checkVersionExists(5L, true /* mustBeRecreatable */, false /* allowOutOfRange */ ) + manager.checkVersionExists(5L, false /* mustBeRecreatable */, false /* allowOutOfRange */ ) + } + + test("checkVersionExists: Long.MAX_VALUE with allowOutOfRange=false throws") { + val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) + val manager = createManager(client) + + val exception = intercept[VersionNotFoundException] { + manager.checkVersionExists( + Long.MaxValue, + true /* mustBeRecreatable */, + false /* allowOutOfRange */ ) + } + assert(exception.getUserVersion == Long.MaxValue) + } + + test("checkVersionExists: Long.MAX_VALUE with allowOutOfRange=true does not throw") { + val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) + val manager = createManager(client) + + // Should not throw + manager.checkVersionExists( + Long.MaxValue, + true /* mustBeRecreatable */, + true /* allowOutOfRange */ ) + } + + test("getTableChanges with start version 0 returns range from beginning") { + val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) + val engine = createTestEngine() + val manager = createManager(client, engine) + + val commitRange = manager.getTableChanges(engine, 0L, Optional.empty()) + + assert(commitRange.getStartVersion == 0L) + assert(commitRange.getEndVersion == TEST_LATEST_VERSION) + } + + test("getTableChanges with start equals end returns single-version range") { + val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) + val engine = createTestEngine() + val manager = createManager(client, engine) + + val commitRange = manager.getTableChanges(engine, 5L, Optional.of(5L)) + + assert(commitRange.getStartVersion == 5L) + assert(commitRange.getEndVersion == 5L) + } + + // ==================== Exception Propagation Tests ==================== + + /** + * Client that throws exceptions to test error handling. + */ + class ThrowingUCCatalogManagedClient(exceptionToThrow: RuntimeException) + extends UCCatalogManagedClient(new InMemoryUCClient("test-metastore")) { + + override def loadSnapshot( + engine: Engine, + ucTableId: String, + tablePath: String, + versionOpt: Optional[java.lang.Long], + timestampOpt: Optional[java.lang.Long]): Snapshot = { + throw exceptionToThrow + } + + override def loadCommitRange( + engine: Engine, + ucTableId: String, + tablePath: String, + startVersionOpt: Optional[java.lang.Long], + startTimestampOpt: Optional[java.lang.Long], + endVersionOpt: Optional[java.lang.Long], + endTimestampOpt: Optional[java.lang.Long]): CommitRange = { + throw exceptionToThrow + } + } + + test("loadLatestSnapshot propagates exceptions from client") { + val expectedException = new RuntimeException("UC connection failed") + val client = new ThrowingUCCatalogManagedClient(expectedException) + val manager = createManager(client) + + val thrown = intercept[RuntimeException] { + manager.loadLatestSnapshot() + } + assert(thrown eq expectedException) + } + + test("loadSnapshotAt propagates exceptions from client") { + val expectedException = new IllegalArgumentException("Invalid version") + val client = new ThrowingUCCatalogManagedClient(expectedException) + val manager = createManager(client) + + val thrown = intercept[IllegalArgumentException] { + manager.loadSnapshotAt(5L) + } + assert(thrown eq expectedException) + } + + test("checkVersionExists propagates exceptions from loadLatestSnapshot") { + val expectedException = new RuntimeException("Network error") + val client = new ThrowingUCCatalogManagedClient(expectedException) + val manager = createManager(client) + + val thrown = intercept[RuntimeException] { + manager.checkVersionExists(5L, true /* mustBeRecreatable */, false /* allowOutOfRange */ ) + } + assert(thrown eq expectedException) + } + + test("getTableChanges propagates exceptions from client") { + val expectedException = new RuntimeException("Commit range not available") + val client = new ThrowingUCCatalogManagedClient(expectedException) + val engine = createTestEngine() + val manager = createManager(client, engine) + + val thrown = intercept[RuntimeException] { + manager.getTableChanges(engine, 1L, Optional.of(5L)) + } + assert(thrown eq expectedException) + } +} From f1c5aa844ca1a9f46ce10a5a5d63be30169f178a Mon Sep 17 00:00:00 2001 From: TimothyW553 Date: Thu, 11 Dec 2025 06:16:53 +0000 Subject: [PATCH 08/15] integration tests From 458fdf00b5e6a6860ed110e28a4baab74c60861b Mon Sep 17 00:00:00 2001 From: TimothyW553 Date: Thu, 11 Dec 2025 09:34:58 +0000 Subject: [PATCH 09/15] Update tests --- .../UCManagedTableSnapshotManagerSuite.scala | 671 +++++++----------- .../snapshot/unitycatalog/UCUtilsSuite.scala | 4 +- 2 files changed, 270 insertions(+), 405 deletions(-) diff --git a/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManagerSuite.scala b/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManagerSuite.scala index 7dd595fde74..26d12d6f3a1 100644 --- a/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManagerSuite.scala +++ b/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManagerSuite.scala @@ -17,516 +17,379 @@ package io.delta.kernel.spark.snapshot.unitycatalog import java.util.Optional -import io.delta.kernel.{CommitActions, CommitRangeBuilder, Operation, ScanBuilder} -import io.delta.kernel.{CommitRange, Snapshot} -import io.delta.kernel.data.ColumnarBatch -import io.delta.kernel.engine.Engine -import io.delta.kernel.internal.DeltaLogActionUtils +import io.delta.kernel.exceptions.KernelException import io.delta.kernel.spark.exception.VersionNotFoundException -import io.delta.kernel.statistics.SnapshotStatistics -import io.delta.kernel.transaction.UpdateTableTransactionBuilder -import io.delta.kernel.types.StructType -import io.delta.kernel.unitycatalog.InMemoryUCClient -import io.delta.kernel.unitycatalog.UCCatalogManagedClient -import io.delta.kernel.utils.CloseableIterator - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.test.SharedSparkSession - -/** - * Integration tests for [[UCManagedTableSnapshotManager]]. - * - * These tests use mock implementations of [[UCCatalogManagedClient]], [[Snapshot]], and - * [[CommitRange]] to verify the behavior of all [[DeltaSnapshotManager]] interface methods. - */ -class UCManagedTableSnapshotManagerSuite extends SparkFunSuite with SharedSparkSession { - - // Test constants - private val TEST_TABLE_ID = "test_uc_table_id" - private val TEST_TABLE_PATH = "/test/path/to/table" - private val TEST_UC_URI = "https://uc.example.com" - private val TEST_UC_TOKEN = "test_token" - private val TEST_LATEST_VERSION = 10L - - // ==================== Mock Classes ==================== - - /** - * Mock Snapshot implementation that returns a configured version. - */ - class MockSnapshot(version: Long, path: String = TEST_TABLE_PATH) extends Snapshot { - override def getPath: String = path - override def getVersion: Long = version - override def getPartitionColumnNames: java.util.List[String] = java.util.Collections.emptyList() - override def getTimestamp(engine: Engine): Long = System.currentTimeMillis() - override def getSchema: StructType = new StructType() - override def getDomainMetadata(domain: String): Optional[String] = Optional.empty() - override def getTableProperties: java.util.Map[String, String] = - java.util.Collections.emptyMap() - override def getStatistics: SnapshotStatistics = - throw new UnsupportedOperationException("Not implemented in mock") - override def getScanBuilder: ScanBuilder = - throw new UnsupportedOperationException("Not implemented in mock") - override def buildUpdateTableTransaction( - engineInfo: String, - operation: Operation): UpdateTableTransactionBuilder = - throw new UnsupportedOperationException("Not implemented in mock") - override def publish(engine: Engine): Unit = {} - override def writeChecksum(engine: Engine, mode: Snapshot.ChecksumWriteMode): Unit = {} - } - - /** - * Mock CommitRange implementation that returns configured start/end versions. - */ - class MockCommitRange(startVersion: Long, endVersion: Long) extends CommitRange { - override def getStartVersion: Long = startVersion - override def getEndVersion: Long = endVersion - override def getQueryStartBoundary: CommitRangeBuilder.CommitBoundary = - CommitRangeBuilder.CommitBoundary.atVersion(startVersion) - override def getQueryEndBoundary: Optional[CommitRangeBuilder.CommitBoundary] = - Optional.of(CommitRangeBuilder.CommitBoundary.atVersion(endVersion)) - override def getActions( - engine: Engine, - startSnapshot: Snapshot, - actionSet: java.util.Set[DeltaLogActionUtils.DeltaAction]) - : CloseableIterator[ColumnarBatch] = - throw new UnsupportedOperationException("Not implemented in mock") - override def getCommitActions( - engine: Engine, - startSnapshot: Snapshot, - actionSet: java.util.Set[DeltaLogActionUtils.DeltaAction]) - : CloseableIterator[CommitActions] = - throw new UnsupportedOperationException("Not implemented in mock") - } - - /** - * Test UCCatalogManagedClient that returns controlled snapshots and commit ranges. - * - * Key design: The mock returns DIFFERENT values based on input to ensure tests catch - * incorrect parameter passing. If version is empty, returns latestVersion; if version is - * specified, returns that exact version. This means tests will fail if: - * - loadLatestSnapshot passes a specific version instead of empty - * - loadSnapshotAt passes empty instead of the specific version - * - loadSnapshotAt passes a wrong version number - */ - class TestUCCatalogManagedClient(latestVersion: Long) - extends UCCatalogManagedClient(new InMemoryUCClient("test-metastore")) { - - // Track calls for verification - allows asserting exact parameters - var lastLoadSnapshotVersionOpt: Optional[java.lang.Long] = _ - var lastLoadSnapshotTimestampOpt: Optional[java.lang.Long] = _ - var lastCommitRangeStartVersion: Optional[java.lang.Long] = _ - var lastCommitRangeEndVersion: Optional[java.lang.Long] = _ - var loadSnapshotCallCount: Int = 0 - var loadCommitRangeCallCount: Int = 0 - - override def loadSnapshot( - engine: Engine, - ucTableId: String, - tablePath: String, - versionOpt: Optional[java.lang.Long], - timestampOpt: Optional[java.lang.Long]): Snapshot = { - loadSnapshotCallCount += 1 - lastLoadSnapshotVersionOpt = versionOpt - lastLoadSnapshotTimestampOpt = timestampOpt - - // Return different values based on input to catch incorrect parameter passing - val version = if (versionOpt.isPresent) { - versionOpt.get().longValue() - } else { - latestVersion - } - new MockSnapshot(version, tablePath) - } - - override def loadCommitRange( - engine: Engine, - ucTableId: String, - tablePath: String, - startVersionOpt: Optional[java.lang.Long], - startTimestampOpt: Optional[java.lang.Long], - endVersionOpt: Optional[java.lang.Long], - endTimestampOpt: Optional[java.lang.Long]): CommitRange = { - loadCommitRangeCallCount += 1 - lastCommitRangeStartVersion = startVersionOpt - lastCommitRangeEndVersion = endVersionOpt - - val startVersion = startVersionOpt.orElse(0L).longValue() - val endVersion = endVersionOpt.orElse(latestVersion).longValue() - new MockCommitRange(startVersion, endVersion) - } - } +import io.delta.kernel.unitycatalog.{InMemoryUCClient, UCCatalogManagedClient, UCCatalogManagedTestUtils} +import io.delta.storage.commit.uccommitcoordinator.InvalidTargetTableException - // ==================== Helper Methods ==================== +import org.scalatest.funsuite.AnyFunSuite - private def createTestTableInfo(): UCTableInfo = { - new UCTableInfo(TEST_TABLE_ID, TEST_TABLE_PATH, TEST_UC_URI, TEST_UC_TOKEN) - } +/** Integration tests for [[UCManagedTableSnapshotManager]]. */ +class UCManagedTableSnapshotManagerSuite + extends AnyFunSuite + with UCCatalogManagedTestUtils { - private def createTestEngine(): Engine = { - io.delta.kernel.defaults.engine.DefaultEngine.create( - spark.sessionState.newHadoopConf()) - } + private val testUcTableId = "testUcTableId" + private val testUcUri = "https://test-uc.example.com" + private val testUcToken = "test-token" private def createManager( - client: UCCatalogManagedClient, - engine: Engine = createTestEngine()): UCManagedTableSnapshotManager = { - new UCManagedTableSnapshotManager(client, createTestTableInfo(), engine) + ucClient: InMemoryUCClient, + tablePath: String) = { + val client = new UCCatalogManagedClient(ucClient) + val tableInfo = new UCTableInfo(testUcTableId, tablePath, testUcUri, testUcToken) + new UCManagedTableSnapshotManager(client, tableInfo, defaultEngine) } - // ==================== loadLatestSnapshot Tests ==================== + // ==================== loadLatestSnapshot ==================== - test("loadLatestSnapshot returns snapshot at latest version") { - val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) - val manager = createManager(client) + test("loadLatestSnapshot returns snapshot at max ratified version") { + withUCClientAndTestTable { (ucClient, tablePath, maxRatifiedVersion) => + val manager = createManager(ucClient, tablePath) - val snapshot = manager.loadLatestSnapshot() + val snapshot = manager.loadLatestSnapshot() - // The mock returns latestVersion only when versionOpt is empty. - // If impl passed Optional.of(someOtherVersion), we'd get that version back. - assert(snapshot.getVersion == TEST_LATEST_VERSION) + assert(snapshot.getVersion == maxRatifiedVersion) + } } - test("loadLatestSnapshot calls client with empty version and timestamp") { - val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) - val manager = createManager(client) - - manager.loadLatestSnapshot() - - // Verify exact parameters - this catches bugs where impl passes wrong values - assert( - !client.lastLoadSnapshotVersionOpt.isPresent, - "loadLatestSnapshot should pass empty versionOpt, not a specific version") - assert( - !client.lastLoadSnapshotTimestampOpt.isPresent, - "loadLatestSnapshot should pass empty timestampOpt") - assert(client.loadSnapshotCallCount == 1, "loadSnapshot should be called exactly once") - } + test("loadLatestSnapshot throws when table does not exist in catalog") { + val ucClient = new InMemoryUCClient("ucMetastoreId") + val tableInfo = new UCTableInfo("nonExistentTableId", "/fake/path", testUcUri, testUcToken) + val client = new UCCatalogManagedClient(ucClient) + val manager = new UCManagedTableSnapshotManager(client, tableInfo, defaultEngine) - test("loadLatestSnapshot would return wrong version if wrong parameters passed") { - // This test verifies our mock behavior is correct - it returns different values - // based on input, which is crucial for valid testing - val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) - - // When version is specified, mock returns that version (not latest) - val snapshotAt5 = client.loadSnapshot( - createTestEngine(), - TEST_TABLE_ID, - TEST_TABLE_PATH, - Optional.of(5L: java.lang.Long), - Optional.empty()) - assert(snapshotAt5.getVersion == 5L, "Mock should return requested version") - - // When version is empty, mock returns latest - val snapshotLatest = client.loadSnapshot( - createTestEngine(), - TEST_TABLE_ID, - TEST_TABLE_PATH, - Optional.empty(), - Optional.empty()) - assert(snapshotLatest.getVersion == TEST_LATEST_VERSION, "Mock should return latest when empty") + val ex = intercept[RuntimeException] { + manager.loadLatestSnapshot() + } + assert(ex.getCause.isInstanceOf[InvalidTargetTableException]) } - // ==================== loadSnapshotAt Tests ==================== + // ==================== loadSnapshotAt ==================== - test("loadSnapshotAt returns snapshot at specific version") { - val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) - val manager = createManager(client) + test("loadSnapshotAt returns snapshot at specified version") { + withUCClientAndTestTable { (ucClient, tablePath, _) => + val manager = createManager(ucClient, tablePath) - val snapshot = manager.loadSnapshotAt(5L) + val snapshot = manager.loadSnapshotAt(1L) - assert(snapshot.getVersion == 5L) + assert(snapshot.getVersion == 1L) + } } test("loadSnapshotAt at version zero returns correct snapshot") { - val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) - val manager = createManager(client) + withUCClientAndTestTable { (ucClient, tablePath, _) => + val manager = createManager(ucClient, tablePath) - val snapshot = manager.loadSnapshotAt(0L) + val snapshot = manager.loadSnapshotAt(0L) - assert(snapshot.getVersion == 0L) + assert(snapshot.getVersion == 0L) + } } - test("loadSnapshotAt passes version correctly to client") { - val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) - val manager = createManager(client) + test("loadSnapshotAt with negative version throws IllegalArgumentException") { + withUCClientAndTestTable { (ucClient, tablePath, _) => + val manager = createManager(ucClient, tablePath) - manager.loadSnapshotAt(7L) + val ex = intercept[IllegalArgumentException] { + manager.loadSnapshotAt(-1L) + } + } + } + + test("loadSnapshotAt with version beyond max throws IllegalArgumentException") { + withUCClientAndTestTable { (ucClient, tablePath, maxRatifiedVersion) => + val manager = createManager(ucClient, tablePath) - assert(client.lastLoadSnapshotVersionOpt.isPresent) - assert(client.lastLoadSnapshotVersionOpt.get() == 7L) - assert(!client.lastLoadSnapshotTimestampOpt.isPresent) + val ex = intercept[IllegalArgumentException] { + manager.loadSnapshotAt(maxRatifiedVersion + 10) + } + } } - // ==================== checkVersionExists Tests ==================== + // ==================== checkVersionExists ==================== test("checkVersionExists: version within valid range does not throw") { - val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) - val manager = createManager(client) - - // Should not throw - version 5 is within [0, 10] - manager.checkVersionExists(5L, true /* mustBeRecreatable */, false /* allowOutOfRange */ ) + withUCClientAndTestTable { (ucClient, tablePath, maxRatifiedVersion) => + val manager = createManager(ucClient, tablePath) - // Verify the implementation actually called loadLatestSnapshot to get bounds - assert( - client.loadSnapshotCallCount == 1, - "checkVersionExists should call loadLatestSnapshot to determine version bounds") + manager.checkVersionExists( + maxRatifiedVersion - 1, + true /* mustBeRecreatable */, + false /* allowOutOfRange */ ) + } } test("checkVersionExists: version at boundary zero does not throw") { - val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) - val manager = createManager(client) - - // Version 0 is the earliest version for UC tables (always 0) - manager.checkVersionExists(0L, true /* mustBeRecreatable */, false /* allowOutOfRange */ ) + withUCClientAndTestTable { (ucClient, tablePath, _) => + val manager = createManager(ucClient, tablePath) - // Verify the check was actually performed - assert(client.loadSnapshotCallCount == 1) + manager.checkVersionExists(0L, true /* mustBeRecreatable */, false /* allowOutOfRange */ ) + } } test("checkVersionExists: version at boundary latest does not throw") { - val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) - val manager = createManager(client) - - // Version 10 (TEST_LATEST_VERSION) should be valid - manager.checkVersionExists( - TEST_LATEST_VERSION, - true /* mustBeRecreatable */, - false /* allowOutOfRange */ ) + withUCClientAndTestTable { (ucClient, tablePath, maxRatifiedVersion) => + val manager = createManager(ucClient, tablePath) - // Verify the check was actually performed - assert(client.loadSnapshotCallCount == 1) + manager.checkVersionExists( + maxRatifiedVersion, + true /* mustBeRecreatable */, + false /* allowOutOfRange */ ) + } } test("checkVersionExists: version below zero throws VersionNotFoundException") { - val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) - val manager = createManager(client) + withUCClientAndTestTable { (ucClient, tablePath, maxRatifiedVersion) => + val manager = createManager(ucClient, tablePath) - val exception = intercept[VersionNotFoundException] { - manager.checkVersionExists(-1L, true /* mustBeRecreatable */, false /* allowOutOfRange */ ) + val exception = intercept[VersionNotFoundException] { + manager.checkVersionExists(-1L, true /* mustBeRecreatable */, false /* allowOutOfRange */ ) + } + assert(exception.getUserVersion == -1L) + assert(exception.getEarliest == 0L) + assert(exception.getLatest == maxRatifiedVersion) } - assert(exception.getUserVersion == -1L) - assert(exception.getEarliest == 0L) - assert(exception.getLatest == TEST_LATEST_VERSION) } test("checkVersionExists: version above latest with allowOutOfRange=false throws") { - val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) - val manager = createManager(client) - - val exception = intercept[VersionNotFoundException] { - manager.checkVersionExists(15L, true /* mustBeRecreatable */, false /* allowOutOfRange */ ) + withUCClientAndTestTable { (ucClient, tablePath, maxRatifiedVersion) => + val manager = createManager(ucClient, tablePath) + + val exception = intercept[VersionNotFoundException] { + manager.checkVersionExists( + maxRatifiedVersion + 10, + true /* mustBeRecreatable */, + false /* allowOutOfRange */ ) + } + assert(exception.getUserVersion == maxRatifiedVersion + 10) + assert(exception.getEarliest == 0L) + assert(exception.getLatest == maxRatifiedVersion) } - assert(exception.getUserVersion == 15L) - assert(exception.getEarliest == 0L) - assert(exception.getLatest == TEST_LATEST_VERSION) } test("checkVersionExists: version above latest with allowOutOfRange=true does not throw") { - val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) - val manager = createManager(client) + withUCClientAndTestTable { (ucClient, tablePath, maxRatifiedVersion) => + val manager = createManager(ucClient, tablePath) - // Should not throw - manager.checkVersionExists(15L, true /* mustBeRecreatable */, true /* allowOutOfRange */ ) + manager.checkVersionExists( + maxRatifiedVersion + 10, + true /* mustBeRecreatable */, + true /* allowOutOfRange */ ) + } } - // ==================== getTableChanges Tests ==================== + test("checkVersionExists: mustBeRecreatable flag is accepted (UC tables always recreatable)") { + withUCClientAndTestTable { (ucClient, tablePath, _) => + val manager = createManager(ucClient, tablePath) - test("getTableChanges with explicit end version returns correct commit range") { - val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) - val engine = createTestEngine() - val manager = createManager(client, engine) - - val commitRange = manager.getTableChanges(engine, 3L, Optional.of(7L)) - - // Mock returns the exact start/end versions passed, so this verifies correct parameter passing - assert( - commitRange.getStartVersion == 3L, - "Start version should match what was passed to getTableChanges") - assert( - commitRange.getEndVersion == 7L, - "End version should match what was passed to getTableChanges") - assert(client.loadCommitRangeCallCount == 1, "loadCommitRange should be called exactly once") + manager.checkVersionExists(1L, true /* mustBeRecreatable */, false /* allowOutOfRange */ ) + manager.checkVersionExists(1L, false /* mustBeRecreatable */, false /* allowOutOfRange */ ) + } } - test("getTableChanges without end version returns range to latest") { - val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) - val engine = createTestEngine() - val manager = createManager(client, engine) + // ==================== getActiveCommitAtTime ==================== - val commitRange = manager.getTableChanges(engine, 5L, Optional.empty()) + test("getActiveCommitAtTime: before earliest without earliest flag throws") { + withUCClientAndTestTable { (ucClient, tablePath, _) => + val manager = createManager(ucClient, tablePath) - // When endVersion is empty, mock returns latestVersion - assert(commitRange.getStartVersion == 5L) - assert( - commitRange.getEndVersion == TEST_LATEST_VERSION, - "When endVersion is empty, should extend to latest") + val ex = intercept[KernelException] { + manager.getActiveCommitAtTime( + v0Ts - 1, + false /* canReturnLastCommit */, + true /* mustBeRecreatable */, + false /* canReturnEarliestCommit */ ) + } + } } - test("getTableChanges passes parameters correctly to loadCommitRange") { - val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) - val engine = createTestEngine() - val manager = createManager(client, engine) - - manager.getTableChanges(engine, 2L, Optional.of(8L)) - - // Verify exact parameters passed to underlying client - assert(client.lastCommitRangeStartVersion.isPresent, "startVersionOpt should be present") - assert( - client.lastCommitRangeStartVersion.get() == 2L, - s"startVersion should be 2, got ${client.lastCommitRangeStartVersion.get()}") - assert( - client.lastCommitRangeEndVersion.isPresent, - "endVersionOpt should be present when explicit end provided") - assert( - client.lastCommitRangeEndVersion.get() == 8L, - s"endVersion should be 8, got ${client.lastCommitRangeEndVersion.get()}") + test("getActiveCommitAtTime: before earliest with earliest flag returns v0") { + withUCClientAndTestTable { (ucClient, tablePath, _) => + val manager = createManager(ucClient, tablePath) + + val active = manager.getActiveCommitAtTime( + v0Ts - 1, + false /* canReturnLastCommit */, + true /* mustBeRecreatable */, + true /* canReturnEarliestCommit */ ) + + assert(active.getVersion == 0L) + } } - test("getTableChanges with empty end version passes empty to loadCommitRange") { - val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) - val engine = createTestEngine() - val manager = createManager(client, engine) + test("getActiveCommitAtTime: exact boundaries and between commits") { + withUCClientAndTestTable { (ucClient, tablePath, _) => + val manager = createManager(ucClient, tablePath) - manager.getTableChanges(engine, 1L, Optional.empty()) + def activeVersion(ts: Long): Long = { + manager + .getActiveCommitAtTime( + ts, + false /* canReturnLastCommit */, + true /* mustBeRecreatable */, + false /* canReturnEarliestCommit */ ) + .getVersion + } - assert(client.lastCommitRangeStartVersion.isPresent) - assert(client.lastCommitRangeStartVersion.get() == 1L) - assert( - !client.lastCommitRangeEndVersion.isPresent, - "When endVersion is Optional.empty, should pass empty to client") + assert(activeVersion(v0Ts) == 0L) + assert(activeVersion(v0Ts + 1) == 0L) + assert(activeVersion(v1Ts) == 1L) + assert(activeVersion(v1Ts + 1) == 1L) + assert(activeVersion(v2Ts) == 2L) + } } - // ==================== Additional Edge Case Tests (from adversarial review) ==================== - - test( - "checkVersionExists: mustBeRecreatable flag is accepted but behavior unchanged for UC tables") { - // For UC-managed tables, all ratified commits are available (earliest = 0) - // mustBeRecreatable doesn't change behavior since UC guarantees all versions are recreatable - val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) - val manager = createManager(client) + test("getActiveCommitAtTime: after latest without last-commit flag throws") { + withUCClientAndTestTable { (ucClient, tablePath, _) => + val manager = createManager(ucClient, tablePath) - // Both should succeed - mustBeRecreatable doesn't affect UC tables - manager.checkVersionExists(5L, true /* mustBeRecreatable */, false /* allowOutOfRange */ ) - manager.checkVersionExists(5L, false /* mustBeRecreatable */, false /* allowOutOfRange */ ) + val ex = intercept[KernelException] { + manager.getActiveCommitAtTime( + v2Ts + 1, + false /* canReturnLastCommit */, + true /* mustBeRecreatable */, + false /* canReturnEarliestCommit */ ) + } + } } - test("checkVersionExists: Long.MAX_VALUE with allowOutOfRange=false throws") { - val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) - val manager = createManager(client) + test("getActiveCommitAtTime: after latest with last-commit flag returns last version") { + withUCClientAndTestTable { (ucClient, tablePath, _) => + val manager = createManager(ucClient, tablePath) - val exception = intercept[VersionNotFoundException] { - manager.checkVersionExists( - Long.MaxValue, + val active = manager.getActiveCommitAtTime( + v2Ts + 1, + true /* canReturnLastCommit */, true /* mustBeRecreatable */, - false /* allowOutOfRange */ ) + false /* canReturnEarliestCommit */ ) + + assert(active.getVersion == 2L) } - assert(exception.getUserVersion == Long.MaxValue) } - test("checkVersionExists: Long.MAX_VALUE with allowOutOfRange=true does not throw") { - val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) - val manager = createManager(client) + test("getActiveCommitAtTime: negative and very large timestamps are handled") { + withUCClientAndTestTable { (ucClient, tablePath, _) => + val manager = createManager(ucClient, tablePath) - // Should not throw - manager.checkVersionExists( - Long.MaxValue, - true /* mustBeRecreatable */, - true /* allowOutOfRange */ ) + val negativeTsEx = intercept[KernelException] { + manager.getActiveCommitAtTime( + -100L, + false /* canReturnLastCommit */, + true /* mustBeRecreatable */, + false /* canReturnEarliestCommit */ ) + } + + val largeTsEx = intercept[KernelException] { + manager.getActiveCommitAtTime( + Long.MaxValue, + false /* canReturnLastCommit */, + true /* mustBeRecreatable */, + false /* canReturnEarliestCommit */ ) + } + } } - test("getTableChanges with start version 0 returns range from beginning") { - val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) - val engine = createTestEngine() - val manager = createManager(client, engine) + // ==================== getTableChanges ==================== + + test("getTableChanges with explicit end version returns correct commit range") { + withUCClientAndTestTable { (ucClient, tablePath, maxRatifiedVersion) => + val manager = createManager(ucClient, tablePath) - val commitRange = manager.getTableChanges(engine, 0L, Optional.empty()) + val commitRange = manager.getTableChanges(defaultEngine, 0L, Optional.of(maxRatifiedVersion)) - assert(commitRange.getStartVersion == 0L) - assert(commitRange.getEndVersion == TEST_LATEST_VERSION) + assert(commitRange.getStartVersion == 0L) + assert(commitRange.getEndVersion == maxRatifiedVersion) + } } - test("getTableChanges with start equals end returns single-version range") { - val client = new TestUCCatalogManagedClient(TEST_LATEST_VERSION) - val engine = createTestEngine() - val manager = createManager(client, engine) + test("getTableChanges without end version returns range to latest") { + withUCClientAndTestTable { (ucClient, tablePath, maxRatifiedVersion) => + val manager = createManager(ucClient, tablePath) - val commitRange = manager.getTableChanges(engine, 5L, Optional.of(5L)) + val commitRange = manager.getTableChanges(defaultEngine, 1L, Optional.empty()) - assert(commitRange.getStartVersion == 5L) - assert(commitRange.getEndVersion == 5L) + assert(commitRange.getStartVersion == 1L) + assert(commitRange.getEndVersion == maxRatifiedVersion) + } } - // ==================== Exception Propagation Tests ==================== - - /** - * Client that throws exceptions to test error handling. - */ - class ThrowingUCCatalogManagedClient(exceptionToThrow: RuntimeException) - extends UCCatalogManagedClient(new InMemoryUCClient("test-metastore")) { - - override def loadSnapshot( - engine: Engine, - ucTableId: String, - tablePath: String, - versionOpt: Optional[java.lang.Long], - timestampOpt: Optional[java.lang.Long]): Snapshot = { - throw exceptionToThrow + test("getTableChanges with start equals end returns single-version range") { + withUCClientAndTestTable { (ucClient, tablePath, _) => + val manager = createManager(ucClient, tablePath) + + val commitRange = manager.getTableChanges(defaultEngine, 1L, Optional.of(1L)) + + assert(commitRange.getStartVersion == 1L) + assert(commitRange.getEndVersion == 1L) } + } - override def loadCommitRange( - engine: Engine, - ucTableId: String, - tablePath: String, - startVersionOpt: Optional[java.lang.Long], - startTimestampOpt: Optional[java.lang.Long], - endVersionOpt: Optional[java.lang.Long], - endTimestampOpt: Optional[java.lang.Long]): CommitRange = { - throw exceptionToThrow + test("getTableChanges with inverted range (start > end) throws IllegalArgumentException") { + withUCClientAndTestTable { (ucClient, tablePath, _) => + val manager = createManager(ucClient, tablePath) + + val ex = intercept[IllegalArgumentException] { + manager.getTableChanges(defaultEngine, 2L, Optional.of(1L)) + } } } - test("loadLatestSnapshot propagates exceptions from client") { - val expectedException = new RuntimeException("UC connection failed") - val client = new ThrowingUCCatalogManagedClient(expectedException) - val manager = createManager(client) + test("getTableChanges with start beyond latest throws") { + withUCClientAndTestTable { (ucClient, tablePath, maxRatifiedVersion) => + val manager = createManager(ucClient, tablePath) - val thrown = intercept[RuntimeException] { - manager.loadLatestSnapshot() + val ex = intercept[IllegalArgumentException] { + manager.getTableChanges(defaultEngine, maxRatifiedVersion + 5, Optional.empty()) + } } - assert(thrown eq expectedException) } - test("loadSnapshotAt propagates exceptions from client") { - val expectedException = new IllegalArgumentException("Invalid version") - val client = new ThrowingUCCatalogManagedClient(expectedException) - val manager = createManager(client) + test("getTableChanges with start=end=0 returns first commit") { + withUCClientAndTestTable { (ucClient, tablePath, _) => + val manager = createManager(ucClient, tablePath) + + val commitRange = manager.getTableChanges(defaultEngine, 0L, Optional.of(0L)) - val thrown = intercept[IllegalArgumentException] { - manager.loadSnapshotAt(5L) + assert(commitRange.getStartVersion == 0L) + assert(commitRange.getEndVersion == 0L) } - assert(thrown eq expectedException) } - test("checkVersionExists propagates exceptions from loadLatestSnapshot") { - val expectedException = new RuntimeException("Network error") - val client = new ThrowingUCCatalogManagedClient(expectedException) - val manager = createManager(client) + test("getTableChanges with start=end=max returns latest commit") { + withUCClientAndTestTable { (ucClient, tablePath, maxRatifiedVersion) => + val manager = createManager(ucClient, tablePath) - val thrown = intercept[RuntimeException] { - manager.checkVersionExists(5L, true /* mustBeRecreatable */, false /* allowOutOfRange */ ) + val commitRange = manager.getTableChanges( + defaultEngine, + maxRatifiedVersion, + Optional.of(maxRatifiedVersion)) + + assert(commitRange.getStartVersion == maxRatifiedVersion) + assert(commitRange.getEndVersion == maxRatifiedVersion) } - assert(thrown eq expectedException) } - test("getTableChanges propagates exceptions from client") { - val expectedException = new RuntimeException("Commit range not available") - val client = new ThrowingUCCatalogManagedClient(expectedException) - val engine = createTestEngine() - val manager = createManager(client, engine) + // ==================== Exception Propagation ==================== + + test("operations propagate InvalidTargetTableException from client") { + val ucClient = new InMemoryUCClient("ucMetastoreId") + val tableInfo = new UCTableInfo("nonExistentTableId", "/fake/path", testUcUri, testUcToken) + val client = new UCCatalogManagedClient(ucClient) + val manager = new UCManagedTableSnapshotManager(client, tableInfo, defaultEngine) + + val ex1 = intercept[RuntimeException] { manager.loadLatestSnapshot() } + assert(ex1.getCause.isInstanceOf[InvalidTargetTableException]) + + val ex2 = intercept[RuntimeException] { manager.loadSnapshotAt(0L) } + assert(ex2.getCause.isInstanceOf[InvalidTargetTableException]) + + val ex3 = intercept[RuntimeException] { manager.checkVersionExists(0L, true, false) } + assert(ex3.getCause.isInstanceOf[InvalidTargetTableException]) - val thrown = intercept[RuntimeException] { - manager.getTableChanges(engine, 1L, Optional.of(5L)) + val ex4 = intercept[RuntimeException] { + manager.getTableChanges(defaultEngine, 0L, Optional.empty()) } - assert(thrown eq expectedException) + assert(ex4.getCause.isInstanceOf[InvalidTargetTableException]) } } diff --git a/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCUtilsSuite.scala b/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCUtilsSuite.scala index fa29ca500ff..5f8ec4288b9 100644 --- a/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCUtilsSuite.scala +++ b/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCUtilsSuite.scala @@ -208,7 +208,9 @@ class UCUtilsSuite extends SparkFunSuite with SharedSparkSession { assert( info.getUcUri == ucUriBeta, s"Should use catalogBeta's URI, got: ${info.getUcUri}") - assert(info.getUcToken == ucTokenBeta, s"Should use catalogBeta's token, got: ${info.getUcToken}") + assert( + info.getUcToken == ucTokenBeta, + s"Should use catalogBeta's token, got: ${info.getUcToken}") assert(info.getTableId == tableIdBeta, s"Should extract tableIdBeta, got: ${info.getTableId}") assert( info.getTablePath == tablePathBeta, From 3c2fa1bdc03f05ca7f819145a592bb235993198a Mon Sep 17 00:00:00 2001 From: TimothyW553 Date: Thu, 11 Dec 2025 09:35:08 +0000 Subject: [PATCH 10/15] Update tests From 1d0ed1df35648a80754ec38442643fc0d1b4a22c Mon Sep 17 00:00:00 2001 From: TimothyW553 Date: Thu, 11 Dec 2025 10:02:39 +0000 Subject: [PATCH 11/15] revert accidental fmt --- .../kernel/spark/snapshot/unitycatalog/UCUtilsSuite.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCUtilsSuite.scala b/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCUtilsSuite.scala index 5f8ec4288b9..fa29ca500ff 100644 --- a/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCUtilsSuite.scala +++ b/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCUtilsSuite.scala @@ -208,9 +208,7 @@ class UCUtilsSuite extends SparkFunSuite with SharedSparkSession { assert( info.getUcUri == ucUriBeta, s"Should use catalogBeta's URI, got: ${info.getUcUri}") - assert( - info.getUcToken == ucTokenBeta, - s"Should use catalogBeta's token, got: ${info.getUcToken}") + assert(info.getUcToken == ucTokenBeta, s"Should use catalogBeta's token, got: ${info.getUcToken}") assert(info.getTableId == tableIdBeta, s"Should extract tableIdBeta, got: ${info.getTableId}") assert( info.getTablePath == tablePathBeta, From cf02a71eeb3c9fea85bd143f5faaff108ca77f21 Mon Sep 17 00:00:00 2001 From: TimothyW553 Date: Thu, 11 Dec 2025 22:27:48 +0000 Subject: [PATCH 12/15] fix earliest version from delta --- .../UCManagedTableSnapshotManager.java | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManager.java b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManager.java index 69fa18b44ce..b6840479e88 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManager.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManager.java @@ -103,18 +103,28 @@ public DeltaHistoryManager.Commit getActiveCommitAtTime( /** * Checks if a specific version exists and is accessible. * - *

For UC-managed tables, all ratified commits are available, so the earliest version is - * typically 0. This method validates that the requested version is within the valid range. + *

For UC-managed tables with CCv2, log files may be cleaned up, so we need to use + * DeltaHistoryManager to find the earliest available version based on filesystem state. */ @Override public void checkVersionExists(long version, boolean mustBeRecreatable, boolean allowOutOfRange) throws VersionNotFoundException { // Load latest to get the current version bounds - Snapshot latestSnapshot = loadLatestSnapshot(); - long latestVersion = latestSnapshot.getVersion(); + SnapshotImpl snapshot = (SnapshotImpl) loadLatestSnapshot(); + long latestVersion = snapshot.getVersion(); + + // Compute earliestRatifiedCommitVersion from catalog commits + List catalogCommits = snapshot.getLogSegment().getAllCatalogCommits(); + Optional earliestRatifiedCommitVersion = + catalogCommits.stream().map(ParsedCatalogCommitData::getVersion).min(Long::compare); - // For UC tables, earliest recreatable version is 0 (all ratified commits are available) - long earliestVersion = 0; + // Use DeltaHistoryManager to find earliest version based on filesystem state + long earliestVersion = + mustBeRecreatable + ? DeltaHistoryManager.getEarliestRecreatableCommit( + engine, snapshot.getLogPath(), earliestRatifiedCommitVersion) + : DeltaHistoryManager.getEarliestDeltaFile( + engine, snapshot.getLogPath(), earliestRatifiedCommitVersion); if (version < earliestVersion || ((version > latestVersion) && !allowOutOfRange)) { throw new VersionNotFoundException(version, earliestVersion, latestVersion); From 0d469d88c70f1223b2a071abb1823acca8d66cd4 Mon Sep 17 00:00:00 2001 From: TimothyW553 Date: Thu, 11 Dec 2025 22:27:57 +0000 Subject: [PATCH 13/15] fix earliest version from delta From 7c9350966d99c0c2a5a675755d5ff4e87627f0c2 Mon Sep 17 00:00:00 2001 From: TimothyW553 Date: Tue, 16 Dec 2025 00:06:11 +0000 Subject: [PATCH 14/15] Address comments regarding naming, early exit, and consolidating tests together --- .../UCManagedTableSnapshotManager.java | 11 +- .../UCManagedTableSnapshotManagerTest.java | 100 ------------------ .../UCManagedTableSnapshotManagerSuite.scala | 23 ++++ 3 files changed, 31 insertions(+), 103 deletions(-) delete mode 100644 kernel-spark/src/test/java/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManagerTest.java diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManager.java b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManager.java index b6840479e88..10bb213a75c 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManager.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManager.java @@ -111,7 +111,12 @@ public void checkVersionExists(long version, boolean mustBeRecreatable, boolean throws VersionNotFoundException { // Load latest to get the current version bounds SnapshotImpl snapshot = (SnapshotImpl) loadLatestSnapshot(); - long latestVersion = snapshot.getVersion(); + long latestRatifiedVersion = snapshot.getVersion(); + + // Fast path: check upper bound before expensive filesystem operations + if ((version > latestRatifiedVersion) && !allowOutOfRange) { + throw new VersionNotFoundException(version, 0, latestRatifiedVersion); + } // Compute earliestRatifiedCommitVersion from catalog commits List catalogCommits = snapshot.getLogSegment().getAllCatalogCommits(); @@ -126,8 +131,8 @@ public void checkVersionExists(long version, boolean mustBeRecreatable, boolean : DeltaHistoryManager.getEarliestDeltaFile( engine, snapshot.getLogPath(), earliestRatifiedCommitVersion); - if (version < earliestVersion || ((version > latestVersion) && !allowOutOfRange)) { - throw new VersionNotFoundException(version, earliestVersion, latestVersion); + if (version < earliestVersion) { + throw new VersionNotFoundException(version, earliestVersion, latestRatifiedVersion); } } diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManagerTest.java b/kernel-spark/src/test/java/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManagerTest.java deleted file mode 100644 index b01f4b3e411..00000000000 --- a/kernel-spark/src/test/java/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManagerTest.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright (2025) The Delta Lake Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.delta.kernel.spark.snapshot.unitycatalog; - -import static org.junit.jupiter.api.Assertions.*; - -import io.delta.kernel.defaults.engine.DefaultEngine; -import io.delta.kernel.engine.Engine; -import io.delta.kernel.unitycatalog.InMemoryUCClient; -import io.delta.kernel.unitycatalog.UCCatalogManagedClient; -import org.apache.hadoop.conf.Configuration; -import org.junit.jupiter.api.Test; - -/** - * Unit tests for {@link UCManagedTableSnapshotManager}. - * - *

These tests focus on constructor validation and basic instantiation. Integration tests for - * actual functionality are in {@code UCManagedTableSnapshotManagerSuite}. - */ -public class UCManagedTableSnapshotManagerTest { - - private static final String TEST_TABLE_ID = "test_uc_table_id"; - private static final String TEST_TABLE_PATH = "/test/path/to/table"; - private static final String TEST_UC_URI = "https://uc.example.com"; - private static final String TEST_UC_TOKEN = "test_token"; - - private UCTableInfo createTestTableInfo() { - return new UCTableInfo(TEST_TABLE_ID, TEST_TABLE_PATH, TEST_UC_URI, TEST_UC_TOKEN); - } - - private UCCatalogManagedClient createTestClient() { - InMemoryUCClient ucClient = new InMemoryUCClient("test_metastore"); - return new UCCatalogManagedClient(ucClient); - } - - private Engine createTestEngine() { - return DefaultEngine.create(new Configuration()); - } - - @Test - public void testConstructor_NullClient_ThrowsNPE() { - UCTableInfo tableInfo = createTestTableInfo(); - Engine engine = createTestEngine(); - - NullPointerException exception = - assertThrows( - NullPointerException.class, - () -> new UCManagedTableSnapshotManager(null, tableInfo, engine)); - assertEquals("ucCatalogManagedClient is null", exception.getMessage()); - } - - @Test - public void testConstructor_NullTableInfo_ThrowsNPE() { - UCCatalogManagedClient client = createTestClient(); - Engine engine = createTestEngine(); - - NullPointerException exception = - assertThrows( - NullPointerException.class, - () -> new UCManagedTableSnapshotManager(client, null, engine)); - assertEquals("tableInfo is null", exception.getMessage()); - } - - @Test - public void testConstructor_NullEngine_ThrowsNPE() { - UCCatalogManagedClient client = createTestClient(); - UCTableInfo tableInfo = createTestTableInfo(); - - NullPointerException exception = - assertThrows( - NullPointerException.class, - () -> new UCManagedTableSnapshotManager(client, tableInfo, null)); - assertEquals("engine is null", exception.getMessage()); - } - - @Test - public void testConstructor_ValidInputs_CreatesManager() { - UCCatalogManagedClient client = createTestClient(); - UCTableInfo tableInfo = createTestTableInfo(); - Engine engine = createTestEngine(); - - UCManagedTableSnapshotManager manager = - new UCManagedTableSnapshotManager(client, tableInfo, engine); - - assertNotNull(manager); - } -} diff --git a/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManagerSuite.scala b/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManagerSuite.scala index 26d12d6f3a1..2be85911597 100644 --- a/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManagerSuite.scala +++ b/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManagerSuite.scala @@ -41,6 +41,29 @@ class UCManagedTableSnapshotManagerSuite new UCManagedTableSnapshotManager(client, tableInfo, defaultEngine) } + // ==================== Constructor ==================== + + test("constructor rejects null arguments") { + val ucClient = new InMemoryUCClient("testMetastore") + val client = new UCCatalogManagedClient(ucClient) + val tableInfo = new UCTableInfo(testUcTableId, "/test/path", testUcUri, testUcToken) + + val ex1 = intercept[NullPointerException] { + new UCManagedTableSnapshotManager(null, tableInfo, defaultEngine) + } + assert(ex1.getMessage == "ucCatalogManagedClient is null") + + val ex2 = intercept[NullPointerException] { + new UCManagedTableSnapshotManager(client, null, defaultEngine) + } + assert(ex2.getMessage == "tableInfo is null") + + val ex3 = intercept[NullPointerException] { + new UCManagedTableSnapshotManager(client, tableInfo, null) + } + assert(ex3.getMessage == "engine is null") + } + // ==================== loadLatestSnapshot ==================== test("loadLatestSnapshot returns snapshot at max ratified version") { From 5ee508eb5acbf27a9d30d4b6f05fc2e106393261 Mon Sep 17 00:00:00 2001 From: TimothyW553 Date: Tue, 16 Dec 2025 04:23:26 +0000 Subject: [PATCH 15/15] add test --- .../UCManagedTableSnapshotManagerSuite.scala | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManagerSuite.scala b/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManagerSuite.scala index 2be85911597..82887d39f49 100644 --- a/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManagerSuite.scala +++ b/kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/unitycatalog/UCManagedTableSnapshotManagerSuite.scala @@ -241,6 +241,20 @@ class UCManagedTableSnapshotManagerSuite } } + test("getActiveCommitAtTime: non-recreatable path returns earliest delta file") { + withUCClientAndTestTable { (ucClient, tablePath, _) => + val manager = createManager(ucClient, tablePath) + + val active = manager.getActiveCommitAtTime( + v0Ts - 1, + false /* canReturnLastCommit */, + false /* mustBeRecreatable */, + true /* canReturnEarliestCommit */ ) + + assert(active.getVersion == 0L) + } + } + test("getActiveCommitAtTime: exact boundaries and between commits") { withUCClientAndTestTable { (ucClient, tablePath, _) => val manager = createManager(ucClient, tablePath)