-
Notifications
You must be signed in to change notification settings - Fork 2k
[DSv2] Create UC client #5520
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
[DSv2] Create UC client #5520
Changes from 46 commits
622cf14
857aa90
7690d84
6a1d2a5
a57a342
eeadbdb
030b548
85c45eb
1ad766d
d6c5b47
12d1cfd
b2b4428
0832138
5830196
8b5c962
4463c3e
fed1e0a
f88f46a
aab04bb
ec1a35b
1b1fe59
6686a97
26ad1cd
3f3cd9c
f4d4f13
6359268
4571d96
f66e0cd
0bd57d3
fc30213
3bd4357
2ad9fc0
6126080
f88a47a
54bf26d
ec45077
d6d604e
5635ab6
2d056e5
841e46f
4a1d8fa
2c69260
f06eee3
2f792d8
82c3837
80d3abf
72ab1a5
2a6f689
63e6137
270fd8c
b2e366c
f677e9c
31a7b61
a3ff091
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,193 @@ | ||
| /* | ||
| * Copyright (2025) The Delta Lake Project Authors. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package io.delta.kernel.spark.snapshot; | ||
|
|
||
| import static io.delta.kernel.internal.util.Preconditions.checkArgument; | ||
| import static java.util.Objects.requireNonNull; | ||
|
|
||
| import io.delta.kernel.CommitRange; | ||
| import io.delta.kernel.Snapshot; | ||
| import io.delta.kernel.defaults.engine.DefaultEngine; | ||
| import io.delta.kernel.engine.Engine; | ||
| import io.delta.kernel.internal.DeltaHistoryManager; | ||
| import io.delta.kernel.internal.SnapshotImpl; | ||
| import io.delta.kernel.internal.files.ParsedCatalogCommitData; | ||
| import io.delta.kernel.internal.files.ParsedLogData; | ||
| import io.delta.kernel.spark.exception.VersionNotFoundException; | ||
| import java.util.List; | ||
| import java.util.Optional; | ||
| import java.util.stream.Collectors; | ||
| import org.apache.hadoop.conf.Configuration; | ||
| import org.apache.spark.annotation.Experimental; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| /** | ||
| * Implementation of DeltaSnapshotManager for catalog-managed tables (e.g., UC). | ||
| * | ||
| * <p>This snapshot manager is agnostic to the underlying catalog implementation. It delegates to a | ||
| * {@link ManagedCommitClient}, keeping catalog-specific wiring out of the manager itself. | ||
| */ | ||
| @Experimental | ||
| public class CatalogManagedSnapshotManager implements DeltaSnapshotManager, AutoCloseable { | ||
|
|
||
| private static final Logger logger = LoggerFactory.getLogger(CatalogManagedSnapshotManager.class); | ||
|
|
||
| private final ManagedCommitClient commitClient; | ||
| private final Engine kernelEngine; | ||
|
|
||
| public CatalogManagedSnapshotManager(ManagedCommitClient commitClient, Configuration hadoopConf) { | ||
| this.commitClient = requireNonNull(commitClient, "commitClient is null"); | ||
| requireNonNull(hadoopConf, "hadoopConf is null"); | ||
|
|
||
| this.kernelEngine = DefaultEngine.create(hadoopConf); | ||
| logger.info( | ||
| "Created CatalogManagedSnapshotManager for table {} at path {}", | ||
| commitClient.getTableId(), | ||
| commitClient.getTablePath()); | ||
| } | ||
|
|
||
| /** | ||
| * Loads the most recent snapshot of the catalog-managed Delta table. | ||
| * | ||
| * @return the snapshot the commit client returns when neither a version nor a timestamp is given | ||
| */ | ||
| @Override | ||
| public Snapshot loadLatestSnapshot() { | ||
| // Named after the commit-client parameters they fill, so the call site reads unambiguously. | ||
| final Optional<Long> versionOpt = Optional.empty(); | ||
| final Optional<Long> timestampOpt = Optional.empty(); | ||
| return commitClient.loadSnapshot(kernelEngine, versionOpt, timestampOpt); | ||
| } | ||
|
|
||
| /** | ||
| * Loads the snapshot of the catalog-managed Delta table at an explicit version. | ||
| * | ||
| * @param version the version to load (must be >= 0) | ||
| * @return the snapshot the commit client returns for {@code version} | ||
| */ | ||
| @Override | ||
| public Snapshot loadSnapshotAt(long version) { | ||
| checkArgument(version >= 0, "version must be non-negative"); | ||
| // Only the version is pinned; no timestamp is supplied to the commit client. | ||
| final Optional<Long> versionOpt = Optional.of(version); | ||
| final Optional<Long> timestampOpt = Optional.empty(); | ||
| return commitClient.loadSnapshot(kernelEngine, versionOpt, timestampOpt); | ||
| } | ||
|
|
||
| /** | ||
| * Finds the active commit at a specific timestamp. | ||
| * | ||
| * <p>The latest snapshot is loaded once through the commit client. The catalog commits handed to | ||
| * {@link DeltaHistoryManager#getActiveCommitAtTimestamp} are taken from that snapshot's log | ||
| * segment instead of being requested from the catalog a second time, so the snapshot and the | ||
| * commit list always describe the same catalog state. | ||
| * | ||
| * @param timestampMillis the timestamp in milliseconds since epoch (UTC) | ||
| * @param canReturnLastCommit if true, returns the last commit if the timestamp is after all | ||
| * commits; if false, throws an exception | ||
| * @param mustBeRecreatable if true, only considers commits that can be fully recreated from | ||
| * available log files; if false, considers all commits | ||
| * @param canReturnEarliestCommit if true, returns the earliest commit if the timestamp is before | ||
| * all commits; if false, throws an exception | ||
| * @return the commit that was active at the specified timestamp | ||
| */ | ||
| @Override | ||
| public DeltaHistoryManager.Commit getActiveCommitAtTime( | ||
| long timestampMillis, | ||
| boolean canReturnLastCommit, | ||
| boolean mustBeRecreatable, | ||
| boolean canReturnEarliestCommit) { | ||
| // Load the latest snapshot for timestamp resolution (this is the only catalog round trip) | ||
| SnapshotImpl latestSnapshot = (SnapshotImpl) loadLatestSnapshot(); | ||
|
|
||
| // Reuse the catalog commits already captured in the snapshot's log segment | ||
| List<ParsedCatalogCommitData> catalogCommits = | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. loadLatestSnapshot() will call
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah yes, I see the codepath now for getting commits and the latest snapshot. Fixed by extracting catalog commits directly from the snapshot's log segment instead of calling UC twice: List<ParsedCatalogCommitData> catalogCommits = latestSnapshot.getLogSegment().getAllCatalogCommits(); |
||
| latestSnapshot.getLogSegment().getAllCatalogCommits(); | ||
|
|
||
| // Note: the flag order below follows DeltaHistoryManager's signature, which differs from the | ||
| // parameter order of this method. | ||
| return DeltaHistoryManager.getActiveCommitAtTimestamp( | ||
| kernelEngine, | ||
| latestSnapshot, | ||
| latestSnapshot.getLogPath(), | ||
| timestampMillis, | ||
| mustBeRecreatable, | ||
| canReturnLastCommit, | ||
| canReturnEarliestCommit, | ||
| catalogCommits); | ||
| } | ||
|
|
||
| /** | ||
| * Verifies that {@code version} lies within the range of versions known to the catalog. | ||
| * | ||
| * <p>The check is purely range-based: the lower bound is fixed at 0 and the upper bound is the | ||
| * latest ratified version reported by the commit client. Versions in between are assumed to be | ||
| * contiguous, so no individual commit is looked up. | ||
| * | ||
| * @param version the version to check (must be >= 0) | ||
| * @param mustBeRecreatable not consulted by this implementation, which always treats version 0 | ||
| * as the earliest available version | ||
| * @param allowOutOfRange if true, a version greater than the latest ratified version is | ||
| * accepted; if false, such a version is rejected | ||
| * @throws VersionNotFoundException if the version falls outside the accepted range | ||
| */ | ||
| @Override | ||
| public void checkVersionExists(long version, boolean mustBeRecreatable, boolean allowOutOfRange) | ||
| throws VersionNotFoundException { | ||
| checkArgument(version >= 0, "version must be non-negative"); | ||
|
|
||
| // The earliest version is pinned at 0 for catalog-managed tables. | ||
| final long earliestVersion = 0; | ||
| final long latestVersion = commitClient.getLatestRatifiedVersion(); | ||
|
|
||
| // Guard 1: below the earliest version. | ||
| if (version < earliestVersion) { | ||
| throw new VersionNotFoundException(version, earliestVersion, latestVersion); | ||
| } | ||
| // Guard 2: past the latest ratified version, unless the caller tolerates that. | ||
| if (!allowOutOfRange && version > latestVersion) { | ||
| throw new VersionNotFoundException(version, earliestVersion, latestVersion); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Gets a range of table changes between versions. | ||
| * | ||
| * <p>The request is forwarded to the managed commit client using version boundaries only; no | ||
| * timestamp boundaries are supplied. | ||
| * | ||
| * @param engine engine passed through to the commit client | ||
| * @param startVersion first version of the range (must be >= 0) | ||
| * @param endVersion optional last version of the range (must be >= 0 when present) | ||
| * @return the commit range produced by the commit client | ||
| * @throws NullPointerException if {@code engine} or {@code endVersion} is null | ||
| */ | ||
| @Override | ||
| public CommitRange getTableChanges(Engine engine, long startVersion, Optional<Long> endVersion) { | ||
| requireNonNull(engine, "engine is null"); | ||
| checkArgument(startVersion >= 0, "startVersion must be non-negative"); | ||
| // A null Optional would otherwise fail on the next line with a message-less NPE. | ||
| requireNonNull(endVersion, "endVersion is null"); | ||
| endVersion.ifPresent(v -> checkArgument(v >= 0, "endVersion must be non-negative")); | ||
|
|
||
| return commitClient.loadCommitRange( | ||
| engine, | ||
| Optional.of(startVersion) /* startVersionOpt */, | ||
| Optional.empty() /* startTimestampOpt */, | ||
| endVersion /* endVersionOpt */, | ||
| Optional.empty() /* endTimestampOpt */); | ||
| } | ||
|
|
||
| /** | ||
| * Closes the UC client and releases resources. | ||
| * | ||
| * <p>This method should be called when the snapshot manager is no longer needed. Prefer using | ||
| * try-with-resources to ensure proper cleanup. | ||
| */ | ||
| @Override | ||
| public void close() { | ||
| try { | ||
| commitClient.close(); | ||
| logger.info("Closed CatalogManagedSnapshotManager for table {}", commitClient.getTableId()); | ||
| } catch (Exception e) { | ||
| logger.warn( | ||
| "Error closing catalog-managed client for table {}", commitClient.getTableId(), e); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,99 @@ | ||
| /* | ||
| * Copyright (2025) The Delta Lake Project Authors. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package io.delta.kernel.spark.snapshot; | ||
|
|
||
| import static java.util.Objects.requireNonNull; | ||
|
|
||
| import io.delta.kernel.spark.snapshot.unitycatalog.UnityCatalogManagedCommitClient; | ||
| import java.util.Optional; | ||
| import org.apache.hadoop.conf.Configuration; | ||
| import org.apache.spark.annotation.Experimental; | ||
| import org.apache.spark.sql.SparkSession; | ||
| import org.apache.spark.sql.catalyst.catalog.CatalogTable; | ||
|
|
||
| /** | ||
| * Static factory that picks the {@link DeltaSnapshotManager} implementation for a table. | ||
| * | ||
| * <p>Two implementations can be returned: | ||
| * | ||
| * <ul> | ||
| * <li>{@link CatalogManagedSnapshotManager} - when a managed commit client can be obtained for | ||
| * the supplied catalog table | ||
| * <li>{@link PathBasedSnapshotManager} - in every other case | ||
| * </ul> | ||
| * | ||
| * <p>Keeping the choice here means callers (e.g., {@code SparkTable}) never reference a concrete | ||
| * manager class. | ||
| * | ||
| * <p><strong>Example usage:</strong> | ||
| * | ||
| * <pre>{@code | ||
| * DeltaSnapshotManager manager = DeltaSnapshotManagerFactory.create( | ||
| * tablePath, | ||
| * Optional.of(catalogTable), | ||
| * spark, | ||
| * hadoopConf | ||
| * ); | ||
| * Snapshot snapshot = manager.loadLatestSnapshot(); | ||
| * }</pre> | ||
| */ | ||
| @Experimental | ||
| public final class DeltaSnapshotManagerFactory { | ||
|
|
||
| // Static-only holder; instantiation is not supported. | ||
| private DeltaSnapshotManagerFactory() {} | ||
|
|
||
| /** | ||
| * Creates the appropriate snapshot manager for a Delta table. | ||
| * | ||
| * <p><strong>Selection logic:</strong> | ||
| * | ||
| * <ul> | ||
| * <li>{@code catalogTable} is present and {@link UnityCatalogManagedCommitClient#fromCatalog} | ||
| * yields a client → {@link CatalogManagedSnapshotManager} | ||
| * <li>Otherwise → {@link PathBasedSnapshotManager} | ||
| * </ul> | ||
| * | ||
| * @param tablePath filesystem path to the Delta table root | ||
| * @param catalogTable optional Spark catalog table metadata | ||
| * @param spark SparkSession passed to the Unity Catalog client lookup | ||
| * @param hadoopConf Hadoop configuration handed to the created manager | ||
| * @return the selected snapshot manager implementation | ||
| * @throws NullPointerException if tablePath, catalogTable, spark, or hadoopConf is null | ||
| */ | ||
| public static DeltaSnapshotManager create( | ||
| String tablePath, | ||
| Optional<CatalogTable> catalogTable, | ||
| SparkSession spark, | ||
| Configuration hadoopConf) { | ||
|
|
||
| requireNonNull(tablePath, "tablePath is null"); | ||
| requireNonNull(catalogTable, "catalogTable is null"); | ||
| requireNonNull(spark, "spark is null"); | ||
| requireNonNull(hadoopConf, "hadoopConf is null"); | ||
|
|
||
| // Try the catalog-managed route first; fall back to the path-based manager when there is no | ||
| // catalog table or no managed commit client for it. | ||
| return catalogTable | ||
| .flatMap(table -> UnityCatalogManagedCommitClient.fromCatalog(table, spark)) | ||
| .<DeltaSnapshotManager>map(client -> new CatalogManagedSnapshotManager(client, hadoopConf)) | ||
| .orElseGet(() -> new PathBasedSnapshotManager(tablePath, hadoopConf)); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,70 @@ | ||
| /* | ||
| * Copyright (2025) The Delta Lake Project Authors. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package io.delta.kernel.spark.snapshot; | ||
|
|
||
| import io.delta.kernel.CommitRange; | ||
| import io.delta.kernel.Snapshot; | ||
| import io.delta.kernel.engine.Engine; | ||
| import io.delta.kernel.internal.files.ParsedLogData; | ||
| import java.util.List; | ||
| import java.util.Optional; | ||
|
|
||
| /** | ||
| * Client for a single catalog-managed table: it loads snapshots and commit ranges and reports | ||
| * which commits the catalog has ratified. | ||
| */ | ||
| public interface ManagedCommitClient extends AutoCloseable { | ||
|
|
||
| /** | ||
| * Returns the catalog-managed table identifier. | ||
| * | ||
| * @return the table identifier (for logging/telemetry) | ||
| */ | ||
| String getTableId(); | ||
|
|
||
| /** | ||
| * Returns the physical location of the table. | ||
| * | ||
| * @return physical table path used by Delta Kernel | ||
| */ | ||
| String getTablePath(); | ||
|
|
||
| /** | ||
| * Loads a snapshot of the table. | ||
| * | ||
| * @param engine engine used to load the snapshot | ||
| * @param versionOpt version to load; {@code CatalogManagedSnapshotManager} passes an empty value | ||
| * to request the latest snapshot | ||
| * @param timestampOpt presumably a timestamp at which to resolve the snapshot - TODO confirm | ||
| * against the implementation | ||
| * @return the loaded snapshot | ||
| */ | ||
| Snapshot loadSnapshot(Engine engine, Optional<Long> versionOpt, Optional<Long> timestampOpt); | ||
|
|
||
| /** | ||
| * Loads a range of commits for the table. | ||
| * | ||
| * <p>NOTE(review): how a version boundary and a timestamp boundary interact when both are given | ||
| * for the same end of the range is not specified here - confirm against the implementation. | ||
| * | ||
| * @param engine engine used to load the commit range | ||
| * @param startVersionOpt optional start boundary expressed as a version | ||
| * @param startTimestampOpt optional start boundary expressed as a timestamp | ||
| * @param endVersionOpt optional end boundary expressed as a version | ||
| * @param endTimestampOpt optional end boundary expressed as a timestamp | ||
| * @return the loaded commit range | ||
| */ | ||
| CommitRange loadCommitRange( | ||
| Engine engine, | ||
| Optional<Long> startVersionOpt, | ||
| Optional<Long> startTimestampOpt, | ||
| Optional<Long> endVersionOpt, | ||
| Optional<Long> endTimestampOpt); | ||
|
|
||
| /** | ||
| * Returns the commits ratified by the catalog, up to an optional end version. | ||
| * | ||
| * <p>Each element is a {@link ParsedLogData} describing one ratified commit; the list is ordered | ||
| * by ascending version. For catalog-managed tables the elements are typically {@code | ||
| * ParsedCatalogCommitData} instances. | ||
| * | ||
| * @param endVersionOpt optional end version (inclusive); if empty, returns commits up to latest | ||
| * @return list of parsed log data representing ratified commits, sorted by version ascending | ||
| */ | ||
| List<ParsedLogData> getRatifiedCommits(Optional<Long> endVersionOpt); | ||
|
|
||
| /** | ||
| * Returns the highest table version ratified by the catalog coordinator. | ||
| * | ||
| * @return the latest version ratified by the catalog, or 0 if only the initial commit exists | ||
| */ | ||
| long getLatestRatifiedVersion(); | ||
|
|
||
| /** | ||
| * Releases the resources held by this client. | ||
| * | ||
| * <p>Redeclared without a {@code throws} clause, so callers do not have to handle the checked | ||
| * exception declared by {@link AutoCloseable#close()}. | ||
| */ | ||
| @Override | ||
| void close(); | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add an inline comment naming the parameter whenever the argument being passed is not self-explanatory, e.g. `Optional.empty() /* paramName */`.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated. I did it this way
(a previous PR was done like this)