[DSv2][POC] Adding CCv2 support to Kernel-backed DSv2 connector #5627
Which Delta project/connector is this regarding?
Description
- `CatalogWithManagedCommits` (GET/POST commits, `extractTableId`) lives in kernel-defaults and is discovered via ServiceLoader. This removes UC/Spark dependencies from the core and lets UC/Glue/Polaris plug in without kernel-spark changes.
- `CatalogManagedSnapshotManager` now paginates `getCommits`, enforces contiguity, maps ratified commits to `_delta_log` files, and builds snapshots with `withLogData`/`.withMaxCatalogVersion` (and `.atVersion` for time travel). Catalog implementations only fetch ratified commit metadata (and propose commits), reducing duplication and coupling.
- `DeltaSnapshotManagerFactory.fromCatalogTable`/`fromPath` select the snapshot manager; path tables remain unchanged. Managed tables auto-resolve via ServiceLoader if a table ID is present, otherwise fall back to path-based.
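The resolution rule in the last point can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: the nested `CatalogWithManagedCommits` interface is a trimmed-down stand-in with an assumed `extractTableId` signature, and `ManagerKind`, `resolve`, and the `demo.tableId` property key are hypothetical names. Only `java.util.ServiceLoader` is a real API here.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;

public class SnapshotManagerResolution {

    /** Hypothetical, trimmed-down stand-in for the SPI described in this PR. */
    public interface CatalogWithManagedCommits {
        /** Returns a table ID if these table properties belong to this catalog. */
        Optional<String> extractTableId(Map<String, String> tableProperties);
    }

    /** Hypothetical label for which snapshot manager would be chosen. */
    public enum ManagerKind { CATALOG_MANAGED, PATH_BASED }

    /** Picks catalog-managed if any provider yields a table ID, else path-based. */
    public static ManagerKind resolve(
            Iterable<CatalogWithManagedCommits> providers,
            Map<String, String> tableProperties) {
        for (CatalogWithManagedCommits provider : providers) {
            if (provider.extractTableId(tableProperties).isPresent()) {
                return ManagerKind.CATALOG_MANAGED;
            }
        }
        return ManagerKind.PATH_BASED;
    }

    /** Same rule, with providers discovered from META-INF/services via ServiceLoader. */
    public static ManagerKind resolve(Map<String, String> tableProperties) {
        return resolve(ServiceLoader.load(CatalogWithManagedCommits.class), tableProperties);
    }

    public static void main(String[] args) {
        // Demo provider: treats a made-up "demo.tableId" property as the table ID.
        CatalogWithManagedCommits demo =
                props -> Optional.ofNullable(props.get("demo.tableId"));
        System.out.println(resolve(List.of(demo), Map.of("demo.tableId", "t-1")));
        System.out.println(resolve(List.of(demo), Map.of()));
    }
}
```

Passing the providers as an `Iterable` keeps the rule testable without registering a service file; `ServiceLoader` itself implements `Iterable`, so the production-style overload simply delegates.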
This flow shows how a catalog-managed Delta table snapshot is loaded when CCv2 is in play:
The goal is to fetch the catalog’s ratified commits and build a Delta Snapshot using only those commits, while falling back to path-based loading when no catalog info is present.

If a `CatalogWithManagedCommits` implementation is discoverable (via ServiceLoader) and yields a table ID, the catalog-managed manager is used; otherwise, the path-based manager is used. The catalog-managed manager calls the catalog’s GET commits, paginates and enforces contiguous versions, maps the commit entries to the exact `_delta_log` files, then builds the Snapshot with `withLogData` and `withMaxCatalogVersion` (and optional time travel). The final Snapshot is returned to SparkTable for reads.
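The "paginate and enforce contiguous versions" step could look roughly like the sketch below. It is an assumption-laden illustration rather than the PR's implementation: `CommitsPage`, `CommitSource`, `fetchAll`, and `validateContiguous` are hypothetical names, the real `getCommits` call also takes a table ID, path, and optional end version, and the stop condition (last returned version reaches `latestTableVersion`) is a guess at how pagination terminates.

```java
import java.util.ArrayList;
import java.util.List;

public class CommitPagination {

    /** Hypothetical page: ratified commit versions plus the catalog's latest table version. */
    public static final class CommitsPage {
        final List<Long> versions;
        final long latestTableVersion;

        public CommitsPage(List<Long> versions, long latestTableVersion) {
            this.versions = versions;
            this.latestTableVersion = latestTableVersion;
        }
    }

    /** Hypothetical stand-in for the catalog's GET commits call. */
    public interface CommitSource {
        CommitsPage getCommits(long startVersion);
    }

    /** Fetches pages until the latest table version is reached, then checks contiguity. */
    public static List<Long> fetchAll(CommitSource source, long startVersion) {
        List<Long> all = new ArrayList<>();
        long next = startVersion;
        while (true) {
            CommitsPage page = source.getCommits(next);
            if (page.versions.isEmpty()) {
                break; // nothing more to read
            }
            all.addAll(page.versions);
            long last = page.versions.get(page.versions.size() - 1);
            if (last < next) {
                throw new IllegalStateException("Catalog returned versions before " + next);
            }
            if (last >= page.latestTableVersion) {
                break; // caught up with the catalog
            }
            next = last + 1;
        }
        validateContiguous(all);
        return all;
    }

    /** Throws if any version is not exactly one greater than its predecessor. */
    public static void validateContiguous(List<Long> versions) {
        for (int i = 1; i < versions.size(); i++) {
            long previous = versions.get(i - 1);
            long current = versions.get(i);
            if (current != previous + 1) {
                throw new IllegalStateException(
                        "Gap in ratified commits: " + previous + " -> " + current);
            }
        }
    }

    public static void main(String[] args) {
        // Fake catalog holding versions 0..4 and returning at most two per page.
        CommitSource paged = start -> {
            List<Long> page = new ArrayList<>();
            for (long v = start; v <= 4 && page.size() < 2; v++) {
                page.add(v);
            }
            return new CommitsPage(page, 4);
        };
        System.out.println(fetchAll(paged, 0)); // prints [0, 1, 2, 3, 4]
    }
}
```

Validating after all pages are collected means a gap that straddles a page boundary is caught the same way as one inside a single page.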
Diagram:
```mermaid
flowchart LR
    A["SparkTable"] -->|1| B["DeltaSnapshotManagerFactory.fromCatalogTable/fromPath"]
    B -->|2| C{"ServiceLoader finds CatalogWithManagedCommits?"}
    C -- "yes" --> D["CatalogManagedSnapshotManager"]
    C -- "no" --> P["PathBasedSnapshotManager"]
    D -->|3| E["getCommits(tableId, path, start=0, end?)"]
    E -->|4| F["Paginate & validate contiguity"]
    F -->|5| G["Map to ParsedCatalogCommitData<br/>(_delta_log/<file>)"]
    G -->|6| H["TableManager.loadSnapshot(tablePath)<br/>.withLogData(logData)<br/>.withMaxCatalogVersion(latest)<br/>(.atVersion if set)"]
    H -->|7| S["Snapshot"]
    subgraph Catalog_Impl["Catalog Impl"]
        E --> UC["CatalogWithManagedCommits impl<br/>(e.g., UC client)"]
    end
```

Sequence workflow:

```mermaid
sequenceDiagram
    participant "ST" as "SparkTable (catalog/SparkTable)"
    participant "FACT" as "DeltaSnapshotManagerFactory"
    participant "MGR" as "CatalogManagedSnapshotManager"
    participant "CAT" as "CatalogWithManagedCommits (UC client via ServiceLoader)"
    participant "TM" as "TableManager (kernel)"
    participant "FS" as "_delta_log storage"
    "ST"->>"FACT": "fromCatalogTable(catalogTable, spark, hadoopConf)"
    "FACT"->>"CAT": "ServiceLoader.load(CatalogWithManagedCommits)"
    "CAT"-->>"FACT": "extractTableId(props) → Some(tableId)"
    "FACT"-->>"ST": "new CatalogManagedSnapshotManager(...)"
    "ST"->>"MGR": "loadLatestSnapshot()"
    "MGR"->>"CAT": "getCommits(tableId, tablePath, start=0, end=None)"
    "CAT"-->>"MGR": "GetCommitsResult(commits[], latestTableVersion)"
    "MGR"->>"MGR": "paginate if needed & validateContiguous"
    "MGR"->>"MGR": "toParsedLogData(commits) → ParsedCatalogCommitData"
    "MGR"->>"TM": "loadSnapshot(tablePath)<br/>.withLogData(logData)<br/>.withMaxCatalogVersion(latestTableVersion)<br/>(.atVersion(v) if provided)"
    "TM"->>"FS": "read commit/checkpoint files"
    "FS"-->>"TM": "file contents"
    "TM"-->>"MGR": "Snapshot"
    "MGR"-->>"ST": "Snapshot"
```

How was this patch tested?
Does this PR introduce any user-facing changes?