From a69728ccfbed1b036a3951720c5a6fd4dde9e7b9 Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Tue, 2 Dec 2025 14:31:39 -0800 Subject: [PATCH 1/6] [Kernel] [UC] Handle UC now returning maxCatalogVersion 0 for a newly created table (#5570) Use this [link](https://github.com/delta-io/delta/pull/5570/files) to review incremental changes. - [**stack/kernel_uc_returns_max_ratified_version_0_after_create**](https://github.com/delta-io/delta/pull/5570) [[Files changed](https://github.com/delta-io/delta/pull/5570/files)] --------- - [ ] Spark - [ ] Standalone - [ ] Flink - [X] Kernel - [ ] Other (fill in here) UC now returns 0 as the maxCatalogVersion for newly created tables, instead of -1. This PR removes code that handled such cases (mapping -1 to the "true" version of 0). While we're at it, also does some minor test cleanup and refactors. Minor logic change. Updated existing UTs. No. (cherry picked from commit d6c53a3f47c406821df711535eccd197cd9feb2f) --- .../unitycatalog/UCCatalogManagedClient.java | 35 +++++++------------ .../unitycatalog/InMemoryUCClient.scala | 25 +++++++------ .../unitycatalog/InMemoryUCClientSuite.scala | 26 ++++---------- ...CatalogManagedClientCommitRangeSuite.scala | 9 ++--- .../UCCatalogManagedClientSuite.scala | 8 ++--- .../UCCatalogManagedCommitterSuite.scala | 16 ++++----- .../UCCatalogManagedTestUtils.scala | 22 ++---------- .../kernel/unitycatalog/UCE2ESuite.scala | 9 ++--- .../unitycatalog/UcCommitTelemetrySuite.scala | 28 ++++++++------- .../UcLoadSnapshotTelemetrySuite.scala | 21 ++++++----- .../UcPublishTelemetrySuite.scala | 20 ++++++----- 11 files changed, 91 insertions(+), 128 deletions(-) diff --git a/kernel/unitycatalog/src/main/java/io/delta/kernel/unitycatalog/UCCatalogManagedClient.java b/kernel/unitycatalog/src/main/java/io/delta/kernel/unitycatalog/UCCatalogManagedClient.java index fc43646688a..435b4081c01 100644 --- a/kernel/unitycatalog/src/main/java/io/delta/kernel/unitycatalog/UCCatalogManagedClient.java +++ b/kernel/unitycatalog/src/main/java/io/delta/kernel/unitycatalog/UCCatalogManagedClient.java @@ -126,10 +126,12 @@ public Snapshot loadSnapshot( metricsCollector.setNumCatalogCommits(response.getCommits().size()); - final long ucTableVersion = - getTrueUCTableVersion(ucTableId, response.getLatestTableVersion()); + final long maxUcTableVersion = response.getLatestTableVersion(); + versionOpt.ifPresent( - version -> validateLoadTableVersionExists(ucTableId, version, ucTableVersion)); + version -> + validateTimeTravelVersionNotPastMax(ucTableId, version, maxUcTableVersion)); + final List logData = getSortedKernelParsedDeltaDataFromRatifiedCommits( ucTableId, response.getCommits()); @@ -151,7 +153,11 @@ public Snapshot loadSnapshot( .timeChecked( () -> loadLatestSnapshotForTimestampResolution( - engine, ucTableId, tablePath, logData, ucTableVersion)); + engine, + ucTableId, + tablePath, + logData, + maxUcTableVersion)); snapshotBuilder = snapshotBuilder.atTimestamp(timestampOpt.get(), latestSnapshot); } @@ -160,7 +166,7 @@ public Snapshot loadSnapshot( snapshotBuilder .withCommitter(createUCCommitter(ucClient, ucTableId, tablePath)) .withLogData(logData) - .withMaxCatalogVersion(ucTableVersion) + .withMaxCatalogVersion(maxUcTableVersion) .build(engine); metricsCollector.setResolvedSnapshotVersion(snapshot.getVersion()); return snapshot; @@ -269,7 +275,7 @@ public CommitRange loadCommitRange( endVersionOpt.filter(v -> !startTimestampOpt.isPresent()); final GetCommitsResponse response = getRatifiedCommitsFromUC(ucTableId, tablePath, endVersionOptForCommitQuery); - final long ucTableVersion = getTrueUCTableVersion(ucTableId, response.getLatestTableVersion()); + final long ucTableVersion = response.getLatestTableVersion(); validateVersionBoundariesExist(ucTableId, startVersionOpt, endVersionOpt, ucTableVersion); final List logData = getSortedKernelParsedDeltaDataFromRatifiedCommits(ucTableId, response.getCommits()); @@ -406,22 +412,7 @@ private GetCommitsResponse getRatifiedCommitsFromUC( return response; } - // TODO: [delta-io/delta#5118] If UC changes CREATE semantics, update logic here. - /** - * As of this writing, UC catalog service is not informed when 0.json is successfully written - * during table creation. Thus, when 0.json exists, the max ratified version returned by UC is -1. - */ - private long getTrueUCTableVersion(String ucTableId, long maxRatifiedVersion) { - if (maxRatifiedVersion == -1) { - logger.info( - "[{}] UC max ratified version is -1. This means 0.json exists. Version is 0.", ucTableId); - return 0; - } - - return maxRatifiedVersion; - } - - private void validateLoadTableVersionExists( + private void validateTimeTravelVersionNotPastMax( String ucTableId, long tableVersionToLoad, long maxRatifiedVersion) { if (tableVersionToLoad > maxRatifiedVersion) { throw new IllegalArgumentException( diff --git a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/InMemoryUCClient.scala b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/InMemoryUCClient.scala index 04dffaea04f..4ff8c2c3ebc 100644 --- a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/InMemoryUCClient.scala +++ b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/InMemoryUCClient.scala @@ -38,14 +38,14 @@ object InMemoryUCClient { * concurrently. */ class TableData( - private var maxRatifiedVersion: Long = -1L, - private val commits: ArrayBuffer[Commit] = ArrayBuffer.empty) { + private var maxRatifiedVersion: Long, + private val commits: ArrayBuffer[Commit]) { // For test only, since UC doesn't store these as top-level entities. private var currentProtocolOpt: Option[AbstractProtocol] = None private var currentMetadataOpt: Option[AbstractMetadata] = None - /** @return the maximum ratified version, or -1 if no commits have been made. */ + /** @return the maximum ratified version. */ def getMaxRatifiedVersion: Long = synchronized { maxRatifiedVersion } /** @return An immutable list of all commits. */ @@ -74,9 +74,7 @@ object InMemoryUCClient { commit: Commit, newProtocol: Optional[AbstractProtocol] = Optional.empty(), newMetadata: Optional[AbstractMetadata] = Optional.empty()): Unit = synchronized { - // TODO: [delta-io/delta#5118] If UC changes CREATE semantics, update logic here. - // For UC, commit 0 is expected to go through the filesystem - val expectedCommitVersion = if (maxRatifiedVersion == -1L) 1 else maxRatifiedVersion + 1 + val expectedCommitVersion = maxRatifiedVersion + 1 if (commit.getVersion != expectedCommitVersion) { throw new CommitFailedException( @@ -103,6 +101,10 @@ object InMemoryUCClient { } } } + + object TableData { + def afterCreate(): TableData = new TableData(0, ArrayBuffer.empty[Commit]) + } } /** @@ -186,9 +188,12 @@ class InMemoryUCClient(ucMetastoreId: String) extends UCClient { /** Visible for testing. Can be overridden to force an exception in commit method. */ protected def forceThrowInCommitMethod(): Unit = {} - private[unitycatalog] def createTableIfNotExistsOrThrow( - ucTableId: String, - tableData: TableData): Unit = { + private[unitycatalog] def insertTableDataAfterCreate(ucTableId: String): Unit = { + Option(tables.putIfAbsent(ucTableId, TableData.afterCreate())) + .foreach(_ => throw new IllegalArgumentException(s"Table $ucTableId already exists")) + } + + private[unitycatalog] def insertTableData(ucTableId: String, tableData: TableData): Unit = { Option(tables.putIfAbsent(ucTableId, tableData)) .foreach(_ => throw new IllegalArgumentException(s"Table $ucTableId already exists")) } @@ -205,6 +210,6 @@ class InMemoryUCClient(ucMetastoreId: String) extends UCClient { /** Retrieves the table data for the given table ID, creating it if it does not exist. */ private def getOrCreateTableIfNotExists(tableId: String): TableData = { - tables.computeIfAbsent(tableId, _ => new TableData) + tables.computeIfAbsent(tableId, _ => TableData.afterCreate()) } } diff --git a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/InMemoryUCClientSuite.scala b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/InMemoryUCClientSuite.scala index bac259dc9e6..a593d4609c0 100644 --- a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/InMemoryUCClientSuite.scala +++ b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/InMemoryUCClientSuite.scala @@ -21,8 +21,9 @@ import java.net.URI import java.util.Optional import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer -import io.delta.storage.commit.CommitFailedException +import io.delta.storage.commit.{Commit, CommitFailedException} import io.delta.storage.commit.uccommitcoordinator.InvalidTargetTableException import org.scalatest.funsuite.AnyFunSuite @@ -42,21 +43,9 @@ class InMemoryUCClientSuite extends AnyFunSuite with UCCatalogManagedTestUtils { assert(actualVersions == expectedVersions) } - // TODO: [delta-io/delta#5118] If UC changes CREATE semantics, update logic here. - test("TableData::appendCommit throws on commit v0 (since CREATE does not go through UC)") { - val tableData = new InMemoryUCClient.TableData - - val exMsg = intercept[CommitFailedException] { - tableData.appendCommit(createCommit(0L)) - }.getMessage - - assert(exMsg.contains("Expected commit version 1 but got 0")) - } - - // TODO: [delta-io/delta#5118] If UC changes CREATE semantics, update logic here. test("TableData::appendCommit handles commit version 1 (since CREATE does not go through UC)") { - val tableData = new InMemoryUCClient.TableData - assert(tableData.getMaxRatifiedVersion == -1L) + val tableData = InMemoryUCClient.TableData.afterCreate() + assert(tableData.getMaxRatifiedVersion == 0L) tableData.appendCommit(createCommit(1L)) @@ -65,9 +54,8 @@ class InMemoryUCClientSuite extends AnyFunSuite with UCCatalogManagedTestUtils { assert(tableData.getCommits.head.getVersion == 1L) } - test("TableData::appendCommit throws if commit version is not maxRatifiedVersion + 1 " + - "(excluding v1 edge case)") { - val tableData = new InMemoryUCClient.TableData + test("TableData::appendCommit throws if commit version is not maxRatifiedVersion + 1") { + val tableData = InMemoryUCClient.TableData.afterCreate() tableData.appendCommit(createCommit(1L)) val exMsg = intercept[CommitFailedException] { @@ -78,7 +66,7 @@ class InMemoryUCClientSuite extends AnyFunSuite with UCCatalogManagedTestUtils { } test("TableData::appendCommit appends the commit and updates the maxRatifiedVersion") { - val tableData = new InMemoryUCClient.TableData + val tableData = InMemoryUCClient.TableData.afterCreate() tableData.appendCommit(createCommit(1L)) assert(tableData.getMaxRatifiedVersion == 1L) diff --git a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedClientCommitRangeSuite.scala b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedClientCommitRangeSuite.scala index b60dac7c83a..42303a2286d 100644 --- a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedClientCommitRangeSuite.scala +++ b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedClientCommitRangeSuite.scala @@ -265,13 +265,10 @@ class UCCatalogManagedClientCommitRangeSuite extends AnyFunSuite with UCCatalogM assert(ex.getCause.isInstanceOf[InvalidTargetTableException]) } - test("loadCommitRange for new table when UC maxRatifiedVersion is -1") { + test("loadCommitRange for new table when UC maxRatifiedVersion is 0") { val tablePath = getTestResourceFilePath("catalog-owned-preview") - val ucCatalogManagedClient = - createUCCatalogManagedClientForTableWithMaxRatifiedVersionNegativeOne() - val commitRange = loadCommitRange( - ucCatalogManagedClient, - tablePath = tablePath) + val ucCatalogManagedClient = createUCCatalogManagedClientForTableAfterCreate() + val commitRange = loadCommitRange(ucCatalogManagedClient, tablePath = tablePath) assert(commitRange.getStartVersion == 0) assert(commitRange.getEndVersion == 0) diff --git a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedClientSuite.scala b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedClientSuite.scala index 7ac89314103..7a490caac39 100644 --- a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedClientSuite.scala +++ b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedClientSuite.scala @@ -134,10 +134,9 @@ class UCCatalogManagedClientSuite extends AnyFunSuite with UCCatalogManagedTestU (javaLongOpt(0L), emptyLongOpt, "v0 (explicitly by version)"), (emptyLongOpt, javaLongOpt(1749830855993L), "v0 (explicitly by timestamp")).foreach { case (versionToLoad, timestampToLoad, description) => - test(s"table version 0 is loaded when UC maxRatifiedVersion is -1 -- $description") { + test(s"table version 0 is loaded when UC maxRatifiedVersion is 0 -- $description") { val tablePath = getTestResourceFilePath("catalog-owned-preview") - val ucCatalogManagedClient = - createUCCatalogManagedClientForTableWithMaxRatifiedVersionNegativeOne() + val ucCatalogManagedClient = createUCCatalogManagedClientForTableAfterCreate() val snapshot = loadSnapshot( ucCatalogManagedClient, tablePath = tablePath, @@ -263,8 +262,7 @@ class UCCatalogManagedClientSuite extends AnyFunSuite with UCCatalogManagedTestU test("creates snapshot with UCCatalogManagedCommitter") { val tablePath = getTestResourceFilePath("catalog-owned-preview") - val ucCatalogManagedClient = - createUCCatalogManagedClientForTableWithMaxRatifiedVersionNegativeOne() + val ucCatalogManagedClient = createUCCatalogManagedClientForTableAfterCreate() val snapshot = loadSnapshot(ucCatalogManagedClient, tablePath = tablePath, versionToLoad = Optional.of(0L)) assert(snapshot.getCommitter.isInstanceOf[UCCatalogManagedCommitter]) diff --git a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedCommitterSuite.scala b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedCommitterSuite.scala index 91bc1ad420b..ac9dde4d75e 100644 --- a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedCommitterSuite.scala +++ b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedCommitterSuite.scala @@ -210,8 +210,7 @@ class UCCatalogManagedCommitterSuite withTempDirAndAllDeltaSubDirs { case (tablePath, logPath) => // ===== GIVEN ===== val ucClient = new InMemoryUCClient("ucMetastoreId") - ucClient - .createTableIfNotExistsOrThrow(testUcTableId, new TableData(-1, ArrayBuffer[Commit]())) + ucClient.insertTableDataAfterCreate(testUcTableId) val committer = new UCCatalogManagedCommitter(ucClient, testUcTableId, tablePath) // ===== WHEN ===== @@ -244,11 +243,8 @@ class UCCatalogManagedCommitterSuite test("CATALOG_WRITE: writes staged commit file and invokes UC client commit API (no P&M change") { withTempDirAndAllDeltaSubDirs { case (tablePath, logPath) => // ===== GIVEN ===== - // Set up UC client with initial table with maxRatifiedVersion = -1, numCommits = 0. This - // represents a table that was just created and at version 0. We will then commit version 1. val ucClient = new InMemoryUCClient("ucMetastoreId") - val tableData = new TableData(-1, ArrayBuffer[Commit]()) - ucClient.createTableIfNotExistsOrThrow(testUcTableId, tableData) + ucClient.insertTableDataAfterCreate(testUcTableId) val testValue = "TEST_COMMIT_DATA_12345" val actionsIterator = getSingleElementRowIter(testValue) @@ -304,7 +300,7 @@ class UCCatalogManagedCommitterSuite val ucClient = new InMemoryUCClient("ucMetastoreId") val tableData = new TableData(maxRatifiedVersion = 1, commits = ArrayBuffer.empty[Commit]) - ucClient.createTableIfNotExistsOrThrow(testUcTableId, tableData) + ucClient.insertTableData(testUcTableId, tableData) val committer = new UCCatalogManagedCommitter(ucClient, testUcTableId, tablePath) val commitMetadata = catalogManagedWriteCommitMetadata(2, logPath = logPath) @@ -331,7 +327,7 @@ class UCCatalogManagedCommitterSuite null) } val tableData = new TableData(maxRatifiedVersion = 1, commits = ArrayBuffer.empty[Commit]) - ucClient.createTableIfNotExistsOrThrow(testUcTableId, tableData) + ucClient.insertTableData(testUcTableId, tableData) val committer = new UCCatalogManagedCommitter(ucClient, testUcTableId, tablePath) val commitMetadata = catalogManagedWriteCommitMetadata(2, logPath = logPath) // ===== WHEN ===== @@ -352,7 +348,7 @@ class UCCatalogManagedCommitterSuite override def forceThrowInCommitMethod(): Unit = throw new IOException("UC network error") } val tableData = new TableData(maxRatifiedVersion = 1, commits = ArrayBuffer.empty[Commit]) - ucClient.createTableIfNotExistsOrThrow(testUcTableId, tableData) + ucClient.insertTableData(testUcTableId, tableData) val committer = new UCCatalogManagedCommitter(ucClient, testUcTableId, tablePath) val commitMetadata = catalogManagedWriteCommitMetadata(2, logPath = logPath) @@ -377,7 +373,7 @@ class UCCatalogManagedCommitterSuite } } val tableData = new TableData(maxRatifiedVersion = 1, commits = ArrayBuffer.empty[Commit]) - ucClient.createTableIfNotExistsOrThrow(testUcTableId, tableData) + ucClient.insertTableData(testUcTableId, tableData) val committer = new UCCatalogManagedCommitter(ucClient, "unknownTableId", tablePath) val commitMetadata = catalogManagedWriteCommitMetadata(2, logPath = logPath) diff --git a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedTestUtils.scala b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedTestUtils.scala index 8bf6e1fbf43..83c4a9ecf0f 100644 --- a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedTestUtils.scala +++ b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedTestUtils.scala @@ -152,16 +152,6 @@ trait UCCatalogManagedTestUtils (ucClient, ucCatalogManagedClient) } - /** - * Initializes a UC table in the InMemoryUCClient after creation. - * This should be called after creating a table with buildCreateTableTransaction. - */ - def initializeUCTable(ucClient: InMemoryUCClient, ucTableId: String): Unit = { - val tableData = - new InMemoryUCClient.TableData(-1, scala.collection.mutable.ArrayBuffer[Commit]()) - ucClient.createTableIfNotExistsOrThrow(ucTableId, tableData) - } - /** Version TS for the test table used in [[withUCClientAndTestTable]] */ val v0Ts = 1749830855993L // published commit val v1Ts = 1749830871085L // ratified staged commit @@ -189,20 +179,14 @@ trait UCCatalogManagedTestUtils fileStatus.getModificationTime) } val tableData = new TableData(maxRatifiedVersion, ArrayBuffer(catalogCommits: _*)) - ucClient.createTableIfNotExistsOrThrow("testUcTableId", tableData) + ucClient.insertTableData("testUcTableId", tableData) textFx(ucClient, tablePath, maxRatifiedVersion) } - // TODO: [delta-io/delta#5118] If UC changes CREATE semantics, update logic here. - /** - * When a new UC table is created, it will have Delta version 0 but the max ratified verison in - * UC is -1. This is a special edge case. - */ - def createUCCatalogManagedClientForTableWithMaxRatifiedVersionNegativeOne( + def createUCCatalogManagedClientForTableAfterCreate( ucTableId: String = "testUcTableId"): UCCatalogManagedClient = { val ucClient = new InMemoryUCClient("ucMetastoreId") - val tableData = new TableData(-1, ArrayBuffer[Commit]()) - ucClient.createTableIfNotExistsOrThrow(ucTableId, tableData) + ucClient.insertTableDataAfterCreate(ucTableId) new UCCatalogManagedClient(ucClient) } diff --git a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCE2ESuite.scala b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCE2ESuite.scala index ab33d0bb1c6..781b61e1f9d 100644 --- a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCE2ESuite.scala +++ b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCE2ESuite.scala @@ -64,8 +64,7 @@ class UCE2ESuite extends AnyFunSuite with UCCatalogManagedTestUtils { .buildCreateTableTransaction(testUcTableId, tablePath, testSchema, "test-engine") .build(engine) .commit(engine, CloseableIterable.emptyIterable() /* dataActions */ ) - val tableData0 = new TableData(-1, ArrayBuffer[Commit]()) - ucClient.createTableIfNotExistsOrThrow(testUcTableId, tableData0) + ucClient.insertTableDataAfterCreate(testUcTableId) result0.getPostCommitSnapshot.get().publish(engine) // Should be no-op! // Step 2: WRITE -- v1.uuid.json @@ -131,8 +130,7 @@ class UCE2ESuite extends AnyFunSuite with UCCatalogManagedTestUtils { .buildCreateTableTransaction(testUcTableId, tablePath, testSchema, "test-engine") .build(engine) .commit(engine, CloseableIterable.emptyIterable()) - val tableData0 = new TableData(-1, ArrayBuffer[Commit]()) - ucClient.createTableIfNotExistsOrThrow(testUcTableId, tableData0) + ucClient.insertTableDataAfterCreate(testUcTableId) var currentSnapshot = result0.getPostCommitSnapshot.get() @@ -198,8 +196,7 @@ class UCE2ESuite extends AnyFunSuite with UCCatalogManagedTestUtils { .buildCreateTableTransaction(testUcTableId, tablePath, testSchema, "test-engine") .build(engine) .commit(engine, CloseableIterable.emptyIterable()) - val tableData0 = new TableData(-1, ArrayBuffer[Commit]()) - ucClient.createTableIfNotExistsOrThrow(testUcTableId, tableData0) + ucClient.insertTableDataAfterCreate(testUcTableId) // Step 2: WRITE and commit data up to version 2 val postCommitSnapshot1 = writeDataAndVerify( diff --git a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UcCommitTelemetrySuite.scala b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UcCommitTelemetrySuite.scala index 233c8ef7899..bbc4ab2583c 100644 --- a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UcCommitTelemetrySuite.scala +++ b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UcCommitTelemetrySuite.scala @@ -18,14 +18,18 @@ package io.delta.kernel.unitycatalog import java.util.Optional +import scala.collection.mutable.ArrayBuffer + import io.delta.kernel.Operation import io.delta.kernel.commit.{CommitFailedException, CommitMetadata} import io.delta.kernel.data.Row import io.delta.kernel.defaults.engine.DefaultEngine import io.delta.kernel.exceptions.MaxCommitRetryLimitReachedException import io.delta.kernel.test.{BaseMockJsonHandler, MockFileSystemClientUtils} +import io.delta.kernel.unitycatalog.InMemoryUCClient.TableData import io.delta.kernel.unitycatalog.metrics.UcCommitTelemetry import io.delta.kernel.utils.{CloseableIterable, CloseableIterator} +import io.delta.storage.commit.Commit import org.apache.hadoop.conf.Configuration import org.scalatest.funsuite.AnyFunSuite @@ -44,16 +48,16 @@ class UcCommitTelemetrySuite // CREATE -- v0.json val result0 = ucCatalogManagedClient - .buildCreateTableTransaction("ucTableId", tablePath, testSchema, "test-engine") + .buildCreateTableTransaction("testUcTableId", tablePath, testSchema, "test-engine") .build(engine) .commit(engine, CloseableIterable.emptyIterable() /* dataActions */ ) - initializeUCTable(ucClient, "ucTableId") + ucClient.insertTableDataAfterCreate("testUcTableId") // Verify CREATE metrics assert(reporter.reports.size === 1) val createReport = reporter.reports.head assert(createReport.operationType === "UcCommit") - assert(createReport.ucTableId === "ucTableId") + assert(createReport.ucTableId === "testUcTableId") assert(createReport.ucTablePath === tablePath) assert(createReport.commitVersion === 0) assert(createReport.commitType === CommitMetadata.CommitType.CATALOG_CREATE) @@ -78,7 +82,7 @@ class UcCommitTelemetrySuite assert(reporter.reports.size === 1) val writeReport = reporter.reports.head assert(writeReport.operationType === "UcCommit") - assert(writeReport.ucTableId === "ucTableId") + assert(writeReport.ucTableId === "testUcTableId") assert(writeReport.ucTablePath === tablePath) assert(writeReport.commitVersion === 1) assert(writeReport.commitType === CommitMetadata.CommitType.CATALOG_WRITE) @@ -118,7 +122,7 @@ class UcCommitTelemetrySuite // ===== WHEN ===== intercept[MaxCommitRetryLimitReachedException] { ucCatalogManagedClient - .buildCreateTableTransaction("ucTableId", tablePath, testSchema, "test-engine") + .buildCreateTableTransaction("testUcTableId", tablePath, testSchema, "test-engine") .withMaxRetries(0) .build(throwingEngineWithReporter) .commit(throwingEngineWithReporter, CloseableIterable.emptyIterable()) @@ -128,7 +132,7 @@ class UcCommitTelemetrySuite assert(reporter.reports.size === 1) val report = reporter.reports.head assert(report.operationType === "UcCommit") - assert(report.ucTableId === "ucTableId") + assert(report.ucTableId === "testUcTableId") assert(report.commitVersion === 0) assert(report.commitType === CommitMetadata.CommitType.CATALOG_CREATE) assert(report.exception.isPresent) @@ -146,7 +150,7 @@ class UcCommitTelemetrySuite newProtocolOpt = Optional.of(protocolWithCatalogManagedSupport), newMetadataOpt = Optional.of(basicPartitionedMetadata)) - val telemetry = new UcCommitTelemetry("ucTableId", "ucTablePath", commitMetadata) + val telemetry = new UcCommitTelemetry("testUcTableId", "ucTablePath", commitMetadata) telemetry.getMetricsCollector.totalCommitTimer.record(200) telemetry.getMetricsCollector.writeCommitFileTimer.record(200) // Note: commitToUcServerTimer is not invoked for CREATE operations @@ -158,7 +162,7 @@ class UcCommitTelemetrySuite s""" |{"operationType":"UcCommit", |"reportUUID":"${report.reportUUID}", - |"ucTableId":"ucTableId", + |"ucTableId":"testUcTableId", |"ucTablePath":"ucTablePath", |"commitVersion":0, |"commitType":"CATALOG_CREATE", @@ -173,7 +177,7 @@ class UcCommitTelemetrySuite test("JSON serialization: success + update (version >= 1)") { val commitMetadata = catalogManagedWriteCommitMetadata(version = 5) - val telemetry = new UcCommitTelemetry("ucTableId", "ucTablePath", commitMetadata) + val telemetry = new UcCommitTelemetry("testUcTableId", "ucTablePath", commitMetadata) telemetry.getMetricsCollector.totalCommitTimer.record(300) telemetry.getMetricsCollector.writeCommitFileTimer.record(200) telemetry.getMetricsCollector.commitToUcServerTimer.record(100) @@ -185,7 +189,7 @@ class UcCommitTelemetrySuite s""" |{"operationType":"UcCommit", |"reportUUID":"${report.reportUUID}", - |"ucTableId":"ucTableId", + |"ucTableId":"testUcTableId", |"ucTablePath":"ucTablePath", |"commitVersion":5, |"commitType":"CATALOG_WRITE", @@ -200,7 +204,7 @@ class UcCommitTelemetrySuite test("JSON serialization: fail + update") { val commitMetadata = catalogManagedWriteCommitMetadata(version = 3) - val telemetry = new UcCommitTelemetry("ucTableId", "ucTablePath", commitMetadata) + val telemetry = new UcCommitTelemetry("testUcTableId", "ucTablePath", commitMetadata) telemetry.getMetricsCollector.totalCommitTimer.record(300) telemetry.getMetricsCollector.writeCommitFileTimer.record(200) telemetry.getMetricsCollector.commitToUcServerTimer.record(100) @@ -213,7 +217,7 @@ class UcCommitTelemetrySuite s""" |{"operationType":"UcCommit", |"reportUUID":"${report.reportUUID}", - |"ucTableId":"ucTableId", + |"ucTableId":"testUcTableId", |"ucTablePath":"ucTablePath", |"commitVersion":3, |"commitType":"CATALOG_WRITE", diff --git a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UcLoadSnapshotTelemetrySuite.scala b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UcLoadSnapshotTelemetrySuite.scala index db4565a1095..338a12695b9 100644 --- a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UcLoadSnapshotTelemetrySuite.scala +++ b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UcLoadSnapshotTelemetrySuite.scala @@ -41,12 +41,11 @@ class UcLoadSnapshotTelemetrySuite ucClient: InMemoryUCClient, ucCatalogManagedClient: UCCatalogManagedClient): Long = { val result0 = ucCatalogManagedClient - .buildCreateTableTransaction("ucTableId", tablePath, testSchema, "test-engine") + .buildCreateTableTransaction("testUcTableId", tablePath, testSchema, "test-engine") .build(engine) .commit(engine, CloseableIterable.emptyIterable()) - val tableData = new InMemoryUCClient.TableData(-1, scala.collection.mutable.ArrayBuffer()) - ucClient.createTableIfNotExistsOrThrow("ucTableId", tableData) + ucClient.insertTableDataAfterCreate("testUcTableId") val result1 = result0.getPostCommitSnapshot.get() .buildUpdateTableTransaction("engineInfo", io.delta.kernel.Operation.MANUAL_UPDATE) @@ -78,7 +77,7 @@ class UcLoadSnapshotTelemetrySuite // ===== WHEN ===== ucCatalogManagedClient.loadSnapshot( engine, - "ucTableId", + "testUcTableId", tablePath, Optional.empty(), Optional.empty()) @@ -87,7 +86,7 @@ class UcLoadSnapshotTelemetrySuite assert(reporter.reports.size === 1) val report = reporter.reports.head assert(report.operationType === "UcLoadSnapshot") - assert(report.ucTableId === "ucTableId") + assert(report.ucTableId === "testUcTableId") assert(report.ucTablePath === tablePath) assert(report.versionOpt.isEmpty) assert(report.timestampOpt.isEmpty) @@ -121,7 +120,7 @@ class UcLoadSnapshotTelemetrySuite // Time travel to timestamp between v1 and v2 - should resolve to v1 ucCatalogManagedClient.loadSnapshot( engine, - "ucTableId", + "testUcTableId", tablePath, Optional.empty(), Optional.of(timestampBetweenV1AndV2)) @@ -130,7 +129,7 @@ class UcLoadSnapshotTelemetrySuite assert(reporter.reports.size === 1) val report = reporter.reports.head assert(report.operationType === "UcLoadSnapshot") - assert(report.ucTableId === "ucTableId") + assert(report.ucTableId === "testUcTableId") assert(report.ucTablePath === tablePath) assert(report.versionOpt.isEmpty) assert(report.timestampOpt.isPresent) @@ -181,7 +180,7 @@ class UcLoadSnapshotTelemetrySuite test("JSON serialization: success report for latest version") { val telemetry = new UcLoadSnapshotTelemetry( - "ucTableId", + "testUcTableId", "ucTablePath", Optional.empty(), // versionOpt Optional.empty() // timestampOpt @@ -200,7 +199,7 @@ class UcLoadSnapshotTelemetrySuite s""" |{"operationType":"UcLoadSnapshot", |"reportUUID":"${report.reportUUID}", - |"ucTableId":"ucTableId", + |"ucTableId":"testUcTableId", |"ucTablePath":"ucTablePath", |"versionOpt":null, |"timestampOpt":null, @@ -214,7 +213,7 @@ class UcLoadSnapshotTelemetrySuite test("JSON serialization: failure report") { val telemetry = new UcLoadSnapshotTelemetry( - "ucTableId", + "testUcTableId", "ucTablePath", Optional.empty(), // versionOpt Optional.of(123456789L) // timestampOpt @@ -231,7 +230,7 @@ class UcLoadSnapshotTelemetrySuite s""" |{"operationType":"UcLoadSnapshot", |"reportUUID":"${report.reportUUID}", - |"ucTableId":"ucTableId", + |"ucTableId":"testUcTableId", |"ucTablePath":"ucTablePath", |"versionOpt":null, |"timestampOpt":123456789, diff --git a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UcPublishTelemetrySuite.scala b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UcPublishTelemetrySuite.scala index f1f21e5914a..9e7ae6b5468 100644 --- a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UcPublishTelemetrySuite.scala +++ b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UcPublishTelemetrySuite.scala @@ -16,11 +16,15 @@ package io.delta.kernel.unitycatalog +import scala.collection.mutable.ArrayBuffer + import io.delta.kernel.Operation import io.delta.kernel.commit.PublishFailedException import io.delta.kernel.test.MockFileSystemClientUtils +import io.delta.kernel.unitycatalog.InMemoryUCClient.TableData import io.delta.kernel.unitycatalog.metrics.UcPublishTelemetry import io.delta.kernel.utils.CloseableIterable +import io.delta.storage.commit.Commit import org.scalatest.funsuite.AnyFunSuite @@ -38,11 +42,11 @@ class UcPublishTelemetrySuite val (ucClient, ucCatalogManagedClient) = createUCClientAndCatalogManagedClient() val result0 = ucCatalogManagedClient - .buildCreateTableTransaction("ucTableId", tablePath, testSchema, "test-engine") + .buildCreateTableTransaction("testUcTableId", tablePath, testSchema, "test-engine") .build(engine) .commit(engine, CloseableIterable.emptyIterable()) - initializeUCTable(ucClient, "ucTableId") + ucClient.insertTableDataAfterCreate("testUcTableId") val resultV1 = result0.getPostCommitSnapshot.get() .buildUpdateTableTransaction("engineInfo", Operation.MANUAL_UPDATE) @@ -65,7 +69,7 @@ class UcPublishTelemetrySuite val firstPublish = reporter.reports(0) assert(firstPublish.operationType === "UcPublish") - assert(firstPublish.ucTableId === "ucTableId") + assert(firstPublish.ucTableId === "testUcTableId") assert(firstPublish.snapshotVersion === 1) assert(firstPublish.numCommitsToPublish === 1) assert(firstPublish.metrics.numCommitsPublished === 1) @@ -73,7 +77,7 @@ class UcPublishTelemetrySuite val secondPublish = reporter.reports(1) assert(secondPublish.operationType === "UcPublish") - assert(secondPublish.ucTableId === "ucTableId") + assert(secondPublish.ucTableId === "testUcTableId") assert(secondPublish.snapshotVersion === 2) assert(secondPublish.numCommitsToPublish === 2) // Both 01.uuid.json and 02.uuid.json assert(secondPublish.metrics.numCommitsPublished === 1) // Only 02.uuid.json @@ -82,7 +86,7 @@ class UcPublishTelemetrySuite } test("JSON serialization: success report") { - val telemetry = new UcPublishTelemetry("ucTableId", "ucTablePath", 5, 3) + val telemetry = new UcPublishTelemetry("testUcTableId", "ucTablePath", 5, 3) val collector = telemetry.getMetricsCollector collector.totalPublishTimer.record(500) collector.incrementCommitsPublished() @@ -96,7 +100,7 @@ class UcPublishTelemetrySuite s""" |{"operationType":"UcPublish", |"reportUUID":"${report.reportUUID}", - |"ucTableId":"ucTableId", + |"ucTableId":"testUcTableId", |"ucTablePath":"ucTablePath", |"snapshotVersion":5, |"numCommitsToPublish":3, @@ -109,7 +113,7 @@ class UcPublishTelemetrySuite } test("JSON serialization: failure report") { - val telemetry = new UcPublishTelemetry("ucTableId", "ucTablePath", 3, 2) + val telemetry = new UcPublishTelemetry("testUcTableId", "ucTablePath", 3, 2) val collector = telemetry.getMetricsCollector collector.totalPublishTimer.record(300) collector.incrementCommitsPublished() @@ -122,7 +126,7 @@ class UcPublishTelemetrySuite s""" |{"operationType":"UcPublish", |"reportUUID":"${report.reportUUID}", - |"ucTableId":"ucTableId", + |"ucTableId":"testUcTableId", |"ucTablePath":"ucTablePath", |"snapshotVersion":3, |"numCommitsToPublish":2, From 15503c8948daf1ed27c69caa33ac802986e10ddd Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Thu, 4 Dec 2025 11:22:21 -0800 Subject: [PATCH 2/6] [Kernel] Revert catalogManaged feature rename (#5584) Use this [link](https://github.com/delta-io/delta/pull/5584/files) to review incremental changes. - [**stack/kernel_revert_catalogManaged_rename**](https://github.com/delta-io/delta/pull/5584) [[Files changed](https://github.com/delta-io/delta/pull/5584/files)] --------- Revert catalogManaged feature rename (cherry picked from commit a7e9d62c4cc6e0df9f16bae7c6373123da14c408) --- .../kernel/internal/tablefeatures/TableFeatures.java | 2 +- ...terCompatV1MetadataValidatorAndUpdaterSuite.scala | 2 +- ...terCompatV3MetadataValidatorAndUpdaterSuite.scala | 2 +- .../internal/tablefeatures/TableFeaturesSuite.scala | 12 ++++++------ .../_delta_log/00000000000000000000.json | 2 +- .../test/resources/catalog-owned-preview/info.txt | 2 +- .../DeltaTableWritesTransactionBuilderV2Suite.scala | 5 ++++- .../kernel/defaults/DirectoryCreationSuite.scala | 4 ++-- .../kernel/defaults/IcebergWriterCompatV1Suite.scala | 2 +- .../CatalogManagedPropertyValidationSuite.scala | 7 ++++--- .../unitycatalog/UCCatalogManagedClientSuite.scala | 4 +++- 11 files changed, 25 insertions(+), 19 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java index a3e6e1fd7d3..2caae11b3bb 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java @@ -83,7 +83,7 @@ public boolean hasKernelWriteSupport(Metadata metadata) { } public static final TableFeature CATALOG_MANAGED_RW_FEATURE = - new CatalogManagedFeatureBase("catalogManaged"); + new CatalogManagedFeatureBase("catalogOwned-preview"); private static class CatalogManagedFeatureBase extends TableFeature.ReaderWriterFeature { CatalogManagedFeatureBase(String featureName) { diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/icebergcompat/IcebergWriterCompatV1MetadataValidatorAndUpdaterSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/icebergcompat/IcebergWriterCompatV1MetadataValidatorAndUpdaterSuite.scala index 9d0c35103de..b168b8df484 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/icebergcompat/IcebergWriterCompatV1MetadataValidatorAndUpdaterSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/icebergcompat/IcebergWriterCompatV1MetadataValidatorAndUpdaterSuite.scala @@ -271,7 +271,7 @@ class IcebergWriterCompatV1MetadataValidatorAndUpdaterSuite "typeWidening", "typeWidening-preview", "timestampNtz", - "catalogManaged") + "catalogOwned-preview") val protocol = new Protocol(3, 7, readerFeatures.asJava, writerFeatures.asJava) val metadata = getCompatEnabledMetadata(cmTestSchema()) validateAndUpdateIcebergWriterCompatV1Metadata(true, metadata, protocol) diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/icebergcompat/IcebergWriterCompatV3MetadataValidatorAndUpdaterSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/icebergcompat/IcebergWriterCompatV3MetadataValidatorAndUpdaterSuite.scala index 8995e6b2c61..aa16687d33d 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/icebergcompat/IcebergWriterCompatV3MetadataValidatorAndUpdaterSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/icebergcompat/IcebergWriterCompatV3MetadataValidatorAndUpdaterSuite.scala @@ -385,7 +385,7 @@ class IcebergWriterCompatV3MetadataValidatorAndUpdaterSuite "variantShredding-preview", "icebergCompatV2", "icebergWriterCompatV1", - "catalogManaged") + "catalogOwned-preview") val protocol = new Protocol(3, 7, readerFeatures.asJava, writerFeatures.asJava) val metadata = getCompatEnabledMetadata(cmTestSchema()) validateAndUpdateIcebergWriterCompatV3Metadata(true, metadata, protocol) diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/tablefeatures/TableFeaturesSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/tablefeatures/TableFeaturesSuite.scala index 590235dd7a8..c51e18a6c40 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/tablefeatures/TableFeaturesSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/tablefeatures/TableFeaturesSuite.scala @@ -40,7 +40,7 @@ class TableFeaturesSuite extends AnyFunSuite { // Tests for [[TableFeature]] implementations // ///////////////////////////////////////////////////////////////////////////////////////////////// val readerWriterFeatures = Seq( - "catalogManaged", + "catalogOwned-preview", "columnMapping", "deletionVectors", "timestampNtz", @@ -201,7 +201,7 @@ class TableFeaturesSuite extends AnyFunSuite { "domainMetadata", "vacuumProtocolCheck", "clustering", - "catalogManaged", + "catalogOwned-preview", "allowColumnDefaults").foreach { feature => test(s"doesn't support auto enable by metadata: $feature") { @@ -234,7 +234,7 @@ class TableFeaturesSuite extends AnyFunSuite { .collect(toList()).asScala val expected = Seq( - "catalogManaged", + "catalogOwned-preview", "columnMapping", "v2Checkpoint", "variantType", @@ -258,7 +258,7 @@ class TableFeaturesSuite extends AnyFunSuite { // are writable because the metadata has not been set the info that // these features are enabled val expected = Seq( - "catalogManaged", + "catalogOwned-preview", "columnMapping", "allowColumnDefaults", "v2Checkpoint", @@ -323,7 +323,7 @@ class TableFeaturesSuite extends AnyFunSuite { // Reads: Supported table features represented as readerFeatures in the protocol Seq( - "catalogManaged", + "catalogOwned-preview", "variantType", "variantType-preview", "variantShredding-preview", @@ -380,7 +380,7 @@ class TableFeaturesSuite extends AnyFunSuite { checkWriteSupported( "validateKernelCanWriteToTable: protocol 7 with catalogManaged", - new Protocol(3, 7, singleton("catalogManaged"), singleton("catalogManaged")), + new Protocol(3, 7, singleton("catalogOwned-preview"), singleton("catalogOwned-preview")), testMetadata()) checkWriteUnsupported( diff --git a/kernel/kernel-defaults/src/test/resources/catalog-owned-preview/_delta_log/00000000000000000000.json b/kernel/kernel-defaults/src/test/resources/catalog-owned-preview/_delta_log/00000000000000000000.json index 8bf3a475a8c..a7ec1654633 100644 --- a/kernel/kernel-defaults/src/test/resources/catalog-owned-preview/_delta_log/00000000000000000000.json +++ b/kernel/kernel-defaults/src/test/resources/catalog-owned-preview/_delta_log/00000000000000000000.json @@ -1,3 +1,3 @@ {"commitInfo":{"inCommitTimestamp":1749830855993,"timestamp":1749830855992,"operation":"CREATE TABLE","operationParameters":{"partitionBy":"[\"part1\"]","clusterBy":"[]","description":null,"isManaged":"false","properties":"{}"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/4.0.0 Delta-Lake/4.0.0","txnId":"d108f896-9662-4eda-b4de-444a99850aa8"}} {"metaData":{"id":"64dcd182-b3b4-4ee0-88e0-63c159a4121c","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"part1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"col1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["part1"],"configuration":{"delta.enableInCommitTimestamps":"true"},"createdTime":1749830855646}} -{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["catalogManaged"],"writerFeatures":["catalogManaged","inCommitTimestamp","invariants","appendOnly"]}} +{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["catalogOwned-preview"],"writerFeatures":["catalogOwned-preview","inCommitTimestamp","invariants","appendOnly"]}} diff --git a/kernel/kernel-defaults/src/test/resources/catalog-owned-preview/info.txt b/kernel/kernel-defaults/src/test/resources/catalog-owned-preview/info.txt index 00a2625c928..07e21cd47a0 100644 --- a/kernel/kernel-defaults/src/test/resources/catalog-owned-preview/info.txt +++ b/kernel/kernel-defaults/src/test/resources/catalog-owned-preview/info.txt @@ -45,7 +45,7 @@ FROM ( ) """) -# Then, add `"readerFeatures":["catalogManaged"]` to the _delta_log/001.json protocol +# Then, add `"readerFeatures":["catalogOwned-preview"]` to the _delta_log/001.json protocol # Then, for commits version $v in [1, 2] move _delta_log/$v.json into # _delta_log/_staged_commits/$v.$uuid.json, where $uuid is taken from the commitInfo.txnId in # $v.json \ No newline at end of file diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesTransactionBuilderV2Suite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesTransactionBuilderV2Suite.scala index 0bc5a8dc467..01a91d28edc 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesTransactionBuilderV2Suite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesTransactionBuilderV2Suite.scala @@ -164,7 +164,10 @@ class DeltaTableWritesTransactionBuilderV2Suite extends DeltaTableWritesSuite // Now create it again but with catalogManaged supported. This should NOT throw. TableManager.buildCreateTableTransaction(tablePath, testSchema, testEngineInfo) - .withTableProperties(Map("delta.feature.catalogManaged" -> "supported").asJava) + .withTableProperties( + Map( + TableFeatures.CATALOG_MANAGED_RW_FEATURE.getTableFeatureSupportKey + -> "supported").asJava) .build(engine) } } diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DirectoryCreationSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DirectoryCreationSuite.scala index 41762717938..e4398f9ed01 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DirectoryCreationSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DirectoryCreationSuite.scala @@ -48,7 +48,7 @@ class DirectoryCreationSuite extends AnyFunSuite with WriteUtils with ActionUtil TableManager .buildCreateTableTransaction(tablePath, testSchema, "engineInfo") .withTableProperties(Map( - "delta.feature.catalogManaged" -> "supported", + TableFeatures.CATALOG_MANAGED_RW_FEATURE.getTableFeatureSupportKey -> "supported", "delta.checkpointPolicy" -> "v2").asJava) .withCommitter(committerUsingPutIfAbsent) .build(engine) @@ -79,7 +79,7 @@ class DirectoryCreationSuite extends AnyFunSuite with WriteUtils with ActionUtil .build(engine) .buildUpdateTableTransaction("engineInfo", Operation.MANUAL_UPDATE) .withTablePropertiesAdded(Map( - "delta.feature.catalogManaged" -> "supported", + TableFeatures.CATALOG_MANAGED_RW_FEATURE.getTableFeatureSupportKey -> "supported", "delta.checkpointPolicy" -> "v2").asJava) .build(engine) .commit(engine, emptyIterable()) diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/IcebergWriterCompatV1Suite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/IcebergWriterCompatV1Suite.scala index 27930f6c36b..02c64d0046b 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/IcebergWriterCompatV1Suite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/IcebergWriterCompatV1Suite.scala @@ -55,7 +55,7 @@ class CatalogManagedWithIcebergWriterCompatV1Suite .withCommitter(committerUsingPutIfAbsent) .withTableProperties( Map( - "delta.feature.catalogManaged" -> "supported", + TableFeatures.CATALOG_MANAGED_RW_FEATURE.getTableFeatureSupportKey -> "supported", TableConfig.ICEBERG_WRITER_COMPAT_V1_ENABLED.getKey -> "true").asJava) .build(engine) diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/catalogManaged/CatalogManagedPropertyValidationSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/catalogManaged/CatalogManagedPropertyValidationSuite.scala index 3bfa0ea9e95..3a5cc3e6fb1 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/catalogManaged/CatalogManagedPropertyValidationSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/catalogManaged/CatalogManagedPropertyValidationSuite.scala @@ -32,7 +32,8 @@ import org.scalatest.funsuite.AnyFunSuite class CatalogManagedPropertyValidationSuite extends AnyFunSuite with TestUtils { - val catalogManagedFeaturePropMap = Map("delta.feature.catalogManaged" -> "supported") + val catalogManagedFeaturePropMap = Map( + TableFeatures.CATALOG_MANAGED_RW_FEATURE.getTableFeatureSupportKey -> "supported") val validRequiredCatalogPropMap = Map( customCatalogCommitter.REQUIRED_PROPERTY_KEY -> customCatalogCommitter.REQUIRED_PROPERTY_VALUE) val invalidRequiredCatalogPropMap = Map( @@ -65,7 +66,7 @@ class CatalogManagedPropertyValidationSuite extends AnyFunSuite with TestUtils { testName = "ILLEGAL CREATE: set catalogManaged=supported and explicitly disable ICT => THROW", operationType = "CREATE", transactionProperties = Map( - "delta.feature.catalogManaged" -> "supported", + TableFeatures.CATALOG_MANAGED_RW_FEATURE.getTableFeatureSupportKey -> "supported", "delta.enableInCommitTimestamps" -> "false"), expectedSuccess = false, expectedExceptionMessage = @@ -92,7 +93,7 @@ class CatalogManagedPropertyValidationSuite extends AnyFunSuite with TestUtils { operationType = "UPDATE", initialTableProperties = Map.empty, transactionProperties = Map( - "delta.feature.catalogManaged" -> "supported", + TableFeatures.CATALOG_MANAGED_RW_FEATURE.getTableFeatureSupportKey -> "supported", "delta.enableInCommitTimestamps" -> "false"), expectedSuccess = false, expectedExceptionMessage = diff --git a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedClientSuite.scala b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedClientSuite.scala index 7a490caac39..8dcdcf19b94 100644 --- a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedClientSuite.scala +++ b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedClientSuite.scala @@ -22,6 +22,7 @@ import scala.collection.JavaConverters._ import io.delta.kernel.exceptions.KernelException import io.delta.kernel.internal.CreateTableTransactionBuilderImpl +import io.delta.kernel.internal.tablefeatures.TableFeatures import io.delta.kernel.internal.tablefeatures.TableFeatures.{CATALOG_MANAGED_RW_FEATURE, TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION} import io.delta.storage.commit.uccommitcoordinator.InvalidTargetTableException @@ -281,7 +282,8 @@ class UCCatalogManagedClientSuite extends AnyFunSuite with UCCatalogManagedTestU // ===== THEN ===== val builderTableProperties = createTableTxnBuilder.getTablePropertiesOpt.get() - assert(builderTableProperties.get("delta.feature.catalogManaged") == "supported") + assert(builderTableProperties + .get(TableFeatures.CATALOG_MANAGED_RW_FEATURE.getTableFeatureSupportKey) == "supported") assert(builderTableProperties.get("io.unitycatalog.tableId") == testUcTableId) assert(builderTableProperties.get("foo") == "bar") From 08a0af6d2d9f712f72c4fb9654fd121ae686936f Mon Sep 17 00:00:00 2001 From: Allison Portis Date: Thu, 4 Dec 2025 16:04:08 -0800 Subject: [PATCH 3/6] [Kernel][Kernel UC] Make startBoundary required for CommitRange (#5537) #### Which Delta project/connector is this regarding? - [ ] Spark - [ ] Standalone - [ ] Flink - [X] Kernel - [ ] Other (fill in here) ## Description Currently when building a commit range start boundary is optional and defaults to 0 when not provided. However, there is no good use-case during which the start version is not provided so this PR makes it required. The prior behavior can always be implemented by the caller by providing version=0 start boundary. ## How was this patch tested? Updates/adds tests. (cherry picked from commit 9357c6f04022514a786ecd5c61a24397a3fa3939) --- .../snapshot/PathBasedSnapshotManager.java | 4 +- .../java/io/delta/kernel/CommitRange.java | 5 +- .../io/delta/kernel/CommitRangeBuilder.java | 16 +- .../java/io/delta/kernel/TableManager.java | 11 +- .../commitrange/CommitRangeBuilderImpl.java | 22 +-- .../commitrange/CommitRangeFactory.java | 20 +-- .../internal/commitrange/CommitRangeImpl.java | 10 +- .../internal/CommitRangeBuilderSuite.scala | 161 ++++++++---------- .../kernel/defaults/TableChangesSuite.scala | 61 +++---- .../CatalogManagedE2EReadSuite.scala | 15 +- .../unitycatalog/UCCatalogManagedClient.java | 40 +++-- ...CatalogManagedClientCommitRangeSuite.scala | 31 +++- 12 files changed, 198 insertions(+), 198 deletions(-) diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/PathBasedSnapshotManager.java b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/PathBasedSnapshotManager.java index 3b8cf0c48b4..6a26a21ac8e 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/PathBasedSnapshotManager.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/PathBasedSnapshotManager.java @@ -131,8 +131,8 @@ public void checkVersionExists(long version, boolean mustBeRecreatable, boolean @Override public CommitRange getTableChanges(Engine engine, long startVersion, Optional endVersion) { CommitRangeBuilder builder = - TableManager.loadCommitRange(tablePath) - .withStartBoundary(CommitRangeBuilder.CommitBoundary.atVersion(startVersion)); + TableManager.loadCommitRange( + tablePath, CommitRangeBuilder.CommitBoundary.atVersion(startVersion)); if (endVersion.isPresent()) { builder = diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/CommitRange.java b/kernel/kernel-api/src/main/java/io/delta/kernel/CommitRange.java index f7df04d5d36..f06a98ad034 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/CommitRange.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/CommitRange.java @@ -59,10 +59,9 @@ public interface CommitRange { *

The boundary indicates whether the range was defined using a specific version number or a * timestamp. * - * @return an {@link Optional} containing the start boundary, or empty if the range was created - * with default start parameters (version 0) + * @return the start boundary for this commit range */ - Optional getQueryStartBoundary(); + CommitRangeBuilder.CommitBoundary getQueryStartBoundary(); /** * Returns the original query boundary used to define the end boundary of this commit range, if diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/CommitRangeBuilder.java b/kernel/kernel-api/src/main/java/io/delta/kernel/CommitRangeBuilder.java index 5a56fda932d..43bd3d84f5e 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/CommitRangeBuilder.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/CommitRangeBuilder.java @@ -29,25 +29,15 @@ * A builder for creating {@link CommitRange} instances that define a contiguous range of commits in * a Delta Lake table. * - *

If no start specification is provided, the range defaults to starting at version 0. If no end - * specification is provided, the range defaults to the latest available version. + *

The start boundary is required and provided via {@link TableManager#loadCommitRange(String, + * CommitBoundary)}. If no end specification is provided, the range defaults to the latest available + * version. * * @since 3.4.0 */ @Experimental public interface CommitRangeBuilder { - /** - * Configures the builder to start the commit range at a specific version or timestamp. - * - *

If not specified, the commit range will default to starting at version 0. - * - * @param startBoundary the boundary specification for the start of the commit range, must not be - * null - * @return this builder instance configured with the specified start boundary - */ - CommitRangeBuilder withStartBoundary(CommitBoundary startBoundary); - /** * Configures the builder to end the commit range at a specific version or timestamp. * diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/TableManager.java b/kernel/kernel-api/src/main/java/io/delta/kernel/TableManager.java index 2d18eafe42c..2bc4b089a59 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/TableManager.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/TableManager.java @@ -64,14 +64,17 @@ static CreateTableTransactionBuilder buildCreateTableTransaction( /** * Creates a builder for loading a CommitRange at a given path. * - *

The returned builder can be configured with start version or timestamp and an end version or - * timestamp, and with additional metadata to optimize the loading process. + *

The returned builder can be configured with an end version or timestamp, and with additional + * metadata to optimize the loading process. * * @param path the file system path to the Delta table + * @param startBoundary the boundary specification for the start of the commit range, must not be + * null * @return a {@link CommitRangeBuilder} that can be used to load a {@link CommitRange} at the * given path */ - static CommitRangeBuilder loadCommitRange(String path) { - return new CommitRangeBuilderImpl(path); + static CommitRangeBuilder loadCommitRange( + String path, CommitRangeBuilder.CommitBoundary startBoundary) { + return new CommitRangeBuilderImpl(path, startBoundary); } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/commitrange/CommitRangeBuilderImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/commitrange/CommitRangeBuilderImpl.java index 3d234cd99cc..a96b7f2a187 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/commitrange/CommitRangeBuilderImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/commitrange/CommitRangeBuilderImpl.java @@ -38,31 +38,26 @@ public class CommitRangeBuilderImpl implements CommitRangeBuilder { public static class Context { public final String unresolvedPath; - public Optional startBoundaryOpt = Optional.empty(); + public final CommitBoundary startBoundary; public Optional endBoundaryOpt = Optional.empty(); public List logDatas = Collections.emptyList(); - public Context(String unresolvedPath) { + public Context(String unresolvedPath, CommitBoundary startBoundary) { this.unresolvedPath = requireNonNull(unresolvedPath, "unresolvedPath is null"); + this.startBoundary = requireNonNull(startBoundary, "startBoundary is null"); } } private final Context ctx; - public CommitRangeBuilderImpl(String unresolvedPath) { - ctx = new Context(unresolvedPath); + public CommitRangeBuilderImpl(String unresolvedPath, CommitBoundary startBoundary) { + ctx = new Context(unresolvedPath, startBoundary); } /////////////////////////////////////// // Public CommitRangeBuilder Methods // /////////////////////////////////////// - @Override - public CommitRangeBuilderImpl withStartBoundary(CommitBoundary startBoundary) { - ctx.startBoundaryOpt = Optional.of(requireNonNull(startBoundary, "startBoundary is null")); - return this; - } - @Override public CommitRangeBuilderImpl withEndBoundary(CommitBoundary endBoundary) { ctx.endBoundaryOpt = Optional.of(requireNonNull(endBoundary, "endBoundary is null")); @@ -86,9 +81,10 @@ public CommitRange build(Engine engine) { //////////////////////////// private void validateInputOnBuild() { - // Validate that start boundary is less than or equal to end boundary if both are provided - if (ctx.startBoundaryOpt.isPresent() && ctx.endBoundaryOpt.isPresent()) { - CommitBoundary startBoundary = ctx.startBoundaryOpt.get(); + // Validate that start boundary is less than or equal to end boundary if end boundary is + // provided + if (ctx.endBoundaryOpt.isPresent()) { + CommitBoundary startBoundary = ctx.startBoundary; CommitBoundary endBoundary = ctx.endBoundaryOpt.get(); // If both are version-based, compare versions diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/commitrange/CommitRangeFactory.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/commitrange/CommitRangeFactory.java index 3467a393b3c..7507e3827de 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/commitrange/CommitRangeFactory.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/commitrange/CommitRangeFactory.java @@ -69,28 +69,22 @@ CommitRangeImpl create(Engine engine) { logger.info("{}: Resolved end-boundary to the latest version {}", tablePath, endVersion); } return new CommitRangeImpl( - tablePath, ctx.startBoundaryOpt, ctx.endBoundaryOpt, startVersion, endVersion, deltas); + tablePath, ctx.startBoundary, ctx.endBoundaryOpt, startVersion, endVersion, deltas); } private long resolveStartVersion(Engine engine, List catalogCommits) { - if (!ctx.startBoundaryOpt.isPresent()) { - // Default to version 0 if no start boundary is provided - return 0L; - } - CommitRangeBuilder.CommitBoundary startBoundary = ctx.startBoundaryOpt.get(); - - if (startBoundary.isVersion()) { - return startBoundary.getVersion(); + if (ctx.startBoundary.isVersion()) { + return ctx.startBoundary.getVersion(); } else { logger.info( "{}: Trying to resolve start-boundary timestamp {} to version", tablePath, - startBoundary.getTimestamp()); + ctx.startBoundary.getTimestamp()); return DeltaHistoryManager.getVersionAtOrAfterTimestamp( engine, logPath, - startBoundary.getTimestamp(), - (SnapshotImpl) startBoundary.getLatestSnapshot(), + ctx.startBoundary.getTimestamp(), + (SnapshotImpl) ctx.startBoundary.getLatestSnapshot(), catalogCommits); } } @@ -143,7 +137,7 @@ private void logResolvedVersions(long startVersion, Optional endVersionOpt tablePath, startVersion, endVersionOpt, - ctx.startBoundaryOpt, + ctx.startBoundary, ctx.endBoundaryOpt); } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/commitrange/CommitRangeImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/commitrange/CommitRangeImpl.java index fbeed3ccc43..bbb891732f1 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/commitrange/CommitRangeImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/commitrange/CommitRangeImpl.java @@ -41,7 +41,7 @@ public class CommitRangeImpl implements CommitRange { private final Path dataPath; - private final Optional startBoundaryOpt; + private final CommitRangeBuilder.CommitBoundary startBoundary; private final Optional endBoundaryOpt; private final long startVersion; @@ -50,7 +50,7 @@ public class CommitRangeImpl implements CommitRange { public CommitRangeImpl( Path dataPath, - Optional startBoundaryOpt, + CommitRangeBuilder.CommitBoundary startBoundary, Optional endBoundaryOpt, long startVersion, long endVersion, @@ -59,7 +59,7 @@ public CommitRangeImpl( checkArgument( deltas.size() == endVersion - startVersion + 1, "deltaFiles size must match size of range"); this.dataPath = requireNonNull(dataPath, "dataPath cannot be null"); - this.startBoundaryOpt = requireNonNull(startBoundaryOpt, "startSpecOpt cannot be null"); + this.startBoundary = requireNonNull(startBoundary, "startBoundary cannot be null"); this.endBoundaryOpt = requireNonNull(endBoundaryOpt, "endSpecOpt cannot be null"); this.startVersion = startVersion; this.endVersion = endVersion; @@ -81,8 +81,8 @@ public long getEndVersion() { } @Override - public Optional getQueryStartBoundary() { - return startBoundaryOpt; + public CommitRangeBuilder.CommitBoundary getQueryStartBoundary() { + return startBoundary; } @Override diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/CommitRangeBuilderSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/CommitRangeBuilderSuite.scala index ae71d7fe26f..e4c3a0860be 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/CommitRangeBuilderSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/CommitRangeBuilderSuite.scala @@ -39,28 +39,27 @@ class CommitRangeBuilderSuite extends AnyFunSuite with MockFileSystemClientUtils private def checkQueryBoundaries( commitRange: CommitRange, - startVersion: Option[Long], - endVersion: Option[Long], - startTimestamp: Option[Long], - endTimestamp: Option[Long]): Unit = { - def assertBoundaryVersion(boundary: Optional[CommitBoundary], version: Long) = { - assert(boundary.isPresent && boundary.get.isVersion && boundary.get.getVersion == version) + startBoundary: RequiredBoundaryDef, + endBoundary: BoundaryDef): Unit = { + def assertBoundaryVersion(boundary: CommitBoundary, version: Long) = { + assert(boundary.isVersion && boundary.getVersion == version) } - def assertBoundaryTimestamp(boundary: Optional[CommitBoundary], timestamp: Long) = { - assert( - boundary.isPresent && boundary.get.isTimestamp && boundary.get.getTimestamp == timestamp) + def assertBoundaryTimestamp(boundary: CommitBoundary, timestamp: Long) = { + assert(boundary.isTimestamp && boundary.getTimestamp == timestamp) } - if (startVersion.nonEmpty) { - assertBoundaryVersion(commitRange.getQueryStartBoundary, startVersion.get) - } else if (startTimestamp.nonEmpty) { - assertBoundaryTimestamp(commitRange.getQueryStartBoundary, startTimestamp.get) + if (startBoundary.version.nonEmpty) { + assertBoundaryVersion(commitRange.getQueryStartBoundary, startBoundary.version.get) + } else if (startBoundary.timestamp.nonEmpty) { + assertBoundaryTimestamp(commitRange.getQueryStartBoundary, startBoundary.timestamp.get) } else { - assert(!commitRange.getQueryStartBoundary.isPresent) + throw new IllegalStateException("RequiredBoundaryDef must have either timestamp or version") } - if (endVersion.nonEmpty) { - assertBoundaryVersion(commitRange.getQueryEndBoundary, endVersion.get) - } else if (endTimestamp.nonEmpty) { - assertBoundaryTimestamp(commitRange.getQueryEndBoundary, endTimestamp.get) + if (endBoundary.version.nonEmpty) { + assert(commitRange.getQueryEndBoundary.isPresent) + assertBoundaryVersion(commitRange.getQueryEndBoundary.get, endBoundary.version.get) + } else if (endBoundary.timestamp.nonEmpty) { + assert(commitRange.getQueryEndBoundary.isPresent) + assertBoundaryTimestamp(commitRange.getQueryEndBoundary.get, endBoundary.timestamp.get) } else { assert(!commitRange.getQueryEndBoundary.isPresent) } @@ -69,10 +68,8 @@ class CommitRangeBuilderSuite extends AnyFunSuite with MockFileSystemClientUtils private def buildCommitRange( engine: Engine, fileList: Seq[FileStatus], - startVersion: Option[Long] = None, - endVersion: Option[Long] = None, - startTimestamp: Option[Long] = None, - endTimestamp: Option[Long] = None, + startBoundary: RequiredBoundaryDef, + endBoundary: BoundaryDef, logData: Option[Seq[ParsedLogData]] = None, ictEnablementInfo: Option[(Long, Long)] = None): CommitRange = { def getVersionFromFS(fs: FileStatus): Long = FileNames.getFileVersion(new Path(fs.getPath)) @@ -83,23 +80,26 @@ class CommitRangeBuilderSuite extends AnyFunSuite with MockFileSystemClientUtils .filter(fs => FileNames.isStagedDeltaFile(fs.getPath)) .find(getVersionFromFS(_) == latestVersion) - var commitRangeBuilder = TableManager.loadCommitRange(dataPath.toString) - startVersion.foreach { v => - commitRangeBuilder = commitRangeBuilder.withStartBoundary(CommitBoundary.atVersion(v)) - } - endVersion.foreach { v => - commitRangeBuilder = commitRangeBuilder.withEndBoundary(CommitBoundary.atVersion(v)) - } lazy val mockLatestSnapshot = getMockSnapshot( dataPath, latestVersion, ictEnablementInfoOpt = ictEnablementInfo, deltaFileAtEndVersion = deltaFileAtEndVersion) - startTimestamp.foreach { v => - commitRangeBuilder = commitRangeBuilder.withStartBoundary( - CommitBoundary.atTimestamp(v, mockLatestSnapshot)) + + // Determine the start boundary + val startBound = if (startBoundary.version.isDefined) { + CommitBoundary.atVersion(startBoundary.version.get) + } else if (startBoundary.timestamp.isDefined) { + CommitBoundary.atTimestamp(startBoundary.timestamp.get, mockLatestSnapshot) + } else { + throw new IllegalStateException("RequiredBoundaryDef must have either timestamp or version") } - endTimestamp.foreach { v => + + var commitRangeBuilder = TableManager.loadCommitRange(dataPath.toString, startBound) + endBoundary.version.foreach { v => + commitRangeBuilder = commitRangeBuilder.withEndBoundary(CommitBoundary.atVersion(v)) + } + endBoundary.timestamp.foreach { v => commitRangeBuilder = commitRangeBuilder.withEndBoundary( CommitBoundary.atTimestamp(v, mockLatestSnapshot)) } @@ -113,20 +113,16 @@ class CommitRangeBuilderSuite extends AnyFunSuite with MockFileSystemClientUtils fileList: Seq[FileStatus], expectedStartVersion: Long, expectedEndVersion: Long, - startVersion: Option[Long] = None, - endVersion: Option[Long] = None, - startTimestamp: Option[Long] = None, - endTimestamp: Option[Long] = None): Unit = { + startBoundary: RequiredBoundaryDef, + endBoundary: BoundaryDef): Unit = { val commitRange = buildCommitRange( createMockFSListFromEngine(fileList), fileList, - startVersion, - endVersion, - startTimestamp, - endTimestamp) + startBoundary, + endBoundary) assert(commitRange.getStartVersion == expectedStartVersion) assert(commitRange.getEndVersion == expectedEndVersion) - checkQueryBoundaries(commitRange, startVersion, endVersion, startTimestamp, endTimestamp) + checkQueryBoundaries(commitRange, startBoundary, endBoundary) val expectedFileList = fileList .filter(fs => { val version = FileNames.getFileVersion(new Path(fs.getPath)) @@ -149,6 +145,17 @@ class CommitRangeBuilderSuite extends AnyFunSuite with MockFileSystemClientUtils def timestamp: Option[Long] = None } + /** + * Base class for boundary definitions that are NOT the default (i.e. are provided). + * + * At least one of `version` or `timestamp` must be defined in this case. + */ + private abstract class RequiredBoundaryDef( + expectedVersion: Long, + expectError: Boolean = false) extends BoundaryDef(expectedVersion, expectError) { + assert(version.isDefined || timestamp.isDefined) + } + /** * Version-based boundary definition. * @param versionValue the version to use as boundary @@ -156,7 +163,7 @@ class CommitRangeBuilderSuite extends AnyFunSuite with MockFileSystemClientUtils */ private case class VersionBoundaryDef( versionValue: Long, - expectsError: Boolean = false) extends BoundaryDef(versionValue, expectsError) { + expectsError: Boolean = false) extends RequiredBoundaryDef(versionValue, expectsError) { override def version: Option[Long] = Some(versionValue) } @@ -170,7 +177,7 @@ class CommitRangeBuilderSuite extends AnyFunSuite with MockFileSystemClientUtils private case class TimestampBoundaryDef( timestampValue: Long, resolvedVersion: Long, - expectsError: Boolean = false) extends BoundaryDef(resolvedVersion, expectsError) { + expectsError: Boolean = false) extends RequiredBoundaryDef(resolvedVersion, expectsError) { override def timestamp: Option[Long] = Some(timestampValue) } @@ -185,7 +192,7 @@ class CommitRangeBuilderSuite extends AnyFunSuite with MockFileSystemClientUtils expectsError: Boolean = false) extends BoundaryDef(resolvedVersion, expectsError) def getExpectedException( - startBoundary: BoundaryDef, + startBoundary: RequiredBoundaryDef, endBoundary: BoundaryDef): Option[(Class[_ <: Throwable], String)] = { // These two cases fail on CommitRangeBuilderImpl.validateInputOnBuild if (startBoundary.version.isDefined && endBoundary.version.isDefined) { @@ -233,7 +240,7 @@ class CommitRangeBuilderSuite extends AnyFunSuite with MockFileSystemClientUtils def testStartAndEndBoundaryCombinations( description: String, fileStatuses: Seq[FileStatus], - startBoundaries: Seq[BoundaryDef], + startBoundaries: Seq[RequiredBoundaryDef], endBoundaries: Seq[BoundaryDef]): Unit = { startBoundaries.foreach { startBound => endBoundaries.foreach { endBound => @@ -244,10 +251,8 @@ class CommitRangeBuilderSuite extends AnyFunSuite with MockFileSystemClientUtils buildCommitRange( createMockFSListFromEngine(fileStatuses), fileList = fileStatuses, - startVersion = startBound.version, - endVersion = endBound.version, - startTimestamp = startBound.timestamp, - endTimestamp = endBound.timestamp) + startBoundary = startBound, + endBoundary = endBound) } assert( expectedException.get._1.isInstance(e), @@ -258,10 +263,8 @@ class CommitRangeBuilderSuite extends AnyFunSuite with MockFileSystemClientUtils fileList = fileStatuses, expectedStartVersion = startBound.expectedVersion, expectedEndVersion = endBound.expectedVersion, - startVersion = startBound.version, - endVersion = endBound.version, - startTimestamp = startBound.timestamp, - endTimestamp = endBound.timestamp) + startBoundary = startBound, + endBoundary = endBound) } } } @@ -283,7 +286,6 @@ class CommitRangeBuilderSuite extends AnyFunSuite with MockFileSystemClientUtils TimestampBoundaryDef(-100, resolvedVersion = 0L), // at v0 TimestampBoundaryDef(-75, resolvedVersion = 1), // between v0, v1 TimestampBoundaryDef(-50, resolvedVersion = 1), // at v1 - DefaultBoundaryDef(resolvedVersion = 0), // default to 0 TimestampBoundaryDef(-40, resolvedVersion = -1, expectsError = true), // after v1 VersionBoundaryDef(2L, expectsError = true) // version DNE ), @@ -310,7 +312,6 @@ class CommitRangeBuilderSuite extends AnyFunSuite with MockFileSystemClientUtils TimestampBoundaryDef(0, resolvedVersion = 0L), // at v0 TimestampBoundaryDef(5, resolvedVersion = 1), // between v0, v1 TimestampBoundaryDef(10, resolvedVersion = 1), // at v1 - DefaultBoundaryDef(resolvedVersion = 0), // default to 0 TimestampBoundaryDef(11, resolvedVersion = -1, expectsError = true), // after v1 VersionBoundaryDef(2L, expectsError = true) // version DNE ), @@ -342,7 +343,6 @@ class CommitRangeBuilderSuite extends AnyFunSuite with MockFileSystemClientUtils TimestampBoundaryDef(115, resolvedVersion = 12L), // between v11, v12 TimestampBoundaryDef(120, resolvedVersion = 12L), // at v12 TimestampBoundaryDef(125, resolvedVersion = -1, expectsError = true), // after v12 - DefaultBoundaryDef(resolvedVersion = 0, expectsError = true), // default to 0 VersionBoundaryDef(9L, expectsError = true), // version DNE VersionBoundaryDef(13L, expectsError = true) // version DNE ), @@ -370,7 +370,6 @@ class CommitRangeBuilderSuite extends AnyFunSuite with MockFileSystemClientUtils VersionBoundaryDef(10L), TimestampBoundaryDef(99L, resolvedVersion = 10L), // before v10 TimestampBoundaryDef(100L, resolvedVersion = 10L), // at v10 - DefaultBoundaryDef(resolvedVersion = 0, expectsError = true), // default to 0 TimestampBoundaryDef(101L, resolvedVersion = -1, expectsError = true), // after v10 VersionBoundaryDef(1L, expectsError = true) // version DNE ), @@ -389,27 +388,23 @@ class CommitRangeBuilderSuite extends AnyFunSuite with MockFileSystemClientUtils fileList: Seq[FileStatus], logData: Seq[ParsedLogData], versionToICT: Map[Long, Long], - startBound: BoundaryDef, + startBound: RequiredBoundaryDef, endBound: BoundaryDef, expectedFileList: Seq[FileStatus]): Unit = { // Create mock engine with ICT reading support val commitRange = buildCommitRange( createMockFSAndJsonEngineForICT(fileList, versionToICT), fileList, - startVersion = startBound.version, - endVersion = endBound.version, - startTimestamp = startBound.timestamp, - endTimestamp = endBound.timestamp, + startBoundary = startBound, + endBoundary = endBound, Some(logData), ictEnablementInfo = Some((0, 0))) assert(commitRange.getStartVersion == startBound.expectedVersion) assert(commitRange.getEndVersion == endBound.expectedVersion) checkQueryBoundaries( commitRange, - startBound.version, - endBound.version, - startBound.timestamp, - endBound.timestamp) + startBound, + endBound) assert(expectedFileList.toSet == commitRange.asInstanceOf[CommitRangeImpl].getDeltaFiles.asScala.toSet) } @@ -424,7 +419,7 @@ class CommitRangeBuilderSuite extends AnyFunSuite with MockFileSystemClientUtils expectedFileList: (Long, Long) => Seq[FileStatus], logData: Seq[ParsedLogData], versionToICT: Map[Long, Long], - startBoundaries: Seq[BoundaryDef], + startBoundaries: Seq[RequiredBoundaryDef], endBoundaries: Seq[BoundaryDef]): Unit = { startBoundaries.foreach { startBound => endBoundaries.foreach { endBound => @@ -435,10 +430,8 @@ class CommitRangeBuilderSuite extends AnyFunSuite with MockFileSystemClientUtils buildCommitRange( createMockFSAndJsonEngineForICT(fileStatuses, versionToICT), fileStatuses, - startVersion = startBound.version, - endVersion = endBound.version, - startTimestamp = startBound.timestamp, - endTimestamp = endBound.timestamp, + startBoundary = startBound, + endBoundary = endBound, Some(logData), ictEnablementInfo = Some((0, 0))) } @@ -470,7 +463,6 @@ class CommitRangeBuilderSuite extends AnyFunSuite with MockFileSystemClientUtils val startBoundaries = Seq( // V0 (first published commit) VersionBoundaryDef(0), - DefaultBoundaryDef(0), TimestampBoundaryDef(5L, 0), // before V0 TimestampBoundaryDef(50L, 0L), // exactly at V0 // V1 (last published commit) @@ -537,7 +529,6 @@ class CommitRangeBuilderSuite extends AnyFunSuite with MockFileSystemClientUtils val startBoundaries = Seq( // V0 VersionBoundaryDef(0), - DefaultBoundaryDef(0), TimestampBoundaryDef(5L, 0), // before V0 TimestampBoundaryDef(50L, 0L), // exactly at V0 // V1 @@ -596,7 +587,6 @@ class CommitRangeBuilderSuite extends AnyFunSuite with MockFileSystemClientUtils val startBoundaries = Seq( // V0 VersionBoundaryDef(0), - DefaultBoundaryDef(0), TimestampBoundaryDef(5L, 0), // before V0 TimestampBoundaryDef(50L, 0L), // exactly at V0 // V1 @@ -642,7 +632,6 @@ class CommitRangeBuilderSuite extends AnyFunSuite with MockFileSystemClientUtils val startBoundaries = Seq( // V0 VersionBoundaryDef(0), - DefaultBoundaryDef(0), TimestampBoundaryDef(5L, 0), // before V0 TimestampBoundaryDef(50L, 0L), // exactly at V0 // V1 @@ -690,7 +679,6 @@ class CommitRangeBuilderSuite extends AnyFunSuite with MockFileSystemClientUtils val startBoundaries = Seq( // V0 VersionBoundaryDef(0), - DefaultBoundaryDef(0), TimestampBoundaryDef(5L, 0), // before V0 TimestampBoundaryDef(50L, 0L), // exactly at V0 // V1 @@ -743,7 +731,6 @@ class CommitRangeBuilderSuite extends AnyFunSuite with MockFileSystemClientUtils val startBoundaries = Seq( // V0 VersionBoundaryDef(0), - DefaultBoundaryDef(0), TimestampBoundaryDef(5L, 0), // before V0 TimestampBoundaryDef(50L, 0L), // exactly at V0 // V1 @@ -796,8 +783,8 @@ class CommitRangeBuilderSuite extends AnyFunSuite with MockFileSystemClientUtils buildCommitRange( createMockFSAndJsonEngineForICT(fileList, versionToICT), fileList, - startVersion = Some(0), - endVersion = Some(3), + startBoundary = VersionBoundaryDef(0), + endBoundary = VersionBoundaryDef(3), logData = Some(parsedLogData), ictEnablementInfo = Some((0, 0))) } @@ -817,8 +804,8 @@ class CommitRangeBuilderSuite extends AnyFunSuite with MockFileSystemClientUtils buildCommitRange( createMockFSAndJsonEngineForICT(fileList, versionToICT), fileList, - startVersion = Some(0), - endVersion = Some(2), + startBoundary = VersionBoundaryDef(0), + endBoundary = VersionBoundaryDef(2), logData = Some(parsedLogData), ictEnablementInfo = Some((0, 0))) } @@ -833,8 +820,8 @@ class CommitRangeBuilderSuite extends AnyFunSuite with MockFileSystemClientUtils buildCommitRange( createMockFSListFromEngine(publishedDeltaFiles), publishedDeltaFiles, - startVersion = Some(0), - endVersion = Some(3)) + startBoundary = VersionBoundaryDef(0), + endBoundary = VersionBoundaryDef(3)) } assert(e.getMessage.contains( "Missing delta files: versions are not contiguous: ([0, 2, 3])")) @@ -846,7 +833,7 @@ class CommitRangeBuilderSuite extends AnyFunSuite with MockFileSystemClientUtils val suffix = s"- type=${parsedLogData.getGroupByCategoryClass.toString}" test(s"withLogData: non-staged-ratified-commit throws IllegalArgumentException $suffix") { val builder = TableManager - .loadCommitRange(dataPath.toString) + .loadCommitRange(dataPath.toString, CommitBoundary.atVersion(0)) .withLogData(Collections.singletonList(parsedLogData)) val exMsg = intercept[IllegalArgumentException] { @@ -859,7 +846,7 @@ class CommitRangeBuilderSuite extends AnyFunSuite with MockFileSystemClientUtils test("withLogData: non-contiguous input throws IllegalArgumentException") { val exMsg = intercept[IllegalArgumentException] { - TableManager.loadCommitRange(dataPath.toString) + TableManager.loadCommitRange(dataPath.toString, CommitBoundary.atVersion(0)) .withLogData(parsedRatifiedStagedCommits(Seq(0, 2)).toList.asJava) .build(mockEngine()) }.getMessage @@ -869,7 +856,7 @@ class CommitRangeBuilderSuite extends AnyFunSuite with MockFileSystemClientUtils test("withLogData: non-sorted input throws IllegalArgumentException") { val exMsg = intercept[IllegalArgumentException] { - TableManager.loadCommitRange(dataPath.toString) + TableManager.loadCommitRange(dataPath.toString, CommitBoundary.atVersion(0)) .withLogData(parsedRatifiedStagedCommits(Seq(2, 1, 0)).toList.asJava) .build(mockEngine()) }.getMessage diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/TableChangesSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/TableChangesSuite.scala index 2137b68f405..923af1ea1ee 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/TableChangesSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/TableChangesSuite.scala @@ -65,10 +65,10 @@ class CommitRangeTableChangesSuite extends TableChangesSuite { startVersion: Long, endVersion: Long, actionSet: Set[DeltaAction]): Seq[ColumnarBatch] = { - val commitRange = TableManager.loadCommitRange(tablePath) - .withStartBoundary(CommitBoundary.atVersion(startVersion)) - .withEndBoundary(CommitBoundary.atVersion(endVersion)) - .build(defaultEngine) + val commitRange = + TableManager.loadCommitRange(tablePath, CommitBoundary.atVersion(startVersion)) + .withEndBoundary(CommitBoundary.atVersion(endVersion)) + .build(defaultEngine) commitRange.getActions( defaultEngine, getTableManagerAdapter.getSnapshotAtVersion(defaultEngine, tablePath, startVersion), @@ -80,10 +80,10 @@ class CommitRangeTableChangesSuite extends TableChangesSuite { (0 to 4).foreach { _ => spark.range(10).write.format("delta").mode("append").save(tempDir.getCanonicalPath) } - val commitRange = TableManager.loadCommitRange(tempDir.getCanonicalPath) - .withStartBoundary(CommitBoundary.atVersion(0)) - .withEndBoundary(CommitBoundary.atVersion(4)) - .build(defaultEngine) + val commitRange = + TableManager.loadCommitRange(tempDir.getCanonicalPath, CommitBoundary.atVersion(0)) + .withEndBoundary(CommitBoundary.atVersion(4)) + .build(defaultEngine) val e = intercept[IllegalArgumentException] { commitRange.getActions( defaultEngine, @@ -94,13 +94,14 @@ class CommitRangeTableChangesSuite extends TableChangesSuite { } } - test("No boundaries provided uses defaults") { + test("No end boundary provided defaults to latest") { withTempDir { tempDir => (0 to 4).foreach { _ => spark.range(10).write.format("delta").mode("append").save(tempDir.getCanonicalPath) } - val commitRange = TableManager.loadCommitRange(tempDir.getCanonicalPath) - .build(defaultEngine) + val commitRange = + TableManager.loadCommitRange(tempDir.getCanonicalPath, CommitBoundary.atVersion(0)) + .build(defaultEngine) assert(commitRange.getStartVersion == 0 && commitRange.getEndVersion == 4) // Just double check the changes are correct testGetChangesVsSpark(tempDir.getCanonicalPath, 0, 4, FULL_ACTION_SET) @@ -129,14 +130,17 @@ class CommitRangeTableChangesSuite extends TableChangesSuite { val latestSnapshot = getTableManagerAdapter.getSnapshotAtLatest(defaultEngine, tablePath) def checkStartBoundary(timestamp: Long, expectedVersion: Long): Unit = { - assert(TableManager.loadCommitRange(tablePath) - .withStartBoundary(CommitBoundary.atTimestamp(timestamp, latestSnapshot)) - .build(defaultEngine).getStartVersion == expectedVersion) + assert(TableManager.loadCommitRange( + tablePath, + CommitBoundary.atTimestamp(timestamp, latestSnapshot)) + .build(defaultEngine) + .getStartVersion == expectedVersion) } def checkEndBoundary(timestamp: Long, expectedVersion: Long): Unit = { - assert(TableManager.loadCommitRange(tablePath) + assert(TableManager.loadCommitRange(tablePath, CommitBoundary.atVersion(0)) .withEndBoundary(CommitBoundary.atTimestamp(timestamp, latestSnapshot)) - .build(defaultEngine).getEndVersion == expectedVersion) + .build(defaultEngine) + .getEndVersion == expectedVersion) } // startTimestamp is before the earliest available version @@ -206,14 +210,17 @@ class CommitRangeTableChangesSuite extends TableChangesSuite { val latestSnapshot = getTableManagerAdapter.getSnapshotAtLatest(defaultEngine, tablePath) def checkStartBoundary(timestamp: Long, expectedVersion: Long): Unit = { - assert(TableManager.loadCommitRange(tablePath) - .withStartBoundary(CommitBoundary.atTimestamp(timestamp, latestSnapshot)) - .build(defaultEngine).getStartVersion == expectedVersion) + assert(TableManager.loadCommitRange( + tablePath, + CommitBoundary.atTimestamp(timestamp, latestSnapshot)) + .build(defaultEngine) + .getStartVersion == expectedVersion) } def checkEndBoundary(timestamp: Long, expectedVersion: Long): Unit = { - assert(TableManager.loadCommitRange(tablePath) + assert(TableManager.loadCommitRange(tablePath, CommitBoundary.atVersion(0)) .withEndBoundary(CommitBoundary.atTimestamp(timestamp, latestSnapshot)) - .build(defaultEngine).getEndVersion == expectedVersion) + .build(defaultEngine) + .getEndVersion == expectedVersion) } // Test that timestamp resolution is done using ICT. Since the file modification times for @@ -283,8 +290,7 @@ class CommitRangeTableChangesSuite extends TableChangesSuite { delta1.setLastModified(2000) delta2.setLastModified(3000) - val commitRange = TableManager.loadCommitRange(tablePath) - .withStartBoundary(CommitBoundary.atVersion(0)) + val commitRange = TableManager.loadCommitRange(tablePath, CommitBoundary.atVersion(0)) .withEndBoundary(CommitBoundary.atVersion(2)) .build(defaultEngine) @@ -355,8 +361,7 @@ class CommitRangeTableChangesSuite extends TableChangesSuite { data = immutable.Seq(Map.empty[String, Literal] -> dataBatches1), clock = clock) - val commitRange = TableManager.loadCommitRange(tablePath) - .withStartBoundary(CommitBoundary.atVersion(0)) + val commitRange = TableManager.loadCommitRange(tablePath, CommitBoundary.atVersion(0)) .withEndBoundary(CommitBoundary.atVersion(2)) .build(defaultEngine) @@ -404,8 +409,7 @@ class CommitRangeTableChangesSuite extends TableChangesSuite { spark.range(10).write.format("delta").mode("append").save(tablePath) } - val commitRange = TableManager.loadCommitRange(tablePath) - .withStartBoundary(CommitBoundary.atVersion(0)) + val commitRange = TableManager.loadCommitRange(tablePath, CommitBoundary.atVersion(0)) .withEndBoundary(CommitBoundary.atVersion(2)) .build(defaultEngine) @@ -484,8 +488,7 @@ class CommitRangeTableChangesSuite extends TableChangesSuite { val txn = deltaLog.startTransaction() txn.commitUnsafe(tablePath, 1) - val commitRange = TableManager.loadCommitRange(tablePath) - .withStartBoundary(CommitBoundary.atVersion(0)) + val commitRange = TableManager.loadCommitRange(tablePath, CommitBoundary.atVersion(0)) .withEndBoundary(CommitBoundary.atVersion(1)) .build(defaultEngine) diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/catalogManaged/CatalogManagedE2EReadSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/catalogManaged/CatalogManagedE2EReadSuite.scala index 3d411af7f5f..576d14e7cf4 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/catalogManaged/CatalogManagedE2EReadSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/catalogManaged/CatalogManagedE2EReadSuite.scala @@ -231,16 +231,19 @@ class CatalogManagedE2EReadSuite extends AnyFunSuite .build(defaultEngine) def checkStartBoundary(timestamp: Long, expectedVersion: Long): Unit = { - assert(TableManager.loadCommitRange(tablePath) + assert(TableManager.loadCommitRange( + tablePath, + CommitBoundary.atTimestamp(timestamp, latestSnapshot)) .withLogData(parsedLogData.asJava) - .withStartBoundary(CommitBoundary.atTimestamp(timestamp, latestSnapshot)) - .build(defaultEngine).getStartVersion == expectedVersion) + .build(defaultEngine) + .getStartVersion == expectedVersion) } def checkEndBoundary(timestamp: Long, expectedVersion: Long): Unit = { - assert(TableManager.loadCommitRange(tablePath) + assert(TableManager.loadCommitRange(tablePath, CommitBoundary.atVersion(0)) .withLogData(parsedLogData.asJava) .withEndBoundary(CommitBoundary.atTimestamp(timestamp, latestSnapshot)) - .build(defaultEngine).getEndVersion == expectedVersion) + .build(defaultEngine) + .getEndVersion == expectedVersion) } // startTimestamp is before V0 @@ -284,7 +287,7 @@ class CatalogManagedE2EReadSuite extends AnyFunSuite // Verify the fileList in the CommitRange val commitRange = TableManager - .loadCommitRange(tablePath) + .loadCommitRange(tablePath, CommitBoundary.atVersion(0)) .withLogData(parsedLogData.asJava) .build(defaultEngine) diff --git a/kernel/unitycatalog/src/main/java/io/delta/kernel/unitycatalog/UCCatalogManagedClient.java b/kernel/unitycatalog/src/main/java/io/delta/kernel/unitycatalog/UCCatalogManagedClient.java index 435b4081c01..b6ae78818a3 100644 --- a/kernel/unitycatalog/src/main/java/io/delta/kernel/unitycatalog/UCCatalogManagedClient.java +++ b/kernel/unitycatalog/src/main/java/io/delta/kernel/unitycatalog/UCCatalogManagedClient.java @@ -212,21 +212,25 @@ public CreateTableTransactionBuilder buildCreateTableTransaction( } /** - * Loads a Kernel {@link CommitRange} for the provided boundaries. If no start boundary is - * provided, defaults to version 0. If no end boundary is provided, defaults to the latest - * version. + * Loads a Kernel {@link CommitRange} for the provided boundaries. If no end boundary is provided, + * defaults to the latest version. + * + *

A start boundary is required and must be specified using either {@code startVersionOpt} or + * {@code startTimestampOpt}. These parameters are mutually exclusive and at least one must be + * provided. * * @param engine The Delta Kernel {@link Engine} to use for loading the table. * @param ucTableId The Unity Catalog table ID, which is a unique identifier for the table in UC. * @param tablePath The path to the Delta table in the underlying storage system. * @param startVersionOpt The optional start version boundary. This must be mutually exclusive - * with startTimestampOpt. + * with startTimestampOpt. Either this or startTimestampOpt must be provided. * @param startTimestampOpt The optional start timestamp boundary. This must be mutually exclusive - * with startVersionOpt. + * with startVersionOpt. Either this or startVersionOpt must be provided. * @param endVersionOpt The optional end version boundary. This must be mutually exclusive with * endTimestampOpt. * @param endTimestampOpt The optional end timestamp boundary. This must be mutually exclusive * with endVersionOpt. + * @throws IllegalArgumentException if neither startVersionOpt nor startTimestampOpt is provided * @throws IllegalArgumentException if both startVersionOpt and startTimestampOpt are defined * @throws IllegalArgumentException if both endVersionOpt and endTimestampOpt are defined * @throws IllegalArgumentException if either startVersionOpt or endVersionOpt is provided and is @@ -253,6 +257,9 @@ public CommitRange loadCommitRange( checkArgument( !endVersionOpt.isPresent() || !endTimestampOpt.isPresent(), "Cannot provide both an end timestamp and start version"); + checkArgument( + startVersionOpt.isPresent() || startTimestampOpt.isPresent(), + "Must provide either a start timestamp or start version"); if (startVersionOpt.isPresent() && endVersionOpt.isPresent()) { checkArgument( startVersionOpt.get() <= endVersionOpt.get(), @@ -290,19 +297,20 @@ public CommitRange loadCommitRange( "TableManager.loadCommitRange", ucTableId, () -> { - CommitRangeBuilder commitRangeBuilder = TableManager.loadCommitRange(tablePath); - + // Determine the start boundary (required - validated above) + CommitRangeBuilder.CommitBoundary startBoundary; if (startVersionOpt.isPresent()) { - commitRangeBuilder = - commitRangeBuilder.withStartBoundary( - CommitRangeBuilder.CommitBoundary.atVersion(startVersionOpt.get())); - } - if (startTimestampOpt.isPresent()) { - commitRangeBuilder = - commitRangeBuilder.withStartBoundary( - CommitRangeBuilder.CommitBoundary.atTimestamp( - startTimestampOpt.get(), latestSnapshot.get())); + startBoundary = CommitRangeBuilder.CommitBoundary.atVersion(startVersionOpt.get()); + } else { + // startTimestampOpt must be present due to validation above + startBoundary = + CommitRangeBuilder.CommitBoundary.atTimestamp( + startTimestampOpt.get(), latestSnapshot.get()); } + + CommitRangeBuilder commitRangeBuilder = + TableManager.loadCommitRange(tablePath, startBoundary); + if (endVersionOpt.isPresent()) { commitRangeBuilder = commitRangeBuilder.withEndBoundary( diff --git a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedClientCommitRangeSuite.scala b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedClientCommitRangeSuite.scala index 42303a2286d..54046489e89 100644 --- a/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedClientCommitRangeSuite.scala +++ b/kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/UCCatalogManagedClientCommitRangeSuite.scala @@ -52,9 +52,6 @@ class UCCatalogManagedClientCommitRangeSuite extends AnyFunSuite with UCCatalogM startTimestampOpt: Optional[java.lang.Long] = emptyLongOpt, endVersionOpt: Optional[java.lang.Long] = emptyLongOpt, endTimestampOpt: Optional[java.lang.Long] = emptyLongOpt): Unit = { - require(!startVersionOpt.isPresent || !startTimestampOpt.isPresent) - require(!endVersionOpt.isPresent || !endTimestampOpt.isPresent) - withUCClientAndTestTable { (ucClient, tablePath, _) => val ucCatalogManagedClient = new UCCatalogManagedClient(ucClient) val commitRange = loadCommitRange( @@ -125,6 +122,7 @@ class UCCatalogManagedClientCommitRangeSuite extends AnyFunSuite with UCCatalogM val ex = intercept[IllegalArgumentException] { loadCommitRange( ucCatalogManagedClient, + startVersionOpt = Optional.of(0L), endVersionOpt = Optional.of(2L), endTimestampOpt = Optional.of(200L)) } @@ -180,14 +178,28 @@ class UCCatalogManagedClientCommitRangeSuite extends AnyFunSuite with UCCatalogM testLoadCommitRange( expectedStartVersion = 0, expectedEndVersion = 2, + startVersionOpt = Optional.of(0L), endVersionOpt = Optional.of(9L)) } assert(ex.getMessage.contains( "Cannot load commit range with end version 9 as the latest version ratified by UC is 2")) } - test("loadCommitRange loads with default boundaries (start=0, end=latest)") { - testLoadCommitRange(expectedStartVersion = 0, expectedEndVersion = 2) + test("loadCommitRange throws when no start boundary is provided") { + val ucClient = new InMemoryUCClient("ucMetastoreId") + val ucCatalogManagedClient = new UCCatalogManagedClient(ucClient) + + val ex = intercept[IllegalArgumentException] { + loadCommitRange(ucCatalogManagedClient) + } + assert(ex.getMessage.contains("Must provide either a start timestamp or start version")) + } + + test("loadCommitRange loads with default end boundary -> latest") { + testLoadCommitRange( + expectedStartVersion = 0, + expectedEndVersion = 2, + startVersionOpt = Optional.of(0L)) } test("loadCommitRange loads with version boundaries") { @@ -249,6 +261,7 @@ class UCCatalogManagedClientCommitRangeSuite extends AnyFunSuite with UCCatalogM testLoadCommitRange( expectedStartVersion = 1L, expectedEndVersion = 1L, + startVersionOpt = Optional.of(0), endTimestampOpt = Optional.of(v0Ts - 10)) } } @@ -260,7 +273,8 @@ class UCCatalogManagedClientCommitRangeSuite extends AnyFunSuite with UCCatalogM val ex = intercept[RuntimeException] { loadCommitRange( ucCatalogManagedClient, - ucTableId = "nonExistentTableId") + ucTableId = "nonExistentTableId", + startVersionOpt = Optional.of(0L)) } assert(ex.getCause.isInstanceOf[InvalidTargetTableException]) } @@ -268,7 +282,10 @@ class UCCatalogManagedClientCommitRangeSuite extends AnyFunSuite with UCCatalogM test("loadCommitRange for new table when UC maxRatifiedVersion is 0") { val tablePath = getTestResourceFilePath("catalog-owned-preview") val ucCatalogManagedClient = createUCCatalogManagedClientForTableAfterCreate() - val commitRange = loadCommitRange(ucCatalogManagedClient, tablePath = tablePath) + val commitRange = loadCommitRange( + ucCatalogManagedClient, + tablePath = tablePath, + startVersionOpt = Optional.of(0L)) assert(commitRange.getStartVersion == 0) assert(commitRange.getEndVersion == 0) From b740dd2175868d1b203749e5356a895b532bcd72 Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Fri, 5 Dec 2025 16:08:51 -0800 Subject: [PATCH 4/6] [Kernel] Add support for setting `delta.dataSkippingStatsColumns`, though its behavior is unimplemented (#5642) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## 🥞 Stacked PR Use this [link](https://github.com/delta-io/delta/pull/5642/files) to review incremental changes. - [**stack/kernel_data_skipping_stats_columns**](https://github.com/delta-io/delta/pull/5642) [[Files changed](https://github.com/delta-io/delta/pull/5642/files)] --------- #### Which Delta project/connector is this regarding? - [ ] Spark - [ ] Standalone - [ ] Flink - [X] Kernel - [ ] Other (fill in here) ## Description This PR allows Kernel to set the `delta.dataSkippingStatsColumns` table property. Note that (1) its value is not validated/checked, and (2) its desired behavior is not yet enforced. ## How was this patch tested? Fairly trivial to add a new table property. Added a new UT nonetheless. ## Does this PR introduce _any_ user-facing changes? No. (cherry picked from commit e1fe75510f1ab3a4cfa85459fb2a21bf55cb7d9c) --- .../io/delta/kernel/internal/TableConfig.java | 36 ++++++++++++++++++- .../defaults/TablePropertiesSuite.scala | 29 +++++++++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java index 9a7dfcf2d31..ffb695526eb 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java @@ -19,8 +19,8 @@ import io.delta.kernel.exceptions.UnknownConfigurationException; import io.delta.kernel.internal.actions.Metadata; import io.delta.kernel.internal.tablefeatures.TableFeatures; -import io.delta.kernel.internal.util.*; import io.delta.kernel.internal.util.ColumnMapping.ColumnMappingMode; +import io.delta.kernel.internal.util.IntervalParserUtils; import java.util.*; import java.util.function.Function; import java.util.function.Predicate; @@ -277,6 +277,37 @@ public class TableConfig { "needs to be larger than or equal to -1.", true); + /** + * IMPORTANT: This table property is recognized but is not yet validated, enforced, or implemented + * by Kernel. + * + *

The names of specific columns to collect stats on for data skipping. If present, it takes + * precedence over {@link #DATA_SKIPPING_NUM_INDEXED_COLS}, and the system will only collect stats + * for columns that exactly match those specified. If a nested column is specified, the system + * will collect stats for all leaf fields of that column. If a non-existent column is specified, + * it will be ignored. Updating this config does not trigger stats re-collection, but redefines + * the stats schema of the table, i.e., it will change the behavior of future stats collection + * (e.g., in append and OPTIMIZE) as well as data skipping (e.g., the column stats not mentioned + * by this config will be ignored even if they exist). + * + *

The value is a comma-separated list of case-insensitive column identifiers. Each column + * identifier can consist of letters, digits, and underscores. If a column identifier includes + * special characters, the column name should be enclosed in backticks (`) to escape the special + * characters. + * + *

A column identifier can refer to one of the following: the name of a non-struct column, the + * leaf field's name of a struct column, or the name of a struct column. When a struct column's + * name is specified, statistics for all its leaf fields will be collected. + */ + public static final TableConfig> DATA_SKIPPING_STATS_COLUMNS = + new TableConfig<>( + "delta.dataSkippingStatsColumns", + null, + v -> Optional.ofNullable(v), + value -> true, + "needs to be a comma-separated list of column identifiers.", + true); + /** * Table property that enables modifying the table in accordance with the Delta-Iceberg Writer * Compatibility V1 ({@code icebergCompatWriterV1}) protocol. @@ -398,6 +429,9 @@ public static class UniversalFormats { addConfig(this, MATERIALIZED_ROW_ID_COLUMN_NAME); addConfig(this, MATERIALIZED_ROW_COMMIT_VERSION_COLUMN_NAME); addConfig(this, VARIANT_SHREDDING_ENABLED); + + // The below configs do not yet have their behavior correctly implemented in Kernel. + addConfig(this, DATA_SKIPPING_STATS_COLUMNS); } }); diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/TablePropertiesSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/TablePropertiesSuite.scala index 5c77aea83ee..3eadffb25fc 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/TablePropertiesSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/TablePropertiesSuite.scala @@ -333,4 +333,33 @@ trait TablePropertiesSuiteBase extends AnyFunSuite with AbstractWriteUtils { val metadata = getMetadata(defaultEngine, tablePath) assert(keys.forall(!metadata.getConfiguration.containsKey(_))) } + + val recognizedButUnimplementedProps = Seq( + ("delta.dataSkippingStatsColumns", "col1,col2,nested.field")) + + recognizedButUnimplementedProps.foreach { case (propKey, value) => + test(s"$propKey is allowed (but not implemented) - create table") { + withTempDir { tempFile => + val tablePath = tempFile.getAbsolutePath + createUpdateTableWithProps( + tablePath, + createTable = true, + propsAdded = Map(propKey -> value)) + assertHasProp(tablePath, Map(propKey -> value)) + } + } + + test(s"$propKey is allowed (but not implemented) - update table") { + withTempDir { tempFile => + val tablePath = tempFile.getAbsolutePath + createUpdateTableWithProps(tablePath, createTable = true) + + val updatedValue = s"${value}_updated" + createUpdateTableWithProps( + tablePath, + propsAdded = Map(propKey -> updatedValue)) + assertHasProp(tablePath, Map(propKey -> updatedValue)) + } + } + } } From 7ed1abc7c9096bf7491711f6a2cafcc153979ac6 Mon Sep 17 00:00:00 2001 From: Allison Portis Date: Tue, 9 Dec 2025 13:59:41 -0800 Subject: [PATCH 5/6] [Kernel] Fix log segment construction for ccv2 tables to read _last_checkpoint file for latest queries (#5663) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #### Which Delta project/connector is this regarding? - [ ] Spark - [ ] Standalone - [ ] Flink - [X] Kernel - [ ] Other (fill in here) ## Description In https://github.com/delta-io/delta/commit/7a8c2428fc89e3b56f40ab689210f5e85ca82741 I missed removing prior code that pinned the time-travel version to maxCatalogVersion. This means we weren't actually reading _last_checkpoint file for latest queries. Since I only added unit tests for loadLogSegment this wasn't caught in tests. ## How was this patch tested? Adds a test. ✅ validated the unit test fails without the fix ## Does this PR introduce _any_ user-facing changes? No (cherry picked from commit 67405c8c16604a2629e6a2671205bc44d708987e) --- .../kernel/internal/table/SnapshotFactory.java | 17 ++++++++--------- .../CatalogManagedE2EReadSuite.scala | 18 ++++++++++++++++++ 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/table/SnapshotFactory.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/table/SnapshotFactory.java index 336663ee581..09fec7e8a38 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/table/SnapshotFactory.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/table/SnapshotFactory.java @@ -183,8 +183,9 @@ SnapshotImpl create(Engine engine) { } private SnapshotImpl createSnapshot(Engine engine, SnapshotQueryContext snapshotCtx) { - final Optional versionToLoad = getTargetVersionToLoad(engine, snapshotCtx); - final Lazy lazyLogSegment = getLazyLogSegment(engine, snapshotCtx, versionToLoad); + final Optional timeTravelVersion = getTargetTimeTravelVersion(engine, snapshotCtx); + final Lazy lazyLogSegment = + getLazyLogSegment(engine, snapshotCtx, timeTravelVersion); final Lazy> lazyCrcInfo = createLazyChecksumFileLoaderWithMetrics( engine, lazyLogSegment, snapshotCtx.getSnapshotMetrics()); @@ -216,7 +217,7 @@ private SnapshotImpl createSnapshot(Engine engine, SnapshotQueryContext snapshot return new SnapshotImpl( tablePath, - versionToLoad.orElseGet(() -> lazyLogSegment.get().getVersion()), + timeTravelVersion.orElseGet(() -> lazyLogSegment.get().getVersion()), lazyLogSegment, logReplay, protocol, @@ -238,7 +239,7 @@ private SnapshotQueryContext getSnapshotQueryContext() { } private Lazy getLazyLogSegment( - Engine engine, SnapshotQueryContext snapshotCtx, Optional versionToLoad) { + Engine engine, SnapshotQueryContext snapshotCtx, Optional timeTravelVersion) { return new Lazy<>( () -> { final LogSegment logSegment = @@ -249,7 +250,7 @@ private Lazy getLazyLogSegment( () -> new SnapshotManager(tablePath) .getLogSegmentForVersion( - engine, versionToLoad, ctx.logDatas, ctx.maxCatalogVersion)); + engine, timeTravelVersion, ctx.logDatas, ctx.maxCatalogVersion)); snapshotCtx.setResolvedVersion(logSegment.getVersion()); snapshotCtx.setCheckpointVersion(logSegment.getCheckpointVersionOpt()); @@ -258,7 +259,8 @@ private Lazy getLazyLogSegment( }); } - private Optional getTargetVersionToLoad(Engine engine, SnapshotQueryContext snapshotCtx) { + private Optional getTargetTimeTravelVersion( + Engine engine, SnapshotQueryContext snapshotCtx) { if (ctx.timestampQueryContextOpt.isPresent()) { return Optional.of( resolveTimestampToSnapshotVersion( @@ -269,9 +271,6 @@ private Optional getTargetVersionToLoad(Engine engine, SnapshotQueryContex ctx.logDatas)); } else if (ctx.versionOpt.isPresent()) { return ctx.versionOpt; - } else if (ctx.maxCatalogVersion.isPresent()) { - // For latest queries for catalogManaged tables we want to load the maxCatalogVersion - return ctx.maxCatalogVersion; } return Optional.empty(); } diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/catalogManaged/CatalogManagedE2EReadSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/catalogManaged/CatalogManagedE2EReadSuite.scala index 576d14e7cf4..6a868a1fb81 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/catalogManaged/CatalogManagedE2EReadSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/catalogManaged/CatalogManagedE2EReadSuite.scala @@ -20,6 +20,7 @@ import scala.collection.JavaConverters._ import io.delta.kernel.{SnapshotBuilder, TableManager} import io.delta.kernel.CommitRangeBuilder.CommitBoundary +import io.delta.kernel.defaults.engine.hadoopio.HadoopFileIO import io.delta.kernel.defaults.utils.{TestRow, TestUtilsWithTableManagerAPIs, WriteUtilsWithV2Builders} import io.delta.kernel.exceptions.KernelException import io.delta.kernel.internal.DeltaHistoryManager @@ -31,6 +32,7 @@ import io.delta.kernel.internal.tablefeatures.TableFeatures.{isCatalogManagedSup import io.delta.kernel.internal.util.FileNames import io.delta.kernel.utils.FileStatus +import org.apache.hadoop.conf.Configuration import org.scalatest.funsuite.AnyFunSuite /** @@ -406,4 +408,20 @@ class CatalogManagedE2EReadSuite extends AnyFunSuite assert(e.getMessage.contains("Cannot load table version 2")) } } + + test("for latest queries we read the _last_checkpoint file") { + withCatalogOwnedPreviewTestTable { (resourceTablePath, resourceLogData) => + // It doesn't matter if the checkpoint actually exists; we just want to check that during + // log segment building we try to read _last_checkpoint + import io.delta.kernel.defaults.MetricsEngine + val engine = new MetricsEngine(new HadoopFileIO(new Configuration())) + val snapshot = TableManager + .loadSnapshot(resourceTablePath) + .withMaxCatalogVersion(2) + .withLogData(resourceLogData.asJava) + .build(engine) + assert(snapshot.getVersion == 2) + assert(engine.getJsonHandler.getLastCheckpointMetadataReadCalls == 1) + } + } } From f02b53792d2cfb5c9fd8154a9063dc710d9f1e47 Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Tue, 9 Dec 2025 14:14:24 -0800 Subject: [PATCH 6/6] [Kernel] Rename catalogOwned-preview to catalogManaged (#5644) Use this [link](https://github.com/delta-io/delta/pull/5644/files) to review incremental changes. - [**stack/kernel_catalog_managed_feature_rename_2**](https://github.com/delta-io/delta/pull/5644) [[Files changed](https://github.com/delta-io/delta/pull/5644/files)] --------- Title --- .../kernel/internal/tablefeatures/TableFeatures.java | 2 +- ...terCompatV1MetadataValidatorAndUpdaterSuite.scala | 2 +- ...terCompatV3MetadataValidatorAndUpdaterSuite.scala | 2 +- .../internal/tablefeatures/TableFeaturesSuite.scala | 12 ++++++------ .../_delta_log/00000000000000000000.json | 2 +- .../test/resources/catalog-owned-preview/info.txt | 2 +- 6 files changed, 11 insertions(+), 11 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java index 2caae11b3bb..a3e6e1fd7d3 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java @@ -83,7 +83,7 @@ public boolean hasKernelWriteSupport(Metadata metadata) { } public static final TableFeature CATALOG_MANAGED_RW_FEATURE = - new CatalogManagedFeatureBase("catalogOwned-preview"); + new CatalogManagedFeatureBase("catalogManaged"); private static class CatalogManagedFeatureBase extends TableFeature.ReaderWriterFeature { CatalogManagedFeatureBase(String featureName) { diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/icebergcompat/IcebergWriterCompatV1MetadataValidatorAndUpdaterSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/icebergcompat/IcebergWriterCompatV1MetadataValidatorAndUpdaterSuite.scala index b168b8df484..9d0c35103de 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/icebergcompat/IcebergWriterCompatV1MetadataValidatorAndUpdaterSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/icebergcompat/IcebergWriterCompatV1MetadataValidatorAndUpdaterSuite.scala @@ -271,7 +271,7 @@ class IcebergWriterCompatV1MetadataValidatorAndUpdaterSuite "typeWidening", "typeWidening-preview", "timestampNtz", - "catalogOwned-preview") + "catalogManaged") val protocol = new Protocol(3, 7, readerFeatures.asJava, writerFeatures.asJava) val metadata = getCompatEnabledMetadata(cmTestSchema()) validateAndUpdateIcebergWriterCompatV1Metadata(true, metadata, protocol) diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/icebergcompat/IcebergWriterCompatV3MetadataValidatorAndUpdaterSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/icebergcompat/IcebergWriterCompatV3MetadataValidatorAndUpdaterSuite.scala index aa16687d33d..8995e6b2c61 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/icebergcompat/IcebergWriterCompatV3MetadataValidatorAndUpdaterSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/icebergcompat/IcebergWriterCompatV3MetadataValidatorAndUpdaterSuite.scala @@ -385,7 +385,7 @@ class IcebergWriterCompatV3MetadataValidatorAndUpdaterSuite "variantShredding-preview", "icebergCompatV2", "icebergWriterCompatV1", - "catalogOwned-preview") + "catalogManaged") val protocol = new Protocol(3, 7, readerFeatures.asJava, writerFeatures.asJava) val metadata = getCompatEnabledMetadata(cmTestSchema()) validateAndUpdateIcebergWriterCompatV3Metadata(true, metadata, protocol) diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/tablefeatures/TableFeaturesSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/tablefeatures/TableFeaturesSuite.scala index c51e18a6c40..590235dd7a8 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/tablefeatures/TableFeaturesSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/tablefeatures/TableFeaturesSuite.scala @@ -40,7 +40,7 @@ class TableFeaturesSuite extends AnyFunSuite { // Tests for [[TableFeature]] implementations // ///////////////////////////////////////////////////////////////////////////////////////////////// val readerWriterFeatures = Seq( - "catalogOwned-preview", + "catalogManaged", "columnMapping", "deletionVectors", "timestampNtz", @@ -201,7 +201,7 @@ class TableFeaturesSuite extends AnyFunSuite { "domainMetadata", "vacuumProtocolCheck", "clustering", - "catalogOwned-preview", + "catalogManaged", "allowColumnDefaults").foreach { feature => test(s"doesn't support auto enable by metadata: $feature") { @@ -234,7 +234,7 @@ class TableFeaturesSuite extends AnyFunSuite { .collect(toList()).asScala val expected = Seq( - "catalogOwned-preview", + "catalogManaged", "columnMapping", "v2Checkpoint", "variantType", @@ -258,7 +258,7 @@ class TableFeaturesSuite extends AnyFunSuite { // are writable because the metadata has not been set the info that // these features are enabled val expected = Seq( - "catalogOwned-preview", + "catalogManaged", "columnMapping", "allowColumnDefaults", "v2Checkpoint", @@ -323,7 +323,7 @@ class TableFeaturesSuite extends AnyFunSuite { // Reads: Supported table features represented as readerFeatures in the protocol Seq( - "catalogOwned-preview", + "catalogManaged", "variantType", "variantType-preview", "variantShredding-preview", @@ -380,7 +380,7 @@ class TableFeaturesSuite extends AnyFunSuite { checkWriteSupported( "validateKernelCanWriteToTable: protocol 7 with catalogManaged", - new Protocol(3, 7, singleton("catalogOwned-preview"), singleton("catalogOwned-preview")), + new Protocol(3, 7, singleton("catalogManaged"), singleton("catalogManaged")), testMetadata()) checkWriteUnsupported( diff --git a/kernel/kernel-defaults/src/test/resources/catalog-owned-preview/_delta_log/00000000000000000000.json b/kernel/kernel-defaults/src/test/resources/catalog-owned-preview/_delta_log/00000000000000000000.json index a7ec1654633..8bf3a475a8c 100644 --- a/kernel/kernel-defaults/src/test/resources/catalog-owned-preview/_delta_log/00000000000000000000.json +++ b/kernel/kernel-defaults/src/test/resources/catalog-owned-preview/_delta_log/00000000000000000000.json @@ -1,3 +1,3 @@ {"commitInfo":{"inCommitTimestamp":1749830855993,"timestamp":1749830855992,"operation":"CREATE TABLE","operationParameters":{"partitionBy":"[\"part1\"]","clusterBy":"[]","description":null,"isManaged":"false","properties":"{}"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/4.0.0 Delta-Lake/4.0.0","txnId":"d108f896-9662-4eda-b4de-444a99850aa8"}} {"metaData":{"id":"64dcd182-b3b4-4ee0-88e0-63c159a4121c","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"part1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"col1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["part1"],"configuration":{"delta.enableInCommitTimestamps":"true"},"createdTime":1749830855646}} -{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["catalogOwned-preview"],"writerFeatures":["catalogOwned-preview","inCommitTimestamp","invariants","appendOnly"]}} +{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["catalogManaged"],"writerFeatures":["catalogManaged","inCommitTimestamp","invariants","appendOnly"]}} diff --git a/kernel/kernel-defaults/src/test/resources/catalog-owned-preview/info.txt b/kernel/kernel-defaults/src/test/resources/catalog-owned-preview/info.txt index 07e21cd47a0..00a2625c928 100644 --- a/kernel/kernel-defaults/src/test/resources/catalog-owned-preview/info.txt +++ b/kernel/kernel-defaults/src/test/resources/catalog-owned-preview/info.txt @@ -45,7 +45,7 @@ FROM ( ) """) -# Then, add `"readerFeatures":["catalogOwned-preview"]` to the _delta_log/001.json protocol +# Then, add `"readerFeatures":["catalogManaged"]` to the _delta_log/001.json protocol # Then, for commits version $v in [1, 2] move _delta_log/$v.json into # _delta_log/_staged_commits/$v.$uuid.json, where $uuid is taken from the commitInfo.txnId in # $v.json \ No newline at end of file