Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@
import io.delta.kernel.Snapshot;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.DeltaHistoryManager;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.internal.files.ParsedCatalogCommitData;
import io.delta.kernel.spark.exception.VersionNotFoundException;
import io.delta.kernel.spark.snapshot.DeltaSnapshotManager;
import io.delta.kernel.unitycatalog.UCCatalogManagedClient;
import java.util.List;
import java.util.Optional;

/**
Expand Down Expand Up @@ -57,35 +61,90 @@ public UCManagedTableSnapshotManager(

@Override
public Snapshot loadLatestSnapshot() {
throw new UnsupportedOperationException(
"UCManagedTableSnapshotManager.loadLatestSnapshot is not yet implemented");
return ucCatalogManagedClient.loadSnapshot(
engine,
tableId,
tablePath,
Optional.empty() /* versionOpt */,
Optional.empty() /* timestampOpt */);
}

@Override
public Snapshot loadSnapshotAt(long version) {
throw new UnsupportedOperationException(
"UCManagedTableSnapshotManager.loadSnapshotAt is not yet implemented");
return ucCatalogManagedClient.loadSnapshot(
engine, tableId, tablePath, Optional.of(version), Optional.empty() /* timestampOpt */);
}

/**
* Finds the active commit at a specific timestamp.
*
* <p>For UC-managed tables, this loads the latest snapshot and uses {@link
* DeltaHistoryManager#getActiveCommitAtTimestamp} to resolve the timestamp to a commit.
*/
@Override
public DeltaHistoryManager.Commit getActiveCommitAtTime(
long timestampMillis,
boolean canReturnLastCommit,
boolean mustBeRecreatable,
boolean canReturnEarliestCommit) {
throw new UnsupportedOperationException(
"UCManagedTableSnapshotManager.getActiveCommitAtTime is not yet implemented");
SnapshotImpl snapshot = (SnapshotImpl) loadLatestSnapshot();
List<ParsedCatalogCommitData> catalogCommits = snapshot.getLogSegment().getAllCatalogCommits();
return DeltaHistoryManager.getActiveCommitAtTimestamp(
engine,
snapshot,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@allisonport-db - can you help me understand: Why does this API need both the snapshot as well as the catalogCommits? What makes the catalogCommits special?

e.g. as you can see here: we are just extracting the catalogCommits from the snapshot.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Kernel's usage the snapshot is the latestSnapshot that is provided as part of the time-travel timestamp spec, the catalogCommits are what are provided by the catalog (these two arguments are given separately when building anything with time-travel)

I guess you could extract the commits from the latestSnapshot, but maybe it's possible the catalog will provide different ones they want you to use? This is an internal API and it's based off of our public API for SnapshotBuilder and CommitRangeBuilder which take list of catalog commits + timestamp-timetravel spec (snapshot + timestamp)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is one another usage of snapshot in this method, extract metadata and tell if ict is enabled and then have some different business logic

snapshot.getLogPath(),
timestampMillis,
mustBeRecreatable,
canReturnLastCommit,
canReturnEarliestCommit,
catalogCommits);
}

/**
* Checks if a specific version exists and is accessible.
*
* <p>For UC-managed tables with catalogManaged, log files may be cleaned up, so we need to use
* DeltaHistoryManager to find the earliest available version based on filesystem state.
*/
@Override
public void checkVersionExists(long version, boolean mustBeRecreatable, boolean allowOutOfRange) {
throw new UnsupportedOperationException(
"UCManagedTableSnapshotManager.checkVersionExists is not yet implemented");
public void checkVersionExists(long version, boolean mustBeRecreatable, boolean allowOutOfRange)
throws VersionNotFoundException {
Comment on lines +110 to +111
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the behavior of the parent checkVersionExists method also throw error if version does not exist? throwing error on invalid in a function meant to check validity is very unintuitive. i would expect a method named checkX() to simply return true or false, and only throw error when something totally unexpected has happened that prevent it from correctly verfying X is valid or not.

cc @huan233usc

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was trying to make the API having the same semantic as v1's equivalence.

def checkVersionExists(
version: Long,
catalogTableOpt: Option[CatalogTable],
mustBeRecreatable: Boolean = true,
allowOutOfRange: Boolean = false): Unit = {
val earliest = if (mustBeRecreatable) {
getEarliestRecreatableCommit
} else {
getEarliestDeltaFile(deltaLog)
}
val latest = deltaLog.update(catalogTableOpt = catalogTableOpt).version
if (version < earliest || ((version > latest) && !allowOutOfRange)) {
throw VersionNotFoundException(version, earliest, latest)
}
}

Honestly returning boolean is a better API design. I think we could try to change both v1 and here

// Load latest to get the current version bounds
SnapshotImpl snapshot = (SnapshotImpl) loadLatestSnapshot();
long latestRatifiedVersion = snapshot.getVersion();

// Fast path: check upper bound before expensive filesystem operations
if ((version > latestRatifiedVersion) && !allowOutOfRange) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you help me understand: Why would I pass in version 1000 with allowOutOfRange=true, yet the latestVersion is 22. What's the connector logic around that use case?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The allowOutOfRange flag is primarily for streaming. For example:

if (startingVersion.version > snapshotToUse.version) {
val allowOutOfRange = conf.getConf(DeltaSQLConf.DELTA_CDF_ALLOW_OUT_OF_RANGE_TIMESTAMP)
if (allowOutOfRange) {
return Some(emptyCDFRelation(spark, snapshotToUse, catalogTableOpt, BatchCDFSchemaLegacy))
}
throw DeltaErrors.startVersionAfterLatestVersion(
startingVersion.version, snapshotToUse.version)
}

downstream, if version (1000) > latest (22), the connector returns an empty result instead of throwing. then we can poll for "changes since X" without errors/throwing when X is ahead of the table (the latest version)

throw new VersionNotFoundException(version, 0, latestRatifiedVersion);
}

// Compute earliestRatifiedCommitVersion from catalog commits
List<ParsedCatalogCommitData> catalogCommits = snapshot.getLogSegment().getAllCatalogCommits();
Optional<Long> earliestRatifiedCommitVersion =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear to me how this is helpful or what this variable is for.

(1) Every commit in the delta log was at one point ratified, so ealiestRatifiedCommitVersion is a bit confusing.

(2) I think you just mean: Earliest catalog commit. But, a catalog commit can also be published. So the delta log can have published deltas 0 to 20, and the catalog can have commits 10 to 30, and here the earliestCatalogCommit will be 10 -- so what? why is that helpful?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dah. I see earliestRatifiedCommitVersion is a Kernel term. Ha. I'll ask @allisonport-db to PTAL here

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see .. it's just used for a version 0 edge case

catalogCommits.stream().map(ParsedCatalogCommitData::getVersion).min(Long::compare);

// Use DeltaHistoryManager to find earliest version based on filesystem state
long earliestVersion =
mustBeRecreatable
? DeltaHistoryManager.getEarliestRecreatableCommit(
engine, snapshot.getLogPath(), earliestRatifiedCommitVersion)
: DeltaHistoryManager.getEarliestDeltaFile(
engine, snapshot.getLogPath(), earliestRatifiedCommitVersion);

if (version < earliestVersion) {
throw new VersionNotFoundException(version, earliestVersion, latestRatifiedVersion);
}
}

@Override
public CommitRange getTableChanges(Engine engine, long startVersion, Optional<Long> endVersion) {
throw new UnsupportedOperationException(
"UCManagedTableSnapshotManager.getTableChanges is not yet implemented");
return ucCatalogManagedClient.loadCommitRange(
engine,
tableId,
tablePath,
Optional.of(startVersion) /* startVersionOpt */,
Optional.empty() /* startTimestampOpt */,
endVersion /* endVersionOpt */,
Optional.empty() /* endTimestampOpt */);
}
}
Loading
Loading