Skip to content

Commit 7b49f62

Browse files
committed
Implement UC snapshot manager and adapter wiring
1 parent cd66037 commit 7b49f62

File tree

4 files changed

+408
-88
lines changed

4 files changed

+408
-88
lines changed

kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/CatalogManagedSnapshotManager.java

Lines changed: 106 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,13 @@
2020

2121
import io.delta.kernel.CommitRange;
2222
import io.delta.kernel.Snapshot;
23+
import io.delta.kernel.defaults.engine.DefaultEngine;
2324
import io.delta.kernel.engine.Engine;
25+
import io.delta.kernel.internal.DeltaHistoryManager;
26+
import io.delta.kernel.internal.SnapshotImpl;
27+
import io.delta.kernel.internal.files.ParsedCatalogCommitData;
28+
import io.delta.kernel.spark.exception.VersionNotFoundException;
29+
import java.util.List;
2430
import java.util.Optional;
2531

2632
import io.delta.kernel.internal.DeltaHistoryManager;
@@ -30,10 +36,10 @@
3036
import org.slf4j.LoggerFactory;
3137

3238
/**
33-
* Wireframe implementation of DeltaSnapshotManager for catalog-managed tables (e.g., UC).
39+
* Implementation of DeltaSnapshotManager for catalog-managed tables (e.g., UC).
3440
*
35-
* <p>All operations are intentionally stubbed in this PR. Functionality will be implemented in a
36-
* follow-up PR.
41+
* <p>This snapshot manager is agnostic to the underlying catalog implementation. It delegates to a
42+
* {@link ManagedCatalogAdapter}, keeping catalog-specific wiring out of the manager itself.
3743
*/
3844
@Experimental
3945
public class CatalogManagedSnapshotManager implements DeltaSnapshotManager, AutoCloseable {
@@ -43,6 +49,7 @@ public class CatalogManagedSnapshotManager implements DeltaSnapshotManager, Auto
4349
private final ManagedCatalogAdapter catalogAdapter;
4450
private final String tableId;
4551
private final String tablePath;
52+
private final Engine kernelEngine;
4653

4754
public CatalogManagedSnapshotManager(
4855
ManagedCatalogAdapter catalogAdapter,
@@ -54,44 +61,133 @@ public CatalogManagedSnapshotManager(
5461
this.tablePath = requireNonNull(tablePath, "tablePath is null");
5562
requireNonNull(hadoopConf, "hadoopConf is null");
5663

64+
this.kernelEngine = DefaultEngine.create(hadoopConf);
5765
logger.info(
5866
"Created CatalogManagedSnapshotManager for table {} at path {}", tableId, tablePath);
5967
}
6068

69+
/** Loads the latest snapshot of the catalog-managed Delta table. */
6170
@Override
6271
public Snapshot loadLatestSnapshot() {
63-
throw new UnsupportedOperationException("loadLatestSnapshot not implemented yet");
72+
return catalogAdapter.loadSnapshot(
73+
kernelEngine, /* versionOpt = */ Optional.empty(), /* timestampOpt = */ Optional.empty());
6474
}
6575

76+
/**
77+
* Loads a specific version of the catalog-managed Delta table.
78+
*
79+
* @param version the version to load (must be >= 0)
80+
* @return the snapshot at the specified version
81+
*/
6682
@Override
6783
public Snapshot loadSnapshotAt(long version) {
6884
checkArgument(version >= 0, "version must be non-negative");
69-
throw new UnsupportedOperationException("loadSnapshotAt not implemented yet");
85+
return catalogAdapter.loadSnapshot(
86+
kernelEngine, Optional.of(version), /* timestampOpt = */ Optional.empty());
7087
}
7188

89+
/**
90+
* Finds the active commit at a specific timestamp.
91+
*
92+
* <p>For catalog-managed tables, this method retrieves ratified commits from the catalog and uses
93+
* {@link DeltaHistoryManager#getActiveCommitAtTimestamp} to find the commit that was active at
94+
* the specified timestamp.
95+
*
96+
* @param timestampMillis the timestamp in milliseconds since epoch (UTC)
97+
* @param canReturnLastCommit if true, returns the last commit if the timestamp is after all
98+
* commits; if false, throws an exception
99+
* @param mustBeRecreatable if true, only considers commits that can be fully recreated from
100+
* available log files; if false, considers all commits
101+
* @param canReturnEarliestCommit if true, returns the earliest commit if the timestamp is before
102+
* all commits; if false, throws an exception
103+
* @return the commit that was active at the specified timestamp
104+
*/
72105
@Override
73106
public DeltaHistoryManager.Commit getActiveCommitAtTime(
74107
long timestampMillis,
75108
boolean canReturnLastCommit,
76109
boolean mustBeRecreatable,
77110
boolean canReturnEarliestCommit) {
78-
throw new UnsupportedOperationException("getActiveCommitAtTime not implemented yet");
111+
// Load the latest snapshot for timestamp resolution
112+
SnapshotImpl latestSnapshot = (SnapshotImpl) loadLatestSnapshot();
113+
114+
// Extract catalog commits from the snapshot's log segment (avoids redundant UC call)
115+
List<ParsedCatalogCommitData> catalogCommits =
116+
latestSnapshot.getLogSegment().getAllCatalogCommits();
117+
118+
return DeltaHistoryManager.getActiveCommitAtTimestamp(
119+
kernelEngine,
120+
latestSnapshot,
121+
latestSnapshot.getLogPath(),
122+
timestampMillis,
123+
mustBeRecreatable,
124+
canReturnLastCommit,
125+
canReturnEarliestCommit,
126+
catalogCommits);
79127
}
80128

129+
/**
130+
* Checks if a specific version exists and is accessible.
131+
*
132+
* <p>For catalog-managed tables, versions are assumed to be contiguous (enforced by the catalog
133+
* coordinator). This method performs a lightweight check by verifying the version is within the
134+
* valid range [0, latestRatifiedVersion].
135+
*
136+
* <p>This approach is consistent with the existing Spark Delta behavior in {@code
137+
* DeltaHistoryManager.checkVersionExists} which also assumes contiguous commits.
138+
*
139+
* @param version the version to check
140+
* @param mustBeRecreatable if true, requires that the version can be fully recreated from
141+
* available log files. For catalog-managed tables, all versions are recreatable since the
142+
* catalog maintains the complete commit history.
143+
* @param allowOutOfRange if true, allows versions greater than the latest version without
144+
* throwing an exception; if false, throws exception for out-of-range versions
145+
* @throws VersionNotFoundException if the version is not available
146+
*/
81147
@Override
82-
public void checkVersionExists(long version, boolean mustBeRecreatable, boolean allowOutOfRange) {
148+
public void checkVersionExists(long version, boolean mustBeRecreatable, boolean allowOutOfRange)
149+
throws VersionNotFoundException {
83150
checkArgument(version >= 0, "version must be non-negative");
84-
throw new UnsupportedOperationException("checkVersionExists not implemented yet");
151+
152+
// For catalog-managed tables, the earliest recreatable version is 0 since the catalog
153+
// maintains the complete commit history
154+
long earliestVersion = 0;
155+
long latestVersion = catalogAdapter.getLatestRatifiedVersion();
156+
157+
if (version < earliestVersion || ((version > latestVersion) && !allowOutOfRange)) {
158+
throw new VersionNotFoundException(version, earliestVersion, latestVersion);
159+
}
85160
}
86161

162+
/**
163+
* Gets a range of table changes between versions.
164+
*
165+
* <p><strong>Note:</strong> This operation delegates to the catalog adapter's commit-range loading.
166+
*
167+
* @return the commit range between the specified start and end versions
168+
*/
87169
@Override
88170
public CommitRange getTableChanges(Engine engine, long startVersion, Optional<Long> endVersion) {
89-
throw new UnsupportedOperationException("getTableChanges not implemented yet");
171+
requireNonNull(engine, "engine is null");
172+
checkArgument(startVersion >= 0, "startVersion must be non-negative");
173+
endVersion.ifPresent(v -> checkArgument(v >= 0, "endVersion must be non-negative"));
174+
175+
return catalogAdapter.loadCommitRange(
176+
engine,
177+
Optional.of(startVersion),
178+
/* startTimestampOpt = */ Optional.empty(),
179+
endVersion,
180+
/* endTimestampOpt = */ Optional.empty());
90181
}
91182

183+
/**
184+
* Closes the underlying catalog adapter and releases resources.
185+
*
186+
* <p>This method should be called when the snapshot manager is no longer needed. Prefer using
187+
* try-with-resources to ensure proper cleanup.
188+
*/
92189
@Override
93190
public void close() {
94-
// no-op in wireframe; adapter may implement close in the future
95191
try {
96192
catalogAdapter.close();
97193
logger.info("Closed CatalogManagedSnapshotManager for table {}", tableId);

kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/DeltaSnapshotManagerFactory.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
import static java.util.Objects.requireNonNull;
1919

20+
import io.delta.kernel.spark.snapshot.unitycatalog.UnityCatalogAdapter;
21+
import java.util.Optional;
2022
import org.apache.hadoop.conf.Configuration;
2123
import org.apache.spark.annotation.Experimental;
2224
import org.apache.spark.sql.SparkSession;
@@ -78,16 +80,36 @@ public static DeltaSnapshotManager fromPath(String tablePath, Configuration hado
7880
/**
7981
* Creates a snapshot manager from catalog table metadata.
8082
*
81-
* <p>Wire-up is intentionally deferred; this skeleton method currently throws until implemented
82-
* in a follow-up PR.
83+
* <p>Automatically selects {@link CatalogManagedSnapshotManager} for Unity Catalog managed
84+
* tables, or falls back to {@link PathBasedSnapshotManager} for regular tables.
8385
*
84-
* @throws UnsupportedOperationException always, until UC wiring is added
86+
* @param catalogTable Spark catalog table metadata
87+
* @param spark SparkSession for resolving Unity Catalog configurations
88+
* @param hadoopConf Hadoop configuration for the Delta Kernel engine
89+
* @return appropriate snapshot manager implementation
90+
* @throws NullPointerException if catalogTable, spark, or hadoopConf is null
91+
* @throws IllegalArgumentException if catalogTable is UC-managed but configuration is invalid
8592
*/
8693
public static DeltaSnapshotManager fromCatalogTable(
8794
CatalogTable catalogTable, SparkSession spark, Configuration hadoopConf) {
8895
requireNonNull(catalogTable, "catalogTable is null");
8996
requireNonNull(spark, "spark is null");
9097
requireNonNull(hadoopConf, "hadoopConf is null");
91-
throw new UnsupportedOperationException("UC catalog wiring not implemented in skeleton");
98+
99+
Optional<ManagedCatalogAdapter> adapterOpt =
100+
UnityCatalogAdapter.fromCatalog(catalogTable, spark);
101+
102+
if (adapterOpt.isPresent()) {
103+
ManagedCatalogAdapter adapter = adapterOpt.get();
104+
// Cast to UnityCatalogAdapter to access tableId and tablePath
105+
UnityCatalogAdapter ucAdapter = (UnityCatalogAdapter) adapter;
106+
String tableId = ucAdapter.getTableId();
107+
String tablePath = ucAdapter.getTablePath();
108+
return new CatalogManagedSnapshotManager(adapter, tableId, tablePath, hadoopConf);
109+
}
110+
111+
// Fallback to path-based snapshot manager
112+
String tablePath = catalogTable.location().toString();
113+
return new PathBasedSnapshotManager(tablePath, hadoopConf);
92114
}
93115
}

kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UnityCatalogAdapter.java

Lines changed: 83 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,46 +20,73 @@
2020
import io.delta.kernel.CommitRange;
2121
import io.delta.kernel.Snapshot;
2222
import io.delta.kernel.engine.Engine;
23+
import io.delta.kernel.internal.files.ParsedCatalogCommitData;
2324
import io.delta.kernel.internal.files.ParsedLogData;
2425
import io.delta.kernel.spark.snapshot.ManagedCatalogAdapter;
26+
import io.delta.kernel.unitycatalog.UCCatalogManagedClient;
27+
import io.delta.storage.commit.Commit;
28+
import io.delta.storage.commit.GetCommitsResponse;
29+
import io.delta.storage.commit.uccommitcoordinator.UCClient;
30+
import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorException;
31+
import io.delta.storage.commit.uccommitcoordinator.UCTokenBasedRestClient;
32+
import java.io.IOException;
33+
import java.io.UncheckedIOException;
34+
import java.util.Comparator;
2535
import java.util.List;
2636
import java.util.Optional;
37+
import java.util.stream.Collectors;
38+
import org.apache.hadoop.fs.Path;
2739
import org.apache.spark.sql.SparkSession;
2840
import org.apache.spark.sql.catalyst.catalog.CatalogTable;
2941

30-
/**
31-
* UC-backed implementation shell of {@link ManagedCatalogAdapter}.
32-
*
33-
* <p>Methods are intentionally stubbed in this wireframe PR and will be implemented in a follow-up
34-
* once UC operations are enabled.
35-
*/
42+
/** UC-backed implementation of {@link ManagedCatalogAdapter}. */
3643
public final class UnityCatalogAdapter implements ManagedCatalogAdapter {
3744

3845
private final String tableId;
3946
private final String tablePath;
47+
private final UCClient ucClient;
48+
private final UCCatalogManagedClient ucManagedClient;
4049

41-
public UnityCatalogAdapter(String tableId, String tablePath) {
50+
public UnityCatalogAdapter(String tableId, String tablePath, UCClient ucClient) {
4251
this.tableId = requireNonNull(tableId, "tableId is null");
4352
this.tablePath = requireNonNull(tablePath, "tablePath is null");
53+
this.ucClient = requireNonNull(ucClient, "ucClient is null");
54+
this.ucManagedClient = new UCCatalogManagedClient(ucClient);
4455
}
4556

4657
/**
4758
* Creates adapter from Spark catalog table (convenience method).
4859
*
4960
* <p>Extracts UC connection info from Spark metadata and delegates to {@link
5061
* #fromConnectionInfo}.
62+
*
63+
* @param catalogTable Spark catalog table metadata
64+
* @param spark SparkSession for resolving Unity Catalog configurations
65+
* @return adapter if table is UC-managed, empty otherwise
66+
* @throws IllegalArgumentException if table is UC-managed but configuration is invalid
5167
*/
5268
public static Optional<ManagedCatalogAdapter> fromCatalog(
5369
CatalogTable catalogTable, SparkSession spark) {
5470
requireNonNull(catalogTable, "catalogTable is null");
5571
requireNonNull(spark, "spark is null");
56-
throw new UnsupportedOperationException("UC wiring deferred to implementation PR");
72+
73+
return SparkUnityCatalogUtils.extractConnectionInfo(catalogTable, spark)
74+
.map(UnityCatalogAdapter::fromConnectionInfo);
5775
}
5876

59-
/** Creates adapter from connection info (no Spark dependency). */
77+
/**
78+
* Creates adapter from connection info (no Spark dependency).
79+
*
80+
* <p>This method allows creating a UC adapter without Spark dependencies if you have connection
81+
* information directly.
82+
*
83+
* @param info Unity Catalog connection information
84+
* @return adapter instance
85+
*/
6086
public static ManagedCatalogAdapter fromConnectionInfo(UnityCatalogConnectionInfo info) {
6187
requireNonNull(info, "info is null");
62-
return new UnityCatalogAdapter(info.getTableId(), info.getTablePath());
88+
UCClient client = new UCTokenBasedRestClient(info.getEndpoint(), info.getToken());
89+
return new UnityCatalogAdapter(info.getTableId(), info.getTablePath(), client);
6390
}
6491

6592
public String getTableId() {
@@ -73,7 +100,7 @@ public String getTablePath() {
73100
@Override
74101
public Snapshot loadSnapshot(
75102
Engine engine, Optional<Long> versionOpt, Optional<Long> timestampOpt) {
76-
throw new UnsupportedOperationException("UC snapshot loading not implemented yet");
103+
return ucManagedClient.loadSnapshot(engine, tableId, tablePath, versionOpt, timestampOpt);
77104
}
78105

79106
@Override
@@ -83,21 +110,62 @@ public CommitRange loadCommitRange(
83110
Optional<Long> startTimestampOpt,
84111
Optional<Long> endVersionOpt,
85112
Optional<Long> endTimestampOpt) {
86-
throw new UnsupportedOperationException("UC commit range loading not implemented yet");
113+
return ucManagedClient.loadCommitRange(
114+
engine,
115+
tableId,
116+
tablePath,
117+
startVersionOpt,
118+
startTimestampOpt,
119+
endVersionOpt,
120+
endTimestampOpt);
87121
}
88122

89123
@Override
90124
public List<ParsedLogData> getRatifiedCommits(Optional<Long> endVersionOpt) {
91-
throw new UnsupportedOperationException("UC commit listing not implemented yet");
125+
GetCommitsResponse response = getCommitsFromUC(endVersionOpt);
126+
return response.getCommits().stream()
127+
.sorted(Comparator.comparingLong(Commit::getVersion))
128+
.map(
129+
commit ->
130+
ParsedCatalogCommitData.forFileStatus(
131+
hadoopFileStatusToKernelFileStatus(commit.getFileStatus())))
132+
.collect(Collectors.toList());
92133
}
93134

94135
@Override
95136
public long getLatestRatifiedVersion() {
96-
throw new UnsupportedOperationException("UC ratified version lookup not implemented yet");
137+
GetCommitsResponse response = getCommitsFromUC(Optional.empty());
138+
long maxRatified = response.getLatestTableVersion();
139+
// UC returns -1 when only 0.json exists (CREATE not yet registered with UC)
140+
return maxRatified == -1 ? 0 : maxRatified;
97141
}
98142

99143
@Override
100144
public void close() {
101-
// no-op in wireframe
145+
try {
146+
ucClient.close();
147+
} catch (Exception e) {
148+
// Swallow close errors to avoid disrupting caller cleanup
149+
}
150+
}
151+
152+
private GetCommitsResponse getCommitsFromUC(Optional<Long> endVersionOpt) {
153+
try {
154+
return ucClient.getCommits(
155+
tableId,
156+
new Path(tablePath).toUri(),
157+
Optional.empty(), // startVersion
158+
endVersionOpt);
159+
} catch (IOException e) {
160+
throw new UncheckedIOException(e);
161+
} catch (UCCommitCoordinatorException e) {
162+
throw new RuntimeException(e);
163+
}
164+
}
165+
166+
private static io.delta.kernel.utils.FileStatus hadoopFileStatusToKernelFileStatus(
167+
org.apache.hadoop.fs.FileStatus hadoopFS) {
168+
return io.delta.kernel.utils.FileStatus.of(
169+
hadoopFS.getPath().toString(), hadoopFS.getLen(), hadoopFS.getModificationTime());
102170
}
103171
}

0 commit comments

Comments
 (0)