Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
622cf14
initial commit
TimothyW553 Nov 10, 2025
857aa90
unify comments
TimothyW553 Nov 11, 2025
7690d84
edit comment
TimothyW553 Nov 11, 2025
6a1d2a5
change to catalogtable
TimothyW553 Nov 11, 2025
a57a342
add tests
TimothyW553 Nov 12, 2025
eeadbdb
update methods and tests
TimothyW553 Nov 12, 2025
030b548
delete workspace
TimothyW553 Nov 12, 2025
85c45eb
delete
TimothyW553 Nov 12, 2025
1ad766d
fmt
TimothyW553 Nov 12, 2025
d6c5b47
test fix
TimothyW553 Nov 13, 2025
12d1cfd
fix catalogtable mock
TimothyW553 Nov 13, 2025
b2b4428
storage props
TimothyW553 Nov 14, 2025
0832138
Add CatalogTable test helper
TimothyW553 Nov 14, 2025
5830196
merge props
TimothyW553 Nov 17, 2025
8b5c962
address comments
TimothyW553 Nov 17, 2025
4463c3e
fmt
TimothyW553 Nov 17, 2025
fed1e0a
address second round comments
TimothyW553 Nov 18, 2025
f88f46a
add reason for scala usage
TimothyW553 Nov 18, 2025
aab04bb
remove private method test
TimothyW553 Nov 18, 2025
ec1a35b
null check for storage properties
TimothyW553 Nov 18, 2025
1b1fe59
fmt
TimothyW553 Nov 18, 2025
6686a97
address comments
TimothyW553 Nov 18, 2025
26ad1cd
address comments
TimothyW553 Nov 18, 2025
3f3cd9c
constants
TimothyW553 Nov 20, 2025
f4d4f13
space
TimothyW553 Nov 20, 2025
6359268
fmt
TimothyW553 Nov 20, 2025
4571d96
config and extraction
TimothyW553 Nov 20, 2025
f66e0cd
decouple
TimothyW553 Nov 21, 2025
0bd57d3
Remove UC-specific utilities and inline logic in CatalogManagedSnapsh…
TimothyW553 Nov 21, 2025
fc30213
change unity
TimothyW553 Nov 21, 2025
3bd4357
change unity
TimothyW553 Nov 21, 2025
2ad9fc0
docs: tidy root metadata
TimothyW553 Nov 21, 2025
6126080
build
TimothyW553 Nov 21, 2025
f88a47a
Merge branch 'master' into stack/ccv2-catalog-config
TimothyW553 Nov 21, 2025
54bf26d
generics
TimothyW553 Nov 24, 2025
ec45077
clean up
TimothyW553 Nov 24, 2025
d6d604e
fmt
TimothyW553 Nov 24, 2025
5635ab6
refactor
TimothyW553 Nov 24, 2025
2d056e5
cleaner
TimothyW553 Nov 25, 2025
841e46f
Merge branch 'master' into stack/ccv2-catalog-config
TimothyW553 Nov 25, 2025
4a1d8fa
fix comment
TimothyW553 Nov 25, 2025
2c69260
Merge branch 'master' into stack/ccv2-catalog-config
TimothyW553 Nov 25, 2025
f06eee3
Merge branch 'master' into stack/ccv2-catalog-config
TimothyW553 Nov 25, 2025
2f792d8
fmt
TimothyW553 Nov 25, 2025
82c3837
Remove wiring from catalog-config base
TimothyW553 Nov 25, 2025
b2f2b5f
Wire SparkTable to snapshot manager factory
TimothyW553 Nov 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ lazy val sparkV1Filtered = (project in file("spark-v1-filtered"))
lazy val sparkV2 = (project in file("kernel-spark"))
.dependsOn(sparkV1Filtered)
.dependsOn(kernelDefaults)
.dependsOn(kernelUnityCatalog)
.dependsOn(goldenTables % "test")
.settings(
name := "delta-spark-v2",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import io.delta.kernel.Snapshot;
import io.delta.kernel.spark.read.SparkScanBuilder;
import io.delta.kernel.spark.snapshot.DeltaSnapshotManager;
import io.delta.kernel.spark.snapshot.PathBasedSnapshotManager;
import io.delta.kernel.spark.snapshot.DeltaSnapshotManagerFactory;
import io.delta.kernel.spark.utils.SchemaUtils;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -138,9 +138,11 @@ private SparkTable(
merged.putAll(userOptions);
this.options = Collections.unmodifiableMap(merged);

this.hadoopConf =
SparkSession.active().sessionState().newHadoopConfWithOptions(toScalaMap(options));
this.snapshotManager = new PathBasedSnapshotManager(tablePath, hadoopConf);
SparkSession spark = SparkSession.active();
this.hadoopConf = spark.sessionState().newHadoopConfWithOptions(toScalaMap(options));
// Use factory to create appropriate snapshot manager (catalog-managed vs path-based)
this.snapshotManager =
DeltaSnapshotManagerFactory.create(tablePath, catalogTable, spark, hadoopConf);
// Load the initial snapshot through the manager
this.initialSnapshot = snapshotManager.loadLatestSnapshot();
this.schema = SchemaUtils.convertKernelSchemaToSparkSchema(initialSnapshot.getSchema());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.spark.snapshot;

import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

import io.delta.kernel.CommitRange;
import io.delta.kernel.Snapshot;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.KernelException;
import io.delta.kernel.internal.DeltaHistoryManager;
import io.delta.kernel.spark.exception.VersionNotFoundException;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.annotation.Experimental;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Implementation of {@link DeltaSnapshotManager} for catalog-managed tables (e.g., Unity Catalog).
 *
 * <p>This snapshot manager is agnostic to the underlying catalog implementation. It delegates to a
 * {@link ManagedCommitClient}, keeping catalog-specific wiring out of the manager itself.
 */
@Experimental
public class CatalogManagedSnapshotManager implements DeltaSnapshotManager, AutoCloseable {

  private static final Logger logger = LoggerFactory.getLogger(CatalogManagedSnapshotManager.class);

  private final ManagedCommitClient commitClient;
  private final Engine kernelEngine;

  /**
   * Creates a snapshot manager backed by the given catalog-managed commit client.
   *
   * @param commitClient commit client for the target catalog-managed table
   * @param hadoopConf Hadoop configuration used to create the Delta Kernel engine
   * @throws NullPointerException if commitClient or hadoopConf is null
   */
  public CatalogManagedSnapshotManager(ManagedCommitClient commitClient, Configuration hadoopConf) {
    this.commitClient = requireNonNull(commitClient, "commitClient is null");
    requireNonNull(hadoopConf, "hadoopConf is null");

    this.kernelEngine = DefaultEngine.create(hadoopConf);
    logger.info(
        "Created CatalogManagedSnapshotManager for table {} at path {}",
        commitClient.getTableId(),
        commitClient.getTablePath());
  }

  /** Loads the latest snapshot of the catalog-managed Delta table. */
  @Override
  public Snapshot loadLatestSnapshot() {
    return commitClient.loadSnapshot(kernelEngine, Optional.empty(), Optional.empty());
  }

  /**
   * Loads a specific version of the catalog-managed Delta table.
   *
   * @param version the version to load (must be >= 0)
   * @return the snapshot at the specified version
   */
  @Override
  public Snapshot loadSnapshotAt(long version) {
    checkArgument(version >= 0, "version must be non-negative");
    return commitClient.loadSnapshot(kernelEngine, Optional.of(version), Optional.empty());
  }

  /**
   * Finds the active commit at a specific timestamp.
   *
   * <p><strong>Note:</strong> This operation is not yet supported for catalog-managed tables
   * because it requires filesystem-based commit history, which may not be accessible when the
   * catalog coordinates commits instead of the filesystem.
   *
   * @throws UnsupportedOperationException always - not yet implemented for catalog-managed tables
   */
  @Override
  public DeltaHistoryManager.Commit getActiveCommitAtTime(
      long timestampMillis,
      boolean canReturnLastCommit,
      boolean mustBeRecreatable,
      boolean canReturnEarliestCommit) {
    throw new UnsupportedOperationException(
        "getActiveCommitAtTime not yet implemented for catalog-managed tables. "
            + "This operation requires filesystem-based commit history which may not be "
            + "available for catalog-managed tables.");
  }

  /**
   * Checks if a specific version exists and is accessible.
   *
   * <p><strong>Performance Note:</strong> For catalog-managed tables, version checking requires
   * loading the full snapshot including all file metadata. This is less efficient than
   * filesystem-based checks which can verify log file existence without reading contents.
   *
   * <p><strong>TODO (Next PR):</strong> Add a lightweight version-checking API to the managed
   * commit client to avoid loading full snapshots for existence checks.
   *
   * <p>NOTE(review): {@code mustBeRecreatable} and {@code allowOutOfRange} are currently ignored
   * by this implementation - confirm whether callers rely on their semantics here.
   *
   * @throws VersionNotFoundException if the version is not available
   */
  @Override
  public void checkVersionExists(long version, boolean mustBeRecreatable, boolean allowOutOfRange)
      throws VersionNotFoundException {
    checkArgument(version >= 0, "version must be non-negative");

    try {
      // Attempt to load the snapshot at the specified version.
      // Note: this loads the full snapshot - see performance note above.
      loadSnapshotAt(version);
    } catch (KernelException e) {
      // A KernelException indicates the version doesn't exist or isn't accessible.
      // Other exceptions (network failures, auth errors, etc.) propagate to the caller.
      long latestVersion;
      try {
        latestVersion = loadLatestSnapshot().getVersion();
      } catch (RuntimeException latestFailure) {
        // Don't mask the original failure if resolving the latest version also fails.
        latestFailure.addSuppressed(e);
        throw latestFailure;
      }
      VersionNotFoundException notFound = new VersionNotFoundException(version, 0, latestVersion);
      // Attach the underlying Kernel failure for diagnostics instead of dropping it.
      notFound.addSuppressed(e);
      throw notFound;
    }
  }

  /**
   * Gets a range of table changes between versions.
   *
   * <p>This operation delegates to the managed commit client.
   *
   * @param engine the Delta Kernel engine to use
   * @param startVersion first version of the range, inclusive (must be >= 0)
   * @param endVersion optional last version of the range (must be >= 0 if present)
   * @return the commit range reported by the commit client
   */
  @Override
  public CommitRange getTableChanges(Engine engine, long startVersion, Optional<Long> endVersion) {
    requireNonNull(engine, "engine is null");
    checkArgument(startVersion >= 0, "startVersion must be non-negative");
    endVersion.ifPresent(v -> checkArgument(v >= 0, "endVersion must be non-negative"));

    return commitClient.loadCommitRange(
        engine, Optional.of(startVersion), Optional.empty(), endVersion, Optional.empty());
  }

  /**
   * Closes the commit client and releases resources.
   *
   * <p>This method should be called when the snapshot manager is no longer needed. Prefer using
   * try-with-resources to ensure proper cleanup. Failures during close are logged, not thrown.
   */
  @Override
  public void close() {
    try {
      commitClient.close();
      logger.info("Closed CatalogManagedSnapshotManager for table {}", commitClient.getTableId());
    } catch (Exception e) {
      logger.warn(
          "Error closing catalog-managed client for table {}", commitClient.getTableId(), e);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.spark.snapshot;

import static java.util.Objects.requireNonNull;

import io.delta.kernel.spark.snapshot.unitycatalog.UnityCatalogManagedCommitClient;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.catalog.CatalogTable;

/**
 * Factory for creating {@link DeltaSnapshotManager} instances.
 *
 * <p>Given a table path and optional catalog metadata, this factory picks the right snapshot
 * manager implementation:
 *
 * <ul>
 *   <li>{@link CatalogManagedSnapshotManager} - for Unity Catalog managed tables (CCv2)
 *   <li>{@link PathBasedSnapshotManager} - for regular filesystem-based Delta tables
 * </ul>
 *
 * <p>Callers such as {@code SparkTable} only depend on this factory and never need to reference a
 * concrete manager implementation directly.
 *
 * <p><strong>Example usage:</strong>
 *
 * <pre>{@code
 * DeltaSnapshotManager manager = DeltaSnapshotManagerFactory.create(
 *     tablePath,
 *     Optional.of(catalogTable),
 *     spark,
 *     hadoopConf
 * );
 * Snapshot snapshot = manager.loadLatestSnapshot();
 * }</pre>
 */
@Experimental
public final class DeltaSnapshotManagerFactory {

  private DeltaSnapshotManagerFactory() {
    // Static factory class - never instantiated.
  }

  /**
   * Creates the appropriate snapshot manager for a Delta table.
   *
   * <p><strong>Selection logic:</strong>
   *
   * <ul>
   *   <li>If {@code catalogTable} is present and UC-managed → {@link CatalogManagedSnapshotManager}
   *   <li>Otherwise → {@link PathBasedSnapshotManager}
   * </ul>
   *
   * @param tablePath filesystem path to the Delta table root
   * @param catalogTable optional Spark catalog table metadata
   * @param spark SparkSession for resolving Unity Catalog configurations
   * @param hadoopConf Hadoop configuration for the Delta Kernel engine
   * @return appropriate snapshot manager implementation
   * @throws NullPointerException if tablePath, spark, or hadoopConf is null
   * @throws IllegalArgumentException if catalogTable is UC-managed but configuration is invalid
   */
  public static DeltaSnapshotManager create(
      String tablePath,
      Optional<CatalogTable> catalogTable,
      SparkSession spark,
      Configuration hadoopConf) {

    requireNonNull(tablePath, "tablePath is null");
    requireNonNull(catalogTable, "catalogTable is null");
    requireNonNull(spark, "spark is null");
    requireNonNull(hadoopConf, "hadoopConf is null");

    // Resolve a UC-managed commit client if the catalog metadata identifies one; otherwise
    // fall back to the filesystem-based manager.
    return catalogTable
        .flatMap(table -> UnityCatalogManagedCommitClient.fromCatalog(table, spark))
        .<DeltaSnapshotManager>map(client -> new CatalogManagedSnapshotManager(client, hadoopConf))
        .orElseGet(() -> new PathBasedSnapshotManager(tablePath, hadoopConf));
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.spark.snapshot;

import io.delta.kernel.CommitRange;
import io.delta.kernel.Snapshot;
import io.delta.kernel.engine.Engine;
import java.util.Optional;

/**
 * Catalog-managed commit client that knows how to load snapshots and commit ranges for a specific
 * table.
 */
public interface ManagedCommitClient extends AutoCloseable {

  /** @return catalog-managed table identifier (for logging/telemetry). */
  String getTableId();

  /** @return physical table path used by Delta Kernel. */
  String getTablePath();

  /**
   * Loads a snapshot of the table.
   *
   * @param engine Delta Kernel engine used to read the table
   * @param versionOpt target version to load; empty loads the latest snapshot
   * @param timestampOpt target timestamp to resolve a snapshot from; empty when pinning by
   *     version or loading latest (NOTE(review): version/timestamp precedence when both are
   *     present is implementation-defined - confirm with implementors)
   * @return the resolved snapshot
   */
  Snapshot loadSnapshot(Engine engine, Optional<Long> versionOpt, Optional<Long> timestampOpt);

  /**
   * Loads a range of commits for the table.
   *
   * @param engine Delta Kernel engine used to read the table
   * @param startVersionOpt inclusive start version of the range; empty when starting by timestamp
   * @param startTimestampOpt start timestamp of the range; empty when starting by version
   * @param endVersionOpt inclusive end version of the range; empty for an open-ended range
   * @param endTimestampOpt end timestamp of the range; empty when ending by version
   * @return the resolved commit range
   */
  CommitRange loadCommitRange(
      Engine engine,
      Optional<Long> startVersionOpt,
      Optional<Long> startTimestampOpt,
      Optional<Long> endVersionOpt,
      Optional<Long> endTimestampOpt);

  /** Releases any resources held by this client; narrowed from AutoCloseable to throw nothing checked. */
  @Override
  void close();
}
Loading
Loading