-
Notifications
You must be signed in to change notification settings - Fork 2k
[Kernel-Spark] Add Unity Catalog connection extraction utilities for CCv2 support #5605
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
36 commits
Select commit
Hold shift + click to select a range
540c3a4
update methods and testS
TimothyW553 80d4662
delete workspace
TimothyW553 7cf2c28
delete
TimothyW553 5fca199
change to catalogtable
TimothyW553 b0c0b6f
add tests
TimothyW553 fdec89e
fmt
TimothyW553 4807c4f
test fix
TimothyW553 8e992d9
storage props
TimothyW553 bbc37b5
merge props
TimothyW553 28e17b5
address comments
TimothyW553 afcdfb1
remove private method test
TimothyW553 e1aed3e
null check for storage properties
TimothyW553 d709460
Restore UC connection utils after rebase
TimothyW553 e7ef109
clean up...
TimothyW553 9835086
Restore UC connection info utils and add SparkUnityCatalogUtils suite
TimothyW553 d066f73
Update tests
TimothyW553 1f09ddc
spark
TimothyW553 c63731f
Clean up test utils and refactor
TimothyW553 fc13a3a
revert DeltaSQLConfV2.scala
TimothyW553 7ccf4aa
remove npe tests
TimothyW553 3be445e
remove more npe
TimothyW553 2ada816
Centralize utils
TimothyW553 3344671
fmt
TimothyW553 1028015
FMT
TimothyW553 5cddf21
Address comments - remove unecessary wrappers and add logging
TimothyW553 73c9ecd
Address comments - remove unecessary wrappers and add logging
TimothyW553 5cd8807
Add wrapper for map
TimothyW553 ab7f0fd
use SharedSparkSession
TimothyW553 37631c0
fix tests
TimothyW553 ca68661
fix tests
TimothyW553 e56a59f
annotate constant params
TimothyW553 5fc21cd
annotate constant params
TimothyW553 2db986b
Rename and follow naming conventions
TimothyW553 7e405bc
Rename and follow naming conventions
TimothyW553 28a6259
update context
TimothyW553 42b94dd
Merge branch 'master' into stack/ccv2-uc-utils
TimothyW553 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
54 changes: 54 additions & 0 deletions
54
kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCTableInfo.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,54 @@ | ||
| /* | ||
| * Copyright (2025) The Delta Lake Project Authors. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package io.delta.kernel.spark.snapshot.unitycatalog; | ||
|
|
||
| import static java.util.Objects.requireNonNull; | ||
|
|
||
| /** | ||
| * Table information for Unity Catalog managed tables. | ||
| * | ||
| * <p>This POJO encapsulates all the information needed to interact with a Unity Catalog table | ||
| * without requiring Spark dependencies. | ||
| */ | ||
| public final class UCTableInfo { | ||
| private final String tableId; | ||
| private final String tablePath; | ||
| private final String ucUri; | ||
| private final String ucToken; | ||
|
|
||
| public UCTableInfo(String tableId, String tablePath, String ucUri, String ucToken) { | ||
| this.tableId = requireNonNull(tableId, "tableId is null"); | ||
| this.tablePath = requireNonNull(tablePath, "tablePath is null"); | ||
| this.ucUri = requireNonNull(ucUri, "ucUri is null"); | ||
| this.ucToken = requireNonNull(ucToken, "ucToken is null"); | ||
| } | ||
|
|
||
| public String getTableId() { | ||
| return tableId; | ||
| } | ||
|
|
||
| public String getTablePath() { | ||
| return tablePath; | ||
| } | ||
|
|
||
| public String getUcUri() { | ||
| return ucUri; | ||
| } | ||
|
|
||
| public String getUcToken() { | ||
| return ucToken; | ||
| } | ||
| } |
113 changes: 113 additions & 0 deletions
113
kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UCUtils.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,113 @@ | ||
| /* | ||
| * Copyright (2025) The Delta Lake Project Authors. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package io.delta.kernel.spark.snapshot.unitycatalog; | ||
|
|
||
| import static java.util.Objects.requireNonNull; | ||
|
|
||
| import io.delta.kernel.spark.utils.CatalogTableUtils; | ||
| import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient; | ||
| import java.util.Map; | ||
| import java.util.Optional; | ||
| import org.apache.spark.sql.SparkSession; | ||
| import org.apache.spark.sql.catalyst.catalog.CatalogTable; | ||
| import org.apache.spark.sql.delta.coordinatedcommits.UCCatalogConfig; | ||
| import org.apache.spark.sql.delta.coordinatedcommits.UCCommitCoordinatorBuilder$; | ||
|
|
||
| /** | ||
| * Utility class for extracting Unity Catalog table information from Spark catalog metadata. | ||
| * | ||
| * <p>This class isolates Spark dependencies, allowing {@link UCManagedSnapshotManager} to be | ||
| * created without Spark if table info is provided directly via {@link UCTableInfo}. | ||
| */ | ||
| public final class UCUtils { | ||
|
|
||
| // Utility class - no instances | ||
| private UCUtils() {} | ||
|
|
||
| /** | ||
| * Extracts Unity Catalog table information from Spark catalog table metadata. | ||
| * | ||
| * @param catalogTable Spark catalog table metadata | ||
| * @param spark SparkSession for resolving Unity Catalog configurations | ||
| * @return table info if table is UC-managed, empty otherwise | ||
| * @throws IllegalArgumentException if table is UC-managed but configuration is invalid | ||
| */ | ||
| public static Optional<UCTableInfo> extractTableInfo( | ||
| CatalogTable catalogTable, SparkSession spark) { | ||
| requireNonNull(catalogTable, "catalogTable is null"); | ||
| requireNonNull(spark, "spark is null"); | ||
|
|
||
| if (!CatalogTableUtils.isUnityCatalogManagedTable(catalogTable)) { | ||
| return Optional.empty(); | ||
| } | ||
|
|
||
| String tableId = extractUCTableId(catalogTable); | ||
| String tablePath = extractTablePath(catalogTable); | ||
|
|
||
| // Get catalog name - require explicit catalog in identifier | ||
| scala.Option<String> catalogOption = catalogTable.identifier().catalog(); | ||
| if (catalogOption.isEmpty()) { | ||
| throw new IllegalArgumentException( | ||
| "Unable to determine Unity Catalog for table " | ||
| + catalogTable.identifier() | ||
| + ": catalog name is missing. Use a fully-qualified table name with an explicit " | ||
| + "catalog (e.g., catalog.schema.table)."); | ||
| } | ||
| String catalogName = catalogOption.get(); | ||
|
|
||
| // Get UC endpoint and token from Spark configs | ||
| scala.collection.immutable.Map<String, UCCatalogConfig> ucConfigs = | ||
| UCCommitCoordinatorBuilder$.MODULE$.getCatalogConfigMap(spark); | ||
|
|
||
| scala.Option<UCCatalogConfig> configOpt = ucConfigs.get(catalogName); | ||
|
|
||
| if (configOpt.isEmpty()) { | ||
| throw new IllegalArgumentException( | ||
| "Cannot create UC client for table " | ||
| + catalogTable.identifier() | ||
| + ": Unity Catalog configuration not found for catalog '" | ||
| + catalogName | ||
| + "'."); | ||
| } | ||
|
|
||
| UCCatalogConfig config = configOpt.get(); | ||
| String ucUri = config.uri(); | ||
| String ucToken = config.token(); | ||
|
|
||
| return Optional.of(new UCTableInfo(tableId, tablePath, ucUri, ucToken)); | ||
| } | ||
|
|
||
| private static String extractUCTableId(CatalogTable catalogTable) { | ||
| Map<String, String> storageProperties = | ||
| scala.jdk.javaapi.CollectionConverters.asJava(catalogTable.storage().properties()); | ||
|
|
||
| // TODO: UC constants should be consolidated in a shared location (future PR) | ||
| String ucTableId = storageProperties.get(UCCommitCoordinatorClient.UC_TABLE_ID_KEY); | ||
TimothyW553 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if (ucTableId == null || ucTableId.isEmpty()) { | ||
| throw new IllegalArgumentException( | ||
| "Cannot extract ucTableId from table " + catalogTable.identifier()); | ||
| } | ||
| return ucTableId; | ||
| } | ||
|
|
||
| private static String extractTablePath(CatalogTable catalogTable) { | ||
| if (catalogTable.location() == null) { | ||
| throw new IllegalArgumentException( | ||
| "Cannot extract table path: location is null for table " + catalogTable.identifier()); | ||
| } | ||
| return catalogTable.location().toString(); | ||
| } | ||
| } | ||
40 changes: 40 additions & 0 deletions
40
kernel-spark/src/test/java/io/delta/kernel/spark/snapshot/unitycatalog/UCTableInfoTest.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,40 @@ | ||
| /* | ||
| * Copyright (2025) The Delta Lake Project Authors. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package io.delta.kernel.spark.snapshot.unitycatalog; | ||
|
|
||
| import static org.junit.jupiter.api.Assertions.assertEquals; | ||
|
|
||
| import org.junit.jupiter.api.Test; | ||
|
|
||
| /** Tests for {@link UCTableInfo}. */ | ||
| class UCTableInfoTest { | ||
|
|
||
| @Test | ||
| void testConstructor_ValidInputs_StoresAllFields() { | ||
| // Use distinctive values that would fail if implementation had hardcoded defaults | ||
| String tableId = "uc_tbl_7f3a9b2c-e8d1-4f6a"; | ||
| String tablePath = "abfss://[email protected]/delta/v2"; | ||
| String ucUri = "https://uc-server.example.net/api/2.1/uc"; | ||
| String ucToken = "dapi_Kx9mN$2pQr#7vWz"; | ||
|
|
||
| UCTableInfo info = new UCTableInfo(tableId, tablePath, ucUri, ucToken); | ||
|
|
||
| assertEquals(tableId, info.getTableId(), "Table ID should be stored correctly"); | ||
| assertEquals(tablePath, info.getTablePath(), "Table path should be stored correctly"); | ||
| assertEquals(ucUri, info.getUcUri(), "UC URI should be stored correctly"); | ||
| assertEquals(ucToken, info.getUcToken(), "UC token should be stored correctly"); | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
perhaps log the table identifier as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, error message now includes the table identifier:
"Cannot create UC client for table " + catalogTable.identifier() + ": ..."