[Kernel-Spark] Add Unity Catalog connection extraction utilities for CCv2 support #5605
Conversation
```java
Optional<scala.Tuple3<String, String, String>> configTuple =
    scala.jdk.javaapi.CollectionConverters.asJava(scalaConfigs).stream()
```
Go through scalaConfigs (all catalogs) and get the first one whose (catalogName, uri, token) tuple has a matching catalog name.
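As a sketch, the lookup described here might look like the following, mirroring the identifiers in the quoted diff; the exact predicate is an assumption based on this comment, not the final code:

```java
// Find the first (catalogName, uri, token) tuple whose catalog name matches.
// `scalaConfigs` and `catalogName` come from the surrounding method in the diff.
Optional<scala.Tuple3<String, String, String>> configTuple =
    scala.jdk.javaapi.CollectionConverters.asJava(scalaConfigs).stream()
        .filter(tuple -> tuple._1().equals(catalogName))
        .findFirst();
```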
```java
  assertEquals(token, info.getToken(), "Token should be stored correctly");
}

@Test
```
I feel like those NPE test cases are a bit of over-testing.
Yeah, makes sense. I'll remove them.
```java
 * <p>This POJO encapsulates all the information needed to connect to a Unity Catalog table without
 * requiring Spark dependencies.
 */
public final class UnityCatalogConnectionInfo {
```
POJO class that gets passed around, so we avoid passing CatalogTable and the Spark session everywhere.
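For context, a minimal sketch of such a POJO, assuming the four fields named in the PR description (tableId, tablePath, endpoint, token); the actual class may differ:

```java
// Immutable holder; no Spark types, so it can cross module boundaries freely.
public final class UnityCatalogConnectionInfo {
  private final String tableId;
  private final String tablePath;
  private final String endpoint;
  private final String token;

  public UnityCatalogConnectionInfo(
      String tableId, String tablePath, String endpoint, String token) {
    this.tableId = tableId;
    this.tablePath = tablePath;
    this.endpoint = endpoint;
    this.token = token;
  }

  public String getTableId() { return tableId; }
  public String getTablePath() { return tablePath; }
  public String getEndpoint() { return endpoint; }
  public String getToken() { return token; }
}
```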
```java
// Get catalog name
scala.Option<String> catalogOption = catalogTable.identifier().catalog();
String catalogName =
    catalogOption.isDefined()
```
Why do we need to look at spark.sessionState().catalogManager().currentCatalog().name()? It returns spark_catalog by default, or whatever the user set with "USE CATALOG", but that's not necessarily the right catalog to look at.
Yeah, I agree currentCatalog() is the wrong approach here because:
- It reflects the user's current session catalog, not the table's actual catalog. If someone runs USE CATALOG other_catalog after resolving a UC table, we'd end up looking up credentials for the wrong catalog.
- It defaults to spark_catalog (the Hive metastore), which never has UC credentials anyway.
Actually, one thing to note: the UpdateCatalog.java hook is doing exactly that today.
I also checked Spark’s LookupCatalog utilities, but I'm not sure if they work here since they work with name parts during analysis, before resolution. By the time we have a CatalogTable, the original name parts are already lost.
One fix is to require an explicit catalog in the identifier:

```java
if (catalogTable.identifier().catalog().isEmpty()) {
  throw new IllegalArgumentException(
      "Unable to determine Unity Catalog for table " + catalogTable.identifier() +
      ": catalog name is missing. Use a fully-qualified table name with an explicit catalog.");
}
String catalogName = catalogTable.identifier().catalog().get();
```

If UCSingleCatalog isn't setting the catalog field on the CatalogTable.identifier, that's a bug in the catalog implementation and probably better to fix it there.
+1, perhaps we can start with this approach and evolve if necessary.
```java
if (catalogTable.identifier().catalog().isEmpty()) {
  throw new IllegalArgumentException(
      "Unable to determine Unity Catalog for table " + catalogTable.identifier() +
      ": catalog name is missing. Use a fully-qualified table name with an explicit catalog.");
}
String catalogName = catalogTable.identifier().catalog().get();
```

This works for me, but please double-check in UCSingleCatalog that it is implemented correctly.
```java
scala.collection.immutable.List<scala.Tuple3<String, String, String>> scalaConfigs =
    UCCommitCoordinatorBuilder$.MODULE$.getCatalogConfigs(spark);

Optional<scala.Tuple3<String, String, String>> configTuple =
```
How hard would it be to refactor this method by moving it to a utils class?
Lines 199 to 230 in 81e9f6e
```scala
private[delta] def getCatalogConfigs(spark: SparkSession): List[(String, String, String)] = {
  val catalogConfigs = spark.conf.getAll.filterKeys(_.startsWith(SPARK_SQL_CATALOG_PREFIX))
  catalogConfigs
    .keys
    .map(_.split("\\."))
    .filter(_.length == 4)
    .map(_(3))
    .filter { catalogName: String =>
      val connector = catalogConfigs.get(s"$SPARK_SQL_CATALOG_PREFIX$catalogName")
      connector.contains(UNITY_CATALOG_CONNECTOR_CLASS)
    }
    .flatMap { catalogName: String =>
      val uri = catalogConfigs.get(s"$SPARK_SQL_CATALOG_PREFIX$catalogName.$URI_SUFFIX")
      val token = catalogConfigs.get(s"$SPARK_SQL_CATALOG_PREFIX$catalogName.$TOKEN_SUFFIX")
      (uri, token) match {
        case (Some(u), Some(t)) =>
          try {
            new URI(u) // Validate the URI
            Some((catalogName, u, t))
          } catch {
            case _: URISyntaxException =>
              logWarning(log"Skipping catalog ${MDC(DeltaLogKeys.CATALOG, catalogName)} as it " +
                log"does not have a valid URI ${MDC(DeltaLogKeys.URI, u)}.")
              None
          }
        case _ =>
          logWarning(log"Skipping catalog ${MDC(DeltaLogKeys.CATALOG, catalogName)} as it does " +
            "not have both uri and token configured in Spark Session.")
          None
      }
    }
    .toList
}
```
raveeram-db
left a comment
Overall, looks great. A few nits.
```java
// Get catalog name
scala.Option<String> catalogOption = catalogTable.identifier().catalog();
String catalogName =
```
Would be good to log this (especially if we end up using the default catalog).
Yep, sounds good. Added logging, and with the change we now require an explicit catalog in the identifier and throw IllegalArgumentException if it is missing, rather than falling back to currentCatalog().
```java
if (!configTuple.isPresent()) {
  throw new IllegalArgumentException(
      "Cannot create UC client: Unity Catalog configuration not found for catalog '"
          + catalogName
```
perhaps log the table identifier as well
Done, the error message now includes the table identifier: "Cannot create UC client for table " + catalogTable.identifier() + ": ..."
```scala
 * Java-friendly wrapper for [[getCatalogConfigs]] that returns a Java List of
 * [[UCCatalogConfig]] objects instead of Scala tuples.
 */
private[delta] def getCatalogConfigsJava(
```
Let's not explicitly call this out as being for Java; getCatalogConfigs should be good.
If Java code wants to call it, it's the Java code's job to convert the result to Java objects.
Done: removed the separate getCatalogConfigsJava wrapper. Java code now calls getCatalogConfigs directly and handles the Scala Map and Option types.
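For illustration, a hedged sketch of what the Java call site might look like after this change; the return type and converter usage are assumptions based on the discussion, not the final code:

```java
// Fetch the Scala map of catalog name -> UCCatalogConfig and convert it for Java use.
scala.collection.immutable.Map<String, UCCatalogConfig> scalaConfigs =
    UCCommitCoordinatorBuilder$.MODULE$.getCatalogConfigs(spark);
java.util.Map<String, UCCatalogConfig> javaConfigs =
    scala.jdk.javaapi.CollectionConverters.asJava(scalaConfigs);
```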
```scala
}

/**
 * Java-friendly holder for Unity Catalog configuration extracted from Spark configs.
```
Let's just avoid saying "Java-friendly holder"; UCCatalogConfig is actually more readable than the tuples.
```scala
 * [[UCCatalogConfig]] objects instead of Scala tuples.
 */
private[delta] def getCatalogConfigsJava(
    spark: SparkSession): java.util.List[UCCatalogConfig] = {
```
Should we consider using a map?
Yeah, that's a good idea; an oversight on my part. Changed getCatalogConfigs to return Map[String, UCCatalogConfig] for faster, direct lookups.
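With a Map, the Java side can do a direct lookup instead of scanning a list. A rough sketch, assuming the Scala Map API is used from Java:

```java
// scala.collection.Map#get returns a scala.Option, which Java can query directly.
scala.Option<UCCatalogConfig> config = scalaConfigs.get(catalogName);
if (config.isEmpty()) {
  throw new IllegalArgumentException(
      "Unity Catalog configuration not found for catalog '" + catalogName + "'");
}
UCCatalogConfig ucConfig = config.get();
```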
Ran tests locally; no breaking changes from converting to a Map. Also confirmed via tracing.
raveeram-db
left a comment
LGTM!
tdas
left a comment
LGTM
🥞 Stacked PR
Use this link to review incremental changes.
Which Delta project/connector is this regarding?
Description
This PR introduces utilities to extract Unity Catalog (UC) connection information from Spark catalog metadata, enabling the DSv2 connector backed by Kernel to work with UC-managed Delta tables that use managed commits (CCv2).
The primary changes in this PR are:
- UnityCatalogConnectionInfo: an immutable POJO encapsulating UC connection details as a (tableId, tablePath, endpoint, token) tuple.
- SparkUnityCatalogUtils: a utility to extract connection info from Spark's CatalogTable metadata (see the usage sketch after this list) by:
  a. checking if the table is UC-managed via feature flags
  b. extracting the table ID from storage properties
  c. looking up catalog configuration (endpoint/token) from the Spark session
- A flexible createCatalogTable() method.
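A hypothetical end-to-end sketch of the extraction flow; isUnityCatalogManaged and extractConnectionInfo are illustrative names, not the PR's confirmed API:

```java
// Hypothetical usage of the extraction utilities (method names are assumptions).
if (SparkUnityCatalogUtils.isUnityCatalogManaged(catalogTable)) {
  UnityCatalogConnectionInfo info =
      SparkUnityCatalogUtils.extractConnectionInfo(spark, catalogTable);
  // The POJO now carries everything the UC CC client needs; CatalogTable and
  // SparkSession do not have to travel any further.
  String endpoint = info.getEndpoint();
  String token = info.getToken();
}
```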
This util helps the V2 connector follow the V1 connector's pattern of resolving commits from UC as a short-term measure, until the long-term solution, which does not depend on the catalog table to encode UC info, is implemented.
Architecture Context
(We are implementing the green part)
```mermaid
graph TB
  subgraph "Spark Layer"
    SC[SparkSession]
    CT[CatalogTable, provided by UC OSS]
    CM[Catalog Metadata]
  end
  subgraph "Extraction Layer (This PR)"
    SUCU[SparkUnityCatalogUtils]
    UCCI[UnityCatalogConnectionInfo]
  end
  subgraph "Kernel V2 connector"
    CCC[UC CC client]
    UCA[UnityCatalogSnapshotManager]
  end
  subgraph "Unity Catalog"
    UCServer[UC Server]
  end
  SC --> SUCU
  CT --> CM
  CM --> SUCU
  SUCU -->|extracts| UCCI
  UCCI -->|provides uc's uri/tokens| UCA
  UCA --> CCC
  CCC -->|get commits| UCServer
  style SUCU fill:#90EE90
  style UCCI fill:#90EE90
```

How was this patch tested?
Locally and CI
Does this PR introduce any user-facing changes?
No.