-
Notifications
You must be signed in to change notification settings - Fork 2k
[Kernel-Spark][DSv2] Add CCv2 routing to SparkTable #5660
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
dbe3c19
ac042af
9ca1793
e3fcc41
776ed69
418b9bd
e926066
d443640
d1684da
0ca9b4c
ee36547
906643b
71c62f3
e524c60
b79ff29
816e596
a5d745a
9d313c9
1ba9177
c6d7e67
fc614cb
2d333d3
7dc4549
b2e39c8
0423386
7717995
9cad0ba
a294801
f43a198
d4f0c0f
0a58d48
94303b3
4275b56
ad25296
254b3f5
07f3b5b
2bee750
64e5cda
bb9fe06
1615a27
c069296
f4dab6c
8588477
20c631b
f032bc4
15ddabf
ca28e0d
984e458
63e0f90
094db22
10da4a2
043f3c1
2767fec
0c22e3a
7a36ab2
197efc8
7f40d83
0b58868
cd3e85e
8ccc3ae
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,7 +22,13 @@ | |
| import io.delta.kernel.spark.read.SparkScanBuilder; | ||
| import io.delta.kernel.spark.snapshot.DeltaSnapshotManager; | ||
| import io.delta.kernel.spark.snapshot.PathBasedSnapshotManager; | ||
| import io.delta.kernel.spark.snapshot.unitycatalog.UCManagedSnapshotManager; | ||
| import io.delta.kernel.spark.snapshot.unitycatalog.UCTableInfo; | ||
| import io.delta.kernel.spark.snapshot.unitycatalog.UCUtils; | ||
| import io.delta.kernel.spark.utils.SchemaUtils; | ||
| import io.delta.kernel.unitycatalog.UCCatalogManagedClient; | ||
| import io.delta.storage.commit.uccommitcoordinator.UCClient; | ||
| import io.delta.storage.commit.uccommitcoordinator.UCTokenBasedRestClient; | ||
| import java.util.*; | ||
| import java.util.function.Supplier; | ||
| import org.apache.hadoop.conf.Configuration; | ||
|
|
@@ -137,7 +143,20 @@ private SparkTable( | |
|
|
||
| this.hadoopConf = | ||
| SparkSession.active().sessionState().newHadoopConfWithOptions(toScalaMap(options)); | ||
|
|
||
| // Create snapshot manager: UC-managed for Unity Catalog tables, path-based otherwise | ||
| this.snapshotManager = new PathBasedSnapshotManager(tablePath, hadoopConf); | ||
| if (catalogTable.isPresent()) { | ||
| Optional<UCTableInfo> ucTableInfo = | ||
| UCUtils.extractTableInfo(catalogTable.get(), SparkSession.active()); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sorry -- why are we hardcoding these UC calls? what if the managing catalog is glue? hms?
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @tdas -- just double checking: you're aware + okay with this?
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I may be out of date on my understanding of this design, but I thought the plan was that: Spark asks the SparkCatalog to load a table, and that SparkCatalog (maybe a GlueCatalog or UnitySingleCatalog) etc does the table-specific lookup (e.g. perhaps parquet, perhaps iceberg, perhaps Delta) and then comes into Kernel to create the right construct (e.g. a Kernel Snapshot with a UC-Committer) and then injects that Snapshot into the SparkTable. Has the dsv1 -> dsv2 fallback (or, perhaps the dsv2 -> dsv1 fallback) changed this? But I don't see how hardcoding UC references here is acceptable. Would love if you could help me understand this approach, thanks! |
||
| if (ucTableInfo.isPresent()) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. and if it's not? |
||
| UCTableInfo info = ucTableInfo.get(); | ||
| UCClient ucClient = new UCTokenBasedRestClient(info.getUcUri(), info.getUcToken()); | ||
| UCCatalogManagedClient ucCatalogClient = new UCCatalogManagedClient(ucClient); | ||
| this.snapshotManager = new UCManagedSnapshotManager(ucCatalogClient, info, hadoopConf); | ||
| } | ||
| } | ||
|
|
||
| // Load the initial snapshot through the manager | ||
| this.initialSnapshot = snapshotManager.loadLatestSnapshot(); | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,91 @@ | ||
| /* | ||
| * Copyright (2025) The Delta Lake Project Authors. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package io.delta.kernel.spark.snapshot.unitycatalog; | ||
|
|
||
| import static java.util.Objects.requireNonNull; | ||
|
|
||
| import io.delta.kernel.CommitRange; | ||
| import io.delta.kernel.Snapshot; | ||
| import io.delta.kernel.engine.Engine; | ||
| import io.delta.kernel.internal.DeltaHistoryManager; | ||
| import io.delta.kernel.spark.snapshot.DeltaSnapshotManager; | ||
| import io.delta.kernel.unitycatalog.UCCatalogManagedClient; | ||
| import java.util.Optional; | ||
|
|
||
| /** | ||
| * Snapshot manager for Unity Catalog managed tables. | ||
| * | ||
| * <p>Used for tables with the catalog-managed commit feature enabled. Unity Catalog serves as the | ||
| * source of truth for the table's commit history. | ||
| */ | ||
| public class UCManagedTableSnapshotManager implements DeltaSnapshotManager { | ||
|
|
||
| private final UCCatalogManagedClient ucCatalogManagedClient; | ||
| private final String tableId; | ||
| private final String tablePath; | ||
| private final Engine engine; | ||
|
|
||
| /** | ||
| * Creates a new UCManagedTableSnapshotManager. | ||
| * | ||
| * @param ucCatalogManagedClient the UC client for catalog-managed operations | ||
| * @param tableInfo the UC table information (tableId, tablePath, etc.) | ||
| * @param engine the Kernel engine for table operations | ||
| */ | ||
| public UCManagedTableSnapshotManager( | ||
| UCCatalogManagedClient ucCatalogManagedClient, UCTableInfo tableInfo, Engine engine) { | ||
| this.ucCatalogManagedClient = | ||
| requireNonNull(ucCatalogManagedClient, "ucCatalogManagedClient is null"); | ||
| requireNonNull(tableInfo, "tableInfo is null"); | ||
| this.tableId = tableInfo.getTableId(); | ||
| this.tablePath = tableInfo.getTablePath(); | ||
| this.engine = requireNonNull(engine, "engine is null"); | ||
| } | ||
|
|
||
| @Override | ||
| public Snapshot loadLatestSnapshot() { | ||
| throw new UnsupportedOperationException( | ||
| "UCManagedTableSnapshotManager.loadLatestSnapshot is not yet implemented"); | ||
| } | ||
|
|
||
| @Override | ||
| public Snapshot loadSnapshotAt(long version) { | ||
| throw new UnsupportedOperationException( | ||
| "UCManagedTableSnapshotManager.loadSnapshotAt is not yet implemented"); | ||
| } | ||
|
|
||
| @Override | ||
| public DeltaHistoryManager.Commit getActiveCommitAtTime( | ||
| long timestampMillis, | ||
| boolean canReturnLastCommit, | ||
| boolean mustBeRecreatable, | ||
| boolean canReturnEarliestCommit) { | ||
| throw new UnsupportedOperationException( | ||
| "UCManagedTableSnapshotManager.getActiveCommitAtTime is not yet implemented"); | ||
| } | ||
|
|
||
| @Override | ||
| public void checkVersionExists(long version, boolean mustBeRecreatable, boolean allowOutOfRange) { | ||
| throw new UnsupportedOperationException( | ||
| "UCManagedTableSnapshotManager.checkVersionExists is not yet implemented"); | ||
| } | ||
|
|
||
| @Override | ||
| public CommitRange getTableChanges(Engine engine, long startVersion, Optional<Long> endVersion) { | ||
| throw new UnsupportedOperationException( | ||
| "UCManagedTableSnapshotManager.getTableChanges is not yet implemented"); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems like a simple
would be cleaner?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gotcha, also moved this PR to #5678 due to some stack issues.