Skip to content

Conversation

@TimothyW553
Copy link
Collaborator

@TimothyW553 TimothyW553 commented Nov 18, 2025

🥞 Stacked PR

Use this link to review incremental changes.


Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

Adds Unity Catalog (UC) integration for catalog-managed commits (CCv2) to DSv2 connector with Kernel.

Key changes:

  • DeltaSnapshotManagerFactory: Factory that automatically selects CatalogManagedSnapshotManager for UC tables or PathBasedSnapshotManager for regular tables
  • UnityCatalogManagedCommitClient: UC-specific implementation of ManagedCommitClient that wraps UCCatalogManagedClient
  • CatalogManagedSnapshotManager: Generic snapshot manager that delegates to catalog-managed commit clients for loading snapshots/versions
  • CatalogTableUtils: Utilities for detecting UC-managed tables and extracting table IDs from catalog metadata

Architecture:

SparkTable
    └─ DeltaSnapshotManagerFactory
         └─ CatalogManagedSnapshotManager
              └─ UnityCatalogManagedCommitClient
                   └─ UCCatalogManagedClient (kernel/unitycatalog)

This enables the DSv2 connector to read UC catalog-managed tables via Kernel, with automatic fallback to path-based access for regular tables.

How was this patch tested?

  • Unit tests for factory, snapshot manager, UC client, and utils
  • Integration test (DeltaCatalogSuite) validates end-to-end UC table reads via DSv2
  • Existing Delta tests continue to pass with fallback to path-based managers

Does this PR introduce any user-facing changes?

No.

@TimothyW553 TimothyW553 force-pushed the stack/ccv2-catalog-config branch 11 times, most recently from 4726e61 to 7d65e09 Compare November 20, 2025 08:04
@TimothyW553 TimothyW553 force-pushed the stack/ccv2-catalog-config branch from eee2507 to 765a916 Compare November 20, 2025 08:58
@TimothyW553 TimothyW553 reopened this Nov 20, 2025
Signed-off-by: TimothyW553 <[email protected]>
Signed-off-by: TimothyW553 <[email protected]>
Signed-off-by: TimothyW553 <[email protected]>
Signed-off-by: TimothyW553 <[email protected]>
Signed-off-by: Timothy Wang <[email protected]>
Signed-off-by: Timothy Wang <[email protected]>
Signed-off-by: Timothy Wang <[email protected]>
Signed-off-by: Timothy Wang <[email protected]>
Signed-off-by: Timothy Wang <[email protected]>
Signed-off-by: Timothy Wang <[email protected]>
Signed-off-by: TimothyW553 <[email protected]>
Signed-off-by: TimothyW553 <[email protected]>
Signed-off-by: TimothyW553 <[email protected]>
@TimothyW553 TimothyW553 marked this pull request as ready for review November 25, 2025 00:43
zachschuermann pushed a commit that referenced this pull request Nov 25, 2025
## 🥞 Stacked PR
Use this [link](https://github.com/delta-io/delta/pull/5477/files) to
review incremental changes.
-
[**catalogtableutils-ccv2**](#5477)
[[Files changed](https://github.com/delta-io/delta/pull/5477/files)]
-
[stack/ccv2-catalog-config](#5520)
[[Files
changed](https://github.com/delta-io/delta/pull/5520/files/6359268dbee8d1a114e3f66620c6585bc0bdb6eb..4a1d8fa93e56d68b5971fb32970bdeaa5799abdc)]

---------
<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

This PR adds utils for CatalogTable, Scala, and for catalogtable
testing. In particular, CatalogTableUtils is used for determining if a
table is managed/owned by UC -- which will determine the source of truth
for operations.

<!--
- Describe what this PR changes.
- Describe why we need the change.
 
If this PR resolves an issue be sure to include "Resolves #XXX" to
correctly link and close the issue upon merge.
-->

## How was this patch tested?

- tested locally via `build/sbt -DsparkVersion=master "++ 2.13.16" clean
sparkV2/test`
- passing CI tests 

<!--
If tests were added, say they were added here. Please make sure to test
the changes thoroughly including negative and positive cases if
possible.
If the changes were tested in any way other than unit tests, please
clarify how you tested step by step (ideally copy and paste-able, so
that other reviewers can test and check, and descendants can verify in
the future).
If the changes were not tested, please explain why.
-->

## Does this PR introduce _any_ user-facing changes?
No.
<!--
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to
show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change
compared to the released Delta Lake versions or within the unreleased
branches such as master.
If no, write 'No'.
-->

---------

Signed-off-by: TimothyW553 <[email protected]>
Signed-off-by: Timothy Wang <[email protected]>
zikangh pushed a commit to zikangh/delta that referenced this pull request Nov 26, 2025
Use this [link](https://github.com/delta-io/delta/pull/5477/files) to
review incremental changes.
-
[**catalogtableutils-ccv2**](delta-io#5477)
[[Files changed](https://github.com/delta-io/delta/pull/5477/files)]
-
[stack/ccv2-catalog-config](delta-io#5520)
[[Files
changed](https://github.com/delta-io/delta/pull/5520/files/6359268dbee8d1a114e3f66620c6585bc0bdb6eb..4a1d8fa93e56d68b5971fb32970bdeaa5799abdc)]

---------
<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

This PR adds utils for CatalogTable, Scala, and for catalogtable
testing. In particular, CatalogTableUtils is used for determining if a
table is managed/owned by UC -- which will determine the source of truth
for operations.

<!--
- Describe what this PR changes.
- Describe why we need the change.

If this PR resolves an issue be sure to include "Resolves #XXX" to
correctly link and close the issue upon merge.
-->

- tested locally via `build/sbt -DsparkVersion=master "++ 2.13.16" clean
sparkV2/test`
- passing CI tests

<!--
If tests were added, say they were added here. Please make sure to test
the changes thoroughly including negative and positive cases if
possible.
If the changes were tested in any way other than unit tests, please
clarify how you tested step by step (ideally copy and paste-able, so
that other reviewers can test and check, and descendants can verify in
the future).
If the changes were not tested, please explain why.
-->

No.
<!--
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to
show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change
compared to the released Delta Lake versions or within the unreleased
branches such as master.
If no, write 'No'.
-->

---------

Signed-off-by: TimothyW553 <[email protected]>
Signed-off-by: Timothy Wang <[email protected]>
@TimothyW553 TimothyW553 changed the title [Kernel][CCv2] Create UC client [DSv2] Create UC client Nov 26, 2025
* Catalog-managed commit client that knows how to load snapshots and commit ranges for a specific
* table.
*/
public interface ManagedCommitClient extends AutoCloseable {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe not called it Client?This abstraction is 1 level above to encapsulate specific Catalog implementation, maybe call it ManagedCatalogAdapter?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yes. Renamed to ManagedCatalogAdapter to better reflect that this is an adapter abstraction above catalog-specific implementations. Also renamed the UC implementation to UnityCatalogAdapter...

* @throws IllegalArgumentException if the table lacks UC identifiers or catalog config is missing
*/
public static Optional<ManagedCommitClient> fromCatalog(
CatalogTable catalogTable, SparkSession spark) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try not introduce Spark dependency to this class -- maybe add an pojo

public class UnityCatalogConnectionInfo {
    private final String tableId;
    private final String endpoint;
    private final String token;
    private final String catalogName;

    public UnityCatalogConnectionInfo(String tableId, String endpoint, String token, String catalogName) {
        this.tableId = tableId;
        this.endpoint = endpoint;
        this.token = token;
        this.catalogName = catalogName;
    }

and

public class SparkUnityCatalogUtils {
    
    public static UnityCatalogConnectionInfo extractConnectionInfo(CatalogTable table, SparkSession spark) {}

Copy link
Collaborator Author

@TimothyW553 TimothyW553 Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yep that's cleaner and makes it more extensible. Refactored to add:

  1. UnityCatalogConnectionInfo - Pure data POJO (tableId, tablePath, endpoint, token)
  2. SparkUnityCatalogUtils - Extracts UC config from Spark metadata
  3. UnityCatalogAdapter - Two factory methods:
    • fromCatalog(catalogTable, spark) - convenient wrapper
    • fromConnectionInfo(info) - no Spark dependency

Just for personal and future reference.

The original design tightly coupled UnityCatalogAdapter to Spark - method signature was like f(CatalogTable, Spark). Now the adapter depends only on the POJO, which allows:

  • Creating adapters from config files, env vars, REST APIs (not just Spark)
  • Testing UC logic without mocking Spark
  • Serializing/passing connection info across boundaries

Example with pojo

// From config file
Properties props = loadConfig("uc.properties");
UnityCatalogConnectionInfo info = new UnityCatalogConnectionInfo(...);
UnityCatalogAdapter.fromConnectionInfo(info);

List<ParsedLogData> logData = commitClient.getRatifiedCommits(Optional.empty());

// Convert to ParsedCatalogCommitData for DeltaHistoryManager
List<ParsedCatalogCommitData> catalogCommits =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

loadLatestSnapshot() will call commitClient.getRatifiedCommits as well. We need to avoid calling it twice.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, I see the codepath now for getting commits and the latest snapshot. Fixed by extracting catalog commits directly from the snapshot's log segment instead of calling UC twice:

List<ParsedCatalogCommitData> catalogCommits = latestSnapshot.getLogSegment().getAllCatalogCommits();

SnapshotImpl latestSnapshot = (SnapshotImpl) loadLatestSnapshot();

// Get ratified commits from the catalog
List<ParsedLogData> logData = commitClient.getRatifiedCommits(Optional.empty());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Plz add comments if the param passed in is not obvious. (Optional.empty()/param_name/)

Copy link
Collaborator Author

@TimothyW553 TimothyW553 Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated. I did it this way

catalogAdapter.loadSnapshot(kernelEngine, /* versionOpt = */ Optional.empty(), /* timestampOpt = */ Optional.empty())

(a previous PR was done like this)

*/
public interface ManagedCommitClient extends AutoCloseable {

/** @return catalog-managed table identifier (for logging/telemetry). */
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not necessary for logging/telemetry I guess

* @throws IllegalArgumentException if catalogTable is UC-managed but configuration is invalid
*/
public static DeltaSnapshotManager create(
String tablePath,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not having two method: fromPath(String tablePath) fromCatalogTable(CatalogTable catalogTable) to simplify the code?

Copy link
Collaborator Author

@TimothyW553 TimothyW553 Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, I've split the factory into two methods:

  • fromPath(tablePath, hadoopConf) - for filesystem-based tables
  • fromCatalogTable(catalogTable, spark, hadoopConf) - for catalog tables with automatic UC detection

Makes the API cleaner and avoids requiring SparkSession for simple path-based access.

@Override
public io.delta.kernel.Snapshot loadSnapshot(
io.delta.kernel.engine.Engine engine,
java.util.Optional<Long> versionOpt,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe just import vs using fully names

import org.junit.jupiter.api.Test;

/** Tests for {@link CatalogManagedSnapshotManager}. */
class CatalogManagedSnapshotManagerTest {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you try using the sample catalog managed table used by kernelUnityCatalog?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants