Changes from all commits
54 commits
622cf14
initial commit
TimothyW553 Nov 10, 2025
857aa90
unify comments
TimothyW553 Nov 11, 2025
7690d84
edit comment
TimothyW553 Nov 11, 2025
6a1d2a5
change to catalogtable
TimothyW553 Nov 11, 2025
a57a342
add tests
TimothyW553 Nov 12, 2025
eeadbdb
update methods and testS
TimothyW553 Nov 12, 2025
030b548
delete workspace
TimothyW553 Nov 12, 2025
85c45eb
delete
TimothyW553 Nov 12, 2025
1ad766d
fmt
TimothyW553 Nov 12, 2025
d6c5b47
test fix
TimothyW553 Nov 13, 2025
12d1cfd
fix catalogtable mcok
TimothyW553 Nov 13, 2025
b2b4428
storage props
TimothyW553 Nov 14, 2025
0832138
Add CatalogTable test helper
TimothyW553 Nov 14, 2025
5830196
merge props
TimothyW553 Nov 17, 2025
8b5c962
address comments
TimothyW553 Nov 17, 2025
4463c3e
fmt
TimothyW553 Nov 17, 2025
fed1e0a
address second round comments
TimothyW553 Nov 18, 2025
f88f46a
add reason for scala usage
TimothyW553 Nov 18, 2025
aab04bb
remove private method test
TimothyW553 Nov 18, 2025
ec1a35b
null check for storage properties
TimothyW553 Nov 18, 2025
1b1fe59
fmt
TimothyW553 Nov 18, 2025
6686a97
address comments
TimothyW553 Nov 18, 2025
26ad1cd
address comments
TimothyW553 Nov 18, 2025
3f3cd9c
constants
TimothyW553 Nov 20, 2025
f4d4f13
space
TimothyW553 Nov 20, 2025
6359268
fmt
TimothyW553 Nov 20, 2025
4571d96
config and extraction
TimothyW553 Nov 20, 2025
f66e0cd
decouple
TimothyW553 Nov 21, 2025
0bd57d3
Remove UC-specific utilities and inline logic in CatalogManagedSnapsh…
TimothyW553 Nov 21, 2025
fc30213
change unity
TimothyW553 Nov 21, 2025
3bd4357
change unity
TimothyW553 Nov 21, 2025
2ad9fc0
docs: tidy root metadata
TimothyW553 Nov 21, 2025
6126080
build
TimothyW553 Nov 21, 2025
f88a47a
Merge branch 'master' into stack/ccv2-catalog-config
TimothyW553 Nov 21, 2025
54bf26d
generics
TimothyW553 Nov 24, 2025
ec45077
clean up
TimothyW553 Nov 24, 2025
d6d604e
fmt
TimothyW553 Nov 24, 2025
5635ab6
refactor
TimothyW553 Nov 24, 2025
2d056e5
cleaner
TimothyW553 Nov 25, 2025
841e46f
Merge branch 'master' into stack/ccv2-catalog-config
TimothyW553 Nov 25, 2025
4a1d8fa
fix comment
TimothyW553 Nov 25, 2025
2c69260
Merge branch 'master' into stack/ccv2-catalog-config
TimothyW553 Nov 25, 2025
f06eee3
Merge branch 'master' into stack/ccv2-catalog-config
TimothyW553 Nov 25, 2025
2f792d8
fmt
TimothyW553 Nov 25, 2025
82c3837
Remove wiring from catalog-config base
TimothyW553 Nov 25, 2025
80d3abf
operation completeness
TimothyW553 Nov 26, 2025
72ab1a5
using imports instead of fully qualified names
TimothyW553 Dec 1, 2025
2a6f689
Add parameter name comments for Optional.empty()
TimothyW553 Dec 1, 2025
63e6137
change from client to adapter
TimothyW553 Dec 1, 2025
270fd8c
adapter, separate create to catalog + path, etc.
TimothyW553 Dec 1, 2025
b2e366c
dedub getcommits from client - just get from logsegment
TimothyW553 Dec 1, 2025
f677e9c
fmt
TimothyW553 Dec 1, 2025
31a7b61
tests and deps
TimothyW553 Dec 1, 2025
a3ff091
Merge branch 'master' into stack/ccv2-catalog-config
TimothyW553 Dec 1, 2025
1 change: 1 addition & 0 deletions build.sbt
@@ -462,6 +462,7 @@ lazy val sparkV1Filtered = (project in file("spark-v1-filtered"))
lazy val sparkV2 = (project in file("kernel-spark"))
.dependsOn(sparkV1Filtered)
.dependsOn(kernelDefaults)
.dependsOn(kernelUnityCatalog % "compile->compile;test->test")
.dependsOn(goldenTables % "test")
.settings(
name := "delta-spark-v2",
196 changes: 196 additions & 0 deletions CatalogManagedSnapshotManager.java
@@ -0,0 +1,196 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.spark.snapshot;

import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

import io.delta.kernel.CommitRange;
import io.delta.kernel.Snapshot;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.DeltaHistoryManager;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.internal.files.ParsedCatalogCommitData;
import io.delta.kernel.spark.exception.VersionNotFoundException;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.annotation.Experimental;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Implementation of DeltaSnapshotManager for catalog-managed tables (e.g., UC).
*
* <p>This snapshot manager is agnostic to the underlying catalog implementation. It delegates to a
* {@link ManagedCatalogAdapter}, keeping catalog-specific wiring out of the manager itself.
*/
@Experimental
public class CatalogManagedSnapshotManager implements DeltaSnapshotManager, AutoCloseable {

private static final Logger logger = LoggerFactory.getLogger(CatalogManagedSnapshotManager.class);

private final ManagedCatalogAdapter catalogAdapter;
private final String tableId;
private final String tablePath;
private final Engine kernelEngine;

public CatalogManagedSnapshotManager(
ManagedCatalogAdapter catalogAdapter,
String tableId,
String tablePath,
Configuration hadoopConf) {
this.catalogAdapter = requireNonNull(catalogAdapter, "catalogAdapter is null");
this.tableId = requireNonNull(tableId, "tableId is null");
this.tablePath = requireNonNull(tablePath, "tablePath is null");
requireNonNull(hadoopConf, "hadoopConf is null");

this.kernelEngine = DefaultEngine.create(hadoopConf);
logger.info(
"Created CatalogManagedSnapshotManager for table {} at path {}", tableId, tablePath);
}

/** Loads the latest snapshot of the catalog-managed Delta table. */
@Override
public Snapshot loadLatestSnapshot() {
return catalogAdapter.loadSnapshot(
kernelEngine, /* versionOpt = */ Optional.empty(), /* timestampOpt = */ Optional.empty());
}

/**
* Loads a specific version of the catalog-managed Delta table.
*
* @param version the version to load (must be >= 0)
* @return the snapshot at the specified version
*/
@Override
public Snapshot loadSnapshotAt(long version) {
checkArgument(version >= 0, "version must be non-negative");
return catalogAdapter.loadSnapshot(
kernelEngine, Optional.of(version), /* timestampOpt = */ Optional.empty());
}

/**
* Finds the active commit at a specific timestamp.
*
* <p>For catalog-managed tables, this method loads the latest snapshot through the catalog
* adapter, reads the ratified catalog commits from its log segment, and uses
* {@link DeltaHistoryManager#getActiveCommitAtTimestamp} to find the commit that was active at
* the specified timestamp.
*
* @param timestampMillis the timestamp in milliseconds since epoch (UTC)
* @param canReturnLastCommit if true, returns the last commit if the timestamp is after all
* commits; if false, throws an exception
* @param mustBeRecreatable if true, only considers commits that can be fully recreated from
* available log files; if false, considers all commits
* @param canReturnEarliestCommit if true, returns the earliest commit if the timestamp is before
* all commits; if false, throws an exception
* @return the commit that was active at the specified timestamp
*/
@Override
public DeltaHistoryManager.Commit getActiveCommitAtTime(
long timestampMillis,
boolean canReturnLastCommit,
boolean mustBeRecreatable,
boolean canReturnEarliestCommit) {
// Load the latest snapshot for timestamp resolution
SnapshotImpl latestSnapshot = (SnapshotImpl) loadLatestSnapshot();

// Extract catalog commits from the snapshot's log segment (avoids redundant UC call)
List<ParsedCatalogCommitData> catalogCommits =
latestSnapshot.getLogSegment().getAllCatalogCommits();

Collaborator
loadLatestSnapshot() will call commitClient.getRatifiedCommits as well. We need to avoid calling it twice.

Collaborator Author
Ah yes, I see the codepath now for getting commits and the latest snapshot. Fixed by extracting catalog commits directly from the snapshot's log segment instead of calling UC twice:
List<ParsedCatalogCommitData> catalogCommits = latestSnapshot.getLogSegment().getAllCatalogCommits();

return DeltaHistoryManager.getActiveCommitAtTimestamp(
kernelEngine,
latestSnapshot,
latestSnapshot.getLogPath(),
timestampMillis,
mustBeRecreatable,
canReturnLastCommit,
canReturnEarliestCommit,
catalogCommits);
}

/**
* Checks if a specific version exists and is accessible.
*
* <p>For catalog-managed tables, versions are assumed to be contiguous (enforced by the catalog
* coordinator). This method performs a lightweight check by verifying the version is within the
* valid range [0, latestRatifiedVersion].
*
* <p>This approach is consistent with the existing Spark Delta behavior in {@code
* DeltaHistoryManager.checkVersionExists} which also assumes contiguous commits.
*
* @param version the version to check
* @param mustBeRecreatable if true, requires that the version can be fully recreated from
* available log files. For catalog-managed tables, all versions are recreatable since the
* catalog maintains the complete commit history.
* @param allowOutOfRange if true, allows versions greater than the latest version without
* throwing an exception; if false, throws exception for out-of-range versions
* @throws VersionNotFoundException if the version is not available
*/
@Override
public void checkVersionExists(long version, boolean mustBeRecreatable, boolean allowOutOfRange)
throws VersionNotFoundException {
checkArgument(version >= 0, "version must be non-negative");

// For catalog-managed tables, the earliest recreatable version is 0 since the catalog
// maintains the complete commit history
long earliestVersion = 0;
long latestVersion = catalogAdapter.getLatestRatifiedVersion();

if (version < earliestVersion || ((version > latestVersion) && !allowOutOfRange)) {
throw new VersionNotFoundException(version, earliestVersion, latestVersion);
}
}

/**
* Gets a range of table changes between versions.
*
* <p><strong>Note:</strong> This operation delegates to the catalog adapter's {@code
* loadCommitRange} with the requested version bounds.
*/
@Override
public CommitRange getTableChanges(Engine engine, long startVersion, Optional<Long> endVersion) {
requireNonNull(engine, "engine is null");
checkArgument(startVersion >= 0, "startVersion must be non-negative");
endVersion.ifPresent(v -> checkArgument(v >= 0, "endVersion must be non-negative"));

return catalogAdapter.loadCommitRange(
engine,
Optional.of(startVersion),
/* startTimestampOpt = */ Optional.empty(),
endVersion,
/* endTimestampOpt = */ Optional.empty());
}

/**
* Closes the catalog adapter and releases resources.
*
* <p>This method should be called when the snapshot manager is no longer needed. Prefer using
* try-with-resources to ensure proper cleanup.
*/
@Override
public void close() {
try {
catalogAdapter.close();
logger.info("Closed CatalogManagedSnapshotManager for table {}", tableId);
} catch (Exception e) {
logger.warn("Error closing catalog-managed client for table {}", tableId, e);
}
}
}
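
For quick reference, a minimal usage sketch of the manager above (not part of this PR). The adapter, table id, and path are hypothetical placeholders; imports match those already used in the file.

// Hypothetical usage sketch: `adapter` stands in for any ManagedCatalogAdapter
// implementation; the id and path values are placeholders.
static Snapshot readVersion(ManagedCatalogAdapter adapter, long version)
    throws VersionNotFoundException {
  Configuration hadoopConf = new Configuration();
  try (CatalogManagedSnapshotManager manager =
      new CatalogManagedSnapshotManager(
          adapter, "example-table-id", "/tmp/example-table", hadoopConf)) {
    // Reject versions beyond the latest ratified version before loading anything.
    manager.checkVersionExists(
        version, /* mustBeRecreatable = */ true, /* allowOutOfRange = */ false);
    return manager.loadSnapshotAt(version);
  }
}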
115 changes: 115 additions & 0 deletions DeltaSnapshotManagerFactory.java
@@ -0,0 +1,115 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.spark.snapshot;

import static java.util.Objects.requireNonNull;

import io.delta.kernel.spark.snapshot.unitycatalog.UnityCatalogAdapter;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.catalog.CatalogTable;

/**
* Factory for creating {@link DeltaSnapshotManager} instances.
*
* <p>This factory provides two creation methods:
*
* <ul>
* <li>{@link #fromPath} - Creates a {@link PathBasedSnapshotManager} for filesystem-based Delta
* tables
* <li>{@link #fromCatalogTable} - Creates snapshot manager from catalog metadata, automatically
* selecting {@link CatalogManagedSnapshotManager} for UC tables or falling back to {@link
* PathBasedSnapshotManager}
* </ul>
*
* <p><strong>Example usage:</strong>
*
* <pre>{@code
* // For path-based tables
* DeltaSnapshotManager manager = DeltaSnapshotManagerFactory.fromPath(
* tablePath,
* hadoopConf
* );
*
* // For catalog tables
* DeltaSnapshotManager manager = DeltaSnapshotManagerFactory.fromCatalogTable(
* catalogTable,
* spark,
* hadoopConf
* );
* }</pre>
*/
@Experimental
public final class DeltaSnapshotManagerFactory {

// Utility class - no instances
private DeltaSnapshotManagerFactory() {}

/**
* Creates a path-based snapshot manager for filesystem Delta tables.
*
* <p>Use this when no catalog metadata is available or when you want to work directly with a
* filesystem path.
*
* @param tablePath filesystem path to the Delta table root
* @param hadoopConf Hadoop configuration for the Delta Kernel engine
* @return PathBasedSnapshotManager instance
* @throws NullPointerException if tablePath or hadoopConf is null
*/
public static DeltaSnapshotManager fromPath(String tablePath, Configuration hadoopConf) {
requireNonNull(tablePath, "tablePath is null");
requireNonNull(hadoopConf, "hadoopConf is null");
return new PathBasedSnapshotManager(tablePath, hadoopConf);
}

/**
* Creates a snapshot manager from catalog table metadata.
*
* <p>Automatically selects {@link CatalogManagedSnapshotManager} for Unity Catalog managed
* tables, or falls back to {@link PathBasedSnapshotManager} for regular tables.
*
* @param catalogTable Spark catalog table metadata
* @param spark SparkSession for resolving Unity Catalog configurations
* @param hadoopConf Hadoop configuration for the Delta Kernel engine
* @return appropriate snapshot manager implementation
* @throws NullPointerException if catalogTable, spark, or hadoopConf is null
* @throws IllegalArgumentException if catalogTable is UC-managed but configuration is invalid
*/
public static DeltaSnapshotManager fromCatalogTable(
CatalogTable catalogTable, SparkSession spark, Configuration hadoopConf) {
requireNonNull(catalogTable, "catalogTable is null");
requireNonNull(spark, "spark is null");
requireNonNull(hadoopConf, "hadoopConf is null");

Optional<ManagedCatalogAdapter> adapterOpt =
UnityCatalogAdapter.fromCatalog(catalogTable, spark);

if (adapterOpt.isPresent()) {
ManagedCatalogAdapter adapter = adapterOpt.get();
// Cast to UnityCatalogAdapter to access tableId and tablePath
UnityCatalogAdapter ucAdapter = (UnityCatalogAdapter) adapter;
String tableId = ucAdapter.getTableId();
String tablePath = ucAdapter.getTablePath();
return new CatalogManagedSnapshotManager(adapter, tableId, tablePath, hadoopConf);
}

// Fallback to path-based snapshot manager
String tablePath = catalogTable.location().toString();
return new PathBasedSnapshotManager(tablePath, hadoopConf);
}
}
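
As a rough illustration of the selection behavior described in the javadoc (a sketch only; spark, ucTable, and externalTable are assumed to exist and are not defined in this PR):

// Sketch of how callers might obtain a manager; none of these objects are defined in this PR.
Configuration hadoopConf = spark.sparkContext().hadoopConfiguration();

// UC-managed table: UnityCatalogAdapter.fromCatalog resolves an adapter, so the factory
// returns a CatalogManagedSnapshotManager wired to it.
DeltaSnapshotManager managed =
    DeltaSnapshotManagerFactory.fromCatalogTable(ucTable, spark, hadoopConf);

// Any other table: no adapter is resolved and the factory falls back to a
// PathBasedSnapshotManager rooted at catalogTable.location().
DeltaSnapshotManager fallback =
    DeltaSnapshotManagerFactory.fromCatalogTable(externalTable, spark, hadoopConf);

// Purely path-addressed tables skip catalog resolution entirely.
DeltaSnapshotManager pathOnly =
    DeltaSnapshotManagerFactory.fromPath("/data/delta/events", hadoopConf);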
64 changes: 64 additions & 0 deletions ManagedCatalogAdapter.java
@@ -0,0 +1,64 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.spark.snapshot;

import io.delta.kernel.CommitRange;
import io.delta.kernel.Snapshot;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.files.ParsedLogData;
import java.util.List;
import java.util.Optional;

/**
* Adapter for catalog-managed tables that knows how to load snapshots and commit ranges for a
* specific table.
*/
public interface ManagedCatalogAdapter extends AutoCloseable {

Snapshot loadSnapshot(Engine engine, Optional<Long> versionOpt, Optional<Long> timestampOpt);

CommitRange loadCommitRange(
Engine engine,
Optional<Long> startVersionOpt,
Optional<Long> startTimestampOpt,
Optional<Long> endVersionOpt,
Optional<Long> endTimestampOpt);

/**
* Gets the ratified commits from the catalog up to the specified version.
*
* <p>The returned list contains {@link ParsedLogData} representing each ratified commit, sorted
* by version in ascending order. These are typically {@code ParsedCatalogCommitData} instances
* for catalog-managed tables.
*
* @param endVersionOpt optional end version (inclusive); if empty, returns commits up to latest
* @return list of parsed log data representing ratified commits, sorted by version ascending
*/
List<ParsedLogData> getRatifiedCommits(Optional<Long> endVersionOpt);

/**
* Gets the latest ratified table version from the catalog.
*
* <p>For catalog-managed tables, this is the highest version that has been ratified by the
* catalog coordinator.
*
* @return the latest version ratified by the catalog, or 0 if only the initial commit exists
*/
long getLatestRatifiedVersion();

@Override
void close();
}
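
To make the contract concrete, a hypothetical in-memory stub (not part of this PR) of the kind a unit test might use; snapshot and commit-range loading are intentionally left unsupported.

// Hypothetical test stub; assumes the commit list is already sorted by version ascending.
final class InMemoryCatalogAdapter implements ManagedCatalogAdapter {
  private final List<ParsedLogData> ratifiedCommits;
  private final long latestRatifiedVersion;

  InMemoryCatalogAdapter(List<ParsedLogData> ratifiedCommits, long latestRatifiedVersion) {
    this.ratifiedCommits = ratifiedCommits;
    this.latestRatifiedVersion = latestRatifiedVersion;
  }

  @Override
  public Snapshot loadSnapshot(
      Engine engine, Optional<Long> versionOpt, Optional<Long> timestampOpt) {
    throw new UnsupportedOperationException("snapshot loading is out of scope for this stub");
  }

  @Override
  public CommitRange loadCommitRange(
      Engine engine,
      Optional<Long> startVersionOpt,
      Optional<Long> startTimestampOpt,
      Optional<Long> endVersionOpt,
      Optional<Long> endTimestampOpt) {
    throw new UnsupportedOperationException("commit ranges are out of scope for this stub");
  }

  @Override
  public List<ParsedLogData> getRatifiedCommits(Optional<Long> endVersionOpt) {
    // A real adapter must filter to commits <= endVersionOpt; omitted here for brevity.
    return ratifiedCommits;
  }

  @Override
  public long getLatestRatifiedVersion() {
    return latestRatifiedVersion;
  }

  @Override
  public void close() {
    // Nothing to release for the in-memory stub.
  }
}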