Skip to content

Commit ef93d13

Browse files
committed
catalogmanagedsnapshot
1 parent 6d5cb0e commit ef93d13

File tree

3 files changed

+414
-2
lines changed

3 files changed

+414
-2
lines changed
Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
/*
2+
* Copyright (2025) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel.spark.snapshot;
17+
18+
import static java.util.Objects.requireNonNull;
19+
20+
import io.delta.kernel.CommitRange;
21+
import io.delta.kernel.CommitRangeBuilder;
22+
import io.delta.kernel.Snapshot;
23+
import io.delta.kernel.SnapshotBuilder;
24+
import io.delta.kernel.TableManager;
25+
import io.delta.kernel.defaults.engine.DefaultEngine;
26+
import io.delta.kernel.engine.Engine;
27+
import io.delta.kernel.internal.DeltaHistoryManager;
28+
import io.delta.kernel.internal.SnapshotImpl;
29+
import io.delta.kernel.internal.files.ParsedCatalogCommitData;
30+
import io.delta.kernel.internal.files.ParsedLogData;
31+
import io.delta.kernel.spark.exception.VersionNotFoundException;
32+
import io.delta.kernel.spark.unity.UnityCatalogClientFactory;
33+
import io.delta.kernel.spark.utils.CatalogTableUtils;
34+
import io.delta.kernel.utils.FileStatus;
35+
import io.delta.storage.commit.Commit;
36+
import io.delta.storage.commit.GetCommitsResponse;
37+
import io.delta.storage.commit.uccommitcoordinator.UCClient;
38+
import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient;
39+
import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorException;
40+
import java.io.IOException;
41+
import java.net.URI;
42+
import java.util.Comparator;
43+
import java.util.List;
44+
import java.util.Map;
45+
import java.util.Optional;
46+
import java.util.concurrent.atomic.AtomicReference;
47+
import java.util.stream.Collectors;
48+
import org.apache.hadoop.conf.Configuration;
49+
import org.apache.hadoop.fs.Path;
50+
import org.apache.spark.sql.catalyst.catalog.CatalogTable;
51+
import org.slf4j.Logger;
52+
import org.slf4j.LoggerFactory;
53+
54+
/**
55+
* {@link DeltaSnapshotManager} implementation backed by Unity Catalog commit APIs (CCv2).
56+
*
57+
* <p>This manager defers snapshot reconstruction and commit enumeration to {@link
58+
* UCCatalogManagedClient}, ensuring staged-but-unpublished commits returned by the catalog are
59+
* honoured for snapshot, time-travel and streaming queries.
60+
*/
61+
public final class CatalogManagedSnapshotManager implements DeltaSnapshotManager {
62+
63+
private static final Logger LOG = LoggerFactory.getLogger(CatalogManagedSnapshotManager.class);
64+
65+
private final String tablePath;
66+
private final String ucTableId;
67+
private final UCClient ucClient;
68+
private final Engine kernelEngine;
69+
private final AtomicReference<Snapshot> latestSnapshotRef = new AtomicReference<>();
70+
71+
CatalogManagedSnapshotManager(
72+
String tablePath,
73+
CatalogTable catalogTable,
74+
UnityCatalogClientFactory.UnityCatalogClient unityCatalogClient,
75+
Configuration hadoopConf) {
76+
this(tablePath, catalogTable, unityCatalogClient.getUcClient(), hadoopConf);
77+
}
78+
79+
CatalogManagedSnapshotManager(
80+
String tablePath, CatalogTable catalogTable, UCClient ucClient, Configuration hadoopConf) {
81+
this.tablePath = requireNonNull(tablePath, "tablePath is null");
82+
requireNonNull(catalogTable, "catalogTable is null");
83+
this.ucTableId = extractUcTableId(catalogTable);
84+
this.ucClient = requireNonNull(ucClient, "ucClient is null");
85+
this.kernelEngine = DefaultEngine.create(requireNonNull(hadoopConf, "hadoopConf is null"));
86+
}
87+
88+
private static String extractUcTableId(CatalogTable catalogTable) {
89+
Map<String, String> storageProperties = CatalogTableUtils.getStorageProperties(catalogTable);
90+
String tableId = storageProperties.get(UCCommitCoordinatorClient.UC_TABLE_ID_KEY);
91+
if (tableId == null || tableId.trim().isEmpty()) {
92+
throw new IllegalStateException(
93+
"Unity Catalog managed table is missing '"
94+
+ UCCommitCoordinatorClient.UC_TABLE_ID_KEY
95+
+ "' storage property.");
96+
}
97+
return tableId.trim();
98+
}
99+
100+
@Override
101+
public Snapshot loadLatestSnapshot() {
102+
Snapshot snapshot = buildSnapshot(Optional.empty());
103+
latestSnapshotRef.set(snapshot);
104+
return snapshot;
105+
}
106+
107+
@Override
108+
public Snapshot loadSnapshotAt(long version) {
109+
Snapshot snapshot = buildSnapshot(Optional.of(version));
110+
latestSnapshotRef.set(snapshot);
111+
return snapshot;
112+
}
113+
114+
@Override
115+
public DeltaHistoryManager.Commit getActiveCommitAtTime(
116+
long timestampMillis,
117+
boolean canReturnLastCommit,
118+
boolean mustBeRecreatable,
119+
boolean canReturnEarliestCommit) {
120+
SnapshotImpl snapshot = ensureSnapshot();
121+
List<ParsedCatalogCommitData> catalogCommits =
122+
fetchRatifiedCatalogCommits(Optional.of(snapshot.getVersion()));
123+
124+
return DeltaHistoryManager.getActiveCommitAtTimestamp(
125+
kernelEngine,
126+
snapshot,
127+
snapshot.getLogPath(),
128+
timestampMillis,
129+
mustBeRecreatable,
130+
canReturnLastCommit,
131+
canReturnEarliestCommit,
132+
catalogCommits);
133+
}
134+
135+
@Override
136+
public void checkVersionExists(long version, boolean mustBeRecreatable, boolean allowOutOfRange)
137+
throws VersionNotFoundException {
138+
SnapshotImpl snapshot = ensureSnapshot();
139+
List<ParsedCatalogCommitData> catalogCommits =
140+
fetchRatifiedCatalogCommits(Optional.of(snapshot.getVersion()));
141+
142+
Optional<Long> earliestRatifiedVersion =
143+
catalogCommits.stream().map(ParsedLogData::getVersion).min(Long::compareTo);
144+
145+
long earliestAvailable =
146+
mustBeRecreatable
147+
? DeltaHistoryManager.getEarliestRecreatableCommit(
148+
kernelEngine, snapshot.getLogPath(), earliestRatifiedVersion)
149+
: DeltaHistoryManager.getEarliestDeltaFile(
150+
kernelEngine, snapshot.getLogPath(), earliestRatifiedVersion);
151+
152+
long latest = snapshot.getVersion();
153+
if (version < earliestAvailable || ((version > latest) && !allowOutOfRange)) {
154+
throw new VersionNotFoundException(version, earliestAvailable, latest);
155+
}
156+
}
157+
158+
@Override
159+
public CommitRange getTableChanges(Engine engine, long startVersion, Optional<Long> endVersion) {
160+
requireNonNull(engine, "engine is null");
161+
requireNonNull(endVersion, "endVersion is null");
162+
List<ParsedCatalogCommitData> catalogCommits = fetchRatifiedCatalogCommits(endVersion);
163+
164+
CommitRangeBuilder builder =
165+
TableManager.loadCommitRange(tablePath)
166+
.withStartBoundary(CommitRangeBuilder.CommitBoundary.atVersion(startVersion))
167+
.withLogData(
168+
catalogCommits.stream()
169+
.map(commit -> (ParsedLogData) commit)
170+
.collect(Collectors.toList()));
171+
172+
endVersion.ifPresent(
173+
version -> builder.withEndBoundary(CommitRangeBuilder.CommitBoundary.atVersion(version)));
174+
175+
return builder.build(engine);
176+
}
177+
178+
private SnapshotImpl ensureSnapshot() {
179+
Snapshot snapshot = latestSnapshotRef.get();
180+
if (snapshot == null) {
181+
snapshot = loadLatestSnapshot();
182+
}
183+
return (SnapshotImpl) snapshot;
184+
}
185+
186+
private List<ParsedCatalogCommitData> fetchRatifiedCatalogCommits(Optional<Long> endVersionOpt) {
187+
GetCommitsResponse response = fetchCommits(endVersionOpt);
188+
List<ParsedCatalogCommitData> catalogCommits = convertToCatalogCommits(response.getCommits());
189+
if (catalogCommits.isEmpty()) {
190+
LOG.debug("No catalog commits returned for Unity Catalog table '{}'.", ucTableId);
191+
}
192+
return catalogCommits;
193+
}
194+
195+
private Snapshot buildSnapshot(Optional<Long> versionOpt) {
196+
GetCommitsResponse response = fetchCommits(versionOpt);
197+
long catalogVersion = resolveCatalogVersion(response.getLatestTableVersion());
198+
versionOpt.ifPresent(version -> validateRequestedVersion(version, catalogVersion));
199+
200+
List<ParsedLogData> logData =
201+
convertToCatalogCommits(response.getCommits()).stream()
202+
.map(commit -> (ParsedLogData) commit)
203+
.collect(Collectors.toList());
204+
205+
SnapshotBuilder builder = TableManager.loadSnapshot(tablePath);
206+
if (versionOpt.isPresent()) {
207+
builder = builder.atVersion(versionOpt.get());
208+
}
209+
return builder.withLogData(logData).build(kernelEngine);
210+
}
211+
212+
private void validateRequestedVersion(long versionToLoad, long catalogVersion) {
213+
if (versionToLoad > catalogVersion) {
214+
throw new IllegalArgumentException(
215+
String.format(
216+
"[%s] Cannot load table version %d as the latest version ratified by UC is %d",
217+
ucTableId, versionToLoad, catalogVersion));
218+
}
219+
}
220+
221+
private long resolveCatalogVersion(long latestTableVersion) {
222+
return latestTableVersion == -1 ? 0 : latestTableVersion;
223+
}
224+
225+
private GetCommitsResponse fetchCommits(Optional<Long> endVersionOpt) {
226+
try {
227+
URI tableUri = new Path(tablePath).toUri();
228+
return ucClient.getCommits(ucTableId, tableUri, Optional.empty(), endVersionOpt);
229+
} catch (IOException | UCCommitCoordinatorException e) {
230+
throw new RuntimeException(
231+
"Failed to retrieve Unity Catalog commits for table " + ucTableId, e);
232+
}
233+
}
234+
235+
private List<ParsedCatalogCommitData> convertToCatalogCommits(List<Commit> commits) {
236+
return commits.stream()
237+
.sorted(Comparator.comparingLong(Commit::getVersion))
238+
.map(
239+
commit ->
240+
ParsedCatalogCommitData.forFileStatus(
241+
FileStatus.of(
242+
commit.getFileStatus().getPath().toString(),
243+
commit.getFileStatus().getLen(),
244+
commit.getFileStatus().getModificationTime())))
245+
.collect(Collectors.toList());
246+
}
247+
}

kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/SnapshotManagerFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,10 @@ public static DeltaSnapshotManager create(
6767
&& CatalogTableUtils.isUnityCatalogManagedTable(catalogTable.get())
6868
&& unityCatalogClient.isPresent()) {
6969
LOG.debug(
70-
"Unity Catalog-managed table '{}' detected. Falling back to PathBasedSnapshotManager "
71-
+ "until catalog-managed support is wired.",
70+
"Unity Catalog-managed table '{}' detected. Using CatalogManagedSnapshotManager.",
7271
identifier);
72+
return new CatalogManagedSnapshotManager(
73+
tablePath, catalogTable.get(), unityCatalogClient.get(), hadoopConf);
7374
}
7475

7576
return new PathBasedSnapshotManager(tablePath, hadoopConf);

0 commit comments

Comments
 (0)