Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import static java.util.Objects.requireNonNull;

import io.delta.kernel.Snapshot;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.spark.read.SparkScanBuilder;
import io.delta.kernel.spark.snapshot.DeltaSnapshotManager;
import io.delta.kernel.spark.snapshot.PathBasedSnapshotManager;
import io.delta.kernel.spark.snapshot.SnapshotManagerFactory;
import io.delta.kernel.spark.utils.SchemaUtils;
import java.util.*;
import java.util.function.Supplier;
Expand Down Expand Up @@ -52,8 +54,6 @@ public class SparkTable implements Table, SupportsRead {
/** Snapshot created during connector setup */
private final Snapshot initialSnapshot;

private final Configuration hadoopConf;

private final SchemaProvider schemaProvider;
private final Optional<CatalogTable> catalogTable;

Expand Down Expand Up @@ -135,9 +135,10 @@ private SparkTable(
merged.putAll(userOptions);
this.options = Collections.unmodifiableMap(merged);

this.hadoopConf =
Configuration hadoopConf =
SparkSession.active().sessionState().newHadoopConfWithOptions(toScalaMap(options));
this.snapshotManager = new PathBasedSnapshotManager(tablePath, hadoopConf);
Engine kernelEngine = DefaultEngine.create(hadoopConf);
this.snapshotManager = SnapshotManagerFactory.create(tablePath, kernelEngine, catalogTable);
// Load the initial snapshot through the manager
this.initialSnapshot = snapshotManager.loadLatestSnapshot();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,12 @@ public class PathBasedSnapshotManager implements DeltaSnapshotManager {
private final Engine kernelEngine;

/**
 * Creates a snapshot manager from a Hadoop configuration.
 *
 * <p>Builds a Kernel engine with {@code DefaultEngine.create(hadoopConf)} and delegates to the
 * constructor that takes an {@link Engine}.
 *
 * @param tablePath table path, passed through unchanged to the engine-based constructor
 * @param hadoopConf Hadoop configuration used to create the engine; must not be null
 * @throws NullPointerException if {@code hadoopConf} is null
 */
public PathBasedSnapshotManager(String tablePath, Configuration hadoopConf) {
// The null check runs before the engine is created, so a null configuration fails fast.
this(tablePath, DefaultEngine.create(requireNonNull(hadoopConf, "hadoopConf is null")));
}

/**
 * Creates a snapshot manager that uses a caller-supplied Kernel engine.
 *
 * @param tablePath table path; must not be null
 * @param kernelEngine engine used for table operations; must not be null
 * @throws NullPointerException if {@code tablePath} or {@code kernelEngine} is null
 */
public PathBasedSnapshotManager(String tablePath, Engine kernelEngine) {
  this.tablePath = requireNonNull(tablePath, "tablePath is null");
  // The engine is injected rather than built here; building it from a Hadoop configuration is
  // the job of the (String, Configuration) constructor, which delegates to this one.
  this.kernelEngine = requireNonNull(kernelEngine, "kernelEngine is null");
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.spark.snapshot;

import io.delta.kernel.engine.Engine;
import io.delta.kernel.spark.snapshot.unitycatalog.UCManagedTableSnapshotManager;
import io.delta.kernel.spark.snapshot.unitycatalog.UCTableInfo;
import io.delta.kernel.spark.snapshot.unitycatalog.UCUtils;
import io.delta.kernel.unitycatalog.UCCatalogManagedClient;
import io.delta.storage.commit.uccommitcoordinator.UCClient;
import io.delta.storage.commit.uccommitcoordinator.UCTokenBasedRestClient;
import java.util.Optional;
import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.catalog.CatalogTable;

/**
 * Static factory that picks the {@link DeltaSnapshotManager} implementation for a table.
 *
 * <p>Selection rules:
 *
 * <ul>
 *   <li>When catalog metadata is supplied and {@link UCUtils#extractTableInfo} yields Unity
 *       Catalog table info, a {@link UCManagedTableSnapshotManager} is returned.
 *   <li>In every other case a {@link PathBasedSnapshotManager} is returned.
 * </ul>
 */
@Experimental
public final class SnapshotManagerFactory {

  /** Not instantiable: this class only exposes static factory methods. */
  private SnapshotManagerFactory() {}

  /**
   * Builds the snapshot manager that matches the given table.
   *
   * @param tablePath the filesystem path to the Delta table
   * @param kernelEngine the pre-configured Kernel {@link Engine} to use for table operations
   * @param catalogTable optional Spark catalog table metadata
   * @return a {@link DeltaSnapshotManager} appropriate for the table type
   */
  public static DeltaSnapshotManager create(
      String tablePath, Engine kernelEngine, Optional<CatalogTable> catalogTable) {
    return catalogTable
        // The active session is only looked up when catalog metadata is actually present.
        .flatMap(table -> UCUtils.extractTableInfo(table, SparkSession.active()))
        .<DeltaSnapshotManager>map(info -> createUCManagedSnapshotManager(info, kernelEngine))
        // Fallback for tables without Unity Catalog info: path-based snapshot manager.
        .orElseGet(() -> new PathBasedSnapshotManager(tablePath, kernelEngine));
  }

  /**
   * Wires a token-based REST client into a catalog-managed client and wraps both in a
   * UC-managed snapshot manager.
   */
  private static UCManagedTableSnapshotManager createUCManagedSnapshotManager(
      UCTableInfo tableInfo, Engine kernelEngine) {
    UCClient restClient =
        new UCTokenBasedRestClient(tableInfo.getUcUri(), tableInfo.getUcToken());
    UCCatalogManagedClient catalogManagedClient = new UCCatalogManagedClient(restClient);
    return new UCManagedTableSnapshotManager(catalogManagedClient, tableInfo, kernelEngine);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@
import io.delta.kernel.Snapshot;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.DeltaHistoryManager;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.internal.files.ParsedCatalogCommitData;
import io.delta.kernel.spark.exception.VersionNotFoundException;
import io.delta.kernel.spark.snapshot.DeltaSnapshotManager;
import io.delta.kernel.unitycatalog.UCCatalogManagedClient;
import java.util.List;
import java.util.Optional;

/**
Expand Down Expand Up @@ -57,35 +61,90 @@ public UCManagedTableSnapshotManager(

/**
 * Loads a snapshot through the Unity Catalog managed client without pinning a version or a
 * timestamp.
 *
 * @return the snapshot returned by {@code ucCatalogManagedClient.loadSnapshot}
 */
@Override
public Snapshot loadLatestSnapshot() {
  // Both selectors are left empty so the client resolves the snapshot on its own.
  return ucCatalogManagedClient.loadSnapshot(
      engine,
      tableId,
      tablePath,
      Optional.empty() /* versionOpt */,
      Optional.empty() /* timestampOpt */);
}

/**
 * Loads the snapshot for an explicit table version through the Unity Catalog managed client.
 *
 * @param version the table version to load
 * @return the snapshot returned by {@code ucCatalogManagedClient.loadSnapshot} for that version
 */
@Override
public Snapshot loadSnapshotAt(long version) {
  // Only the version selector is set; the timestamp selector stays empty.
  return ucCatalogManagedClient.loadSnapshot(
      engine, tableId, tablePath, Optional.of(version), Optional.empty() /* timestampOpt */);
}

/**
 * Finds the active commit at a specific timestamp.
 *
 * <p>For UC-managed tables, this loads the latest snapshot and uses {@link
 * DeltaHistoryManager#getActiveCommitAtTimestamp} to resolve the timestamp to a commit. The
 * catalog commits from the snapshot's log segment are passed along to that lookup.
 *
 * @param timestampMillis timestamp to resolve, forwarded unchanged to the history manager
 * @param canReturnLastCommit forwarded unchanged to the history manager
 * @param mustBeRecreatable forwarded unchanged to the history manager
 * @param canReturnEarliestCommit forwarded unchanged to the history manager
 * @return the commit selected by {@link DeltaHistoryManager#getActiveCommitAtTimestamp}
 */
@Override
public DeltaHistoryManager.Commit getActiveCommitAtTime(
    long timestampMillis,
    boolean canReturnLastCommit,
    boolean mustBeRecreatable,
    boolean canReturnEarliestCommit) {
  // The cast is needed to reach the log segment and log path, which this code reads from
  // SnapshotImpl.
  SnapshotImpl snapshot = (SnapshotImpl) loadLatestSnapshot();
  List<ParsedCatalogCommitData> catalogCommits = snapshot.getLogSegment().getAllCatalogCommits();
  // NOTE(review): the flags are passed as (mustBeRecreatable, canReturnLastCommit,
  // canReturnEarliestCommit), which differs from this method's own parameter order. Confirm
  // against the DeltaHistoryManager signature.
  return DeltaHistoryManager.getActiveCommitAtTimestamp(
      engine,
      snapshot,
      snapshot.getLogPath(),
      timestampMillis,
      mustBeRecreatable,
      canReturnLastCommit,
      canReturnEarliestCommit,
      catalogCommits);
}

/**
* Checks if a specific version exists and is accessible.
*
* <p>For UC-managed tables with CCv2, log files may be cleaned up, so we need to use
* DeltaHistoryManager to find the earliest available version based on filesystem state.
*/
@Override
public void checkVersionExists(long version, boolean mustBeRecreatable, boolean allowOutOfRange) {
throw new UnsupportedOperationException(
"UCManagedTableSnapshotManager.checkVersionExists is not yet implemented");
public void checkVersionExists(long version, boolean mustBeRecreatable, boolean allowOutOfRange)
throws VersionNotFoundException {
// Load latest to get the current version bounds
SnapshotImpl snapshot = (SnapshotImpl) loadLatestSnapshot();
long latestRatifiedVersion = snapshot.getVersion();

// Fast path: check upper bound before expensive filesystem operations
if ((version > latestRatifiedVersion) && !allowOutOfRange) {
throw new VersionNotFoundException(version, 0, latestRatifiedVersion);
}

// Compute earliestRatifiedCommitVersion from catalog commits
List<ParsedCatalogCommitData> catalogCommits = snapshot.getLogSegment().getAllCatalogCommits();
Optional<Long> earliestRatifiedCommitVersion =
catalogCommits.stream().map(ParsedCatalogCommitData::getVersion).min(Long::compare);

// Use DeltaHistoryManager to find earliest version based on filesystem state
long earliestVersion =
mustBeRecreatable
? DeltaHistoryManager.getEarliestRecreatableCommit(
engine, snapshot.getLogPath(), earliestRatifiedCommitVersion)
: DeltaHistoryManager.getEarliestDeltaFile(
engine, snapshot.getLogPath(), earliestRatifiedCommitVersion);

if (version < earliestVersion) {
throw new VersionNotFoundException(version, earliestVersion, latestRatifiedVersion);
}
}

/**
 * Loads a commit range through the Unity Catalog managed client, bounded by versions only.
 *
 * @param engine engine forwarded to the client; this parameter shadows the same-named {@code
 *     engine} that the other methods of this class use
 * @param startVersion first version of the range
 * @param endVersion optional last version of the range, forwarded as-is
 * @return the commit range returned by {@code ucCatalogManagedClient.loadCommitRange}
 */
@Override
public CommitRange getTableChanges(Engine engine, long startVersion, Optional<Long> endVersion) {
  // Timestamp-based bounds are not used here; both timestamp selectors stay empty.
  return ucCatalogManagedClient.loadCommitRange(
      engine,
      tableId,
      tablePath,
      Optional.of(startVersion) /* startVersionOpt */,
      Optional.empty() /* startTimestampOpt */,
      endVersion /* endVersionOpt */,
      Optional.empty() /* endTimestampOpt */);
}
}
Loading
Loading