Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
540c3a4
update methods and tests
TimothyW553 Nov 12, 2025
80d4662
delete workspace
TimothyW553 Nov 12, 2025
7cf2c28
delete
TimothyW553 Nov 12, 2025
5fca199
change to catalogtable
TimothyW553 Nov 11, 2025
b0c0b6f
add tests
TimothyW553 Nov 12, 2025
fdec89e
fmt
TimothyW553 Nov 12, 2025
4807c4f
test fix
TimothyW553 Nov 13, 2025
8e992d9
storage props
TimothyW553 Nov 14, 2025
bbc37b5
merge props
TimothyW553 Nov 17, 2025
28e17b5
address comments
TimothyW553 Nov 17, 2025
afcdfb1
remove private method test
TimothyW553 Nov 18, 2025
e1aed3e
null check for storage properties
TimothyW553 Nov 18, 2025
d709460
Restore UC connection utils after rebase
TimothyW553 Dec 2, 2025
e7ef109
clean up...
TimothyW553 Dec 2, 2025
9835086
Restore UC connection info utils and add SparkUnityCatalogUtils suite
TimothyW553 Dec 2, 2025
d066f73
Update tests
TimothyW553 Dec 2, 2025
1f09ddc
spark
TimothyW553 Dec 2, 2025
c63731f
Clean up test utils and refactor
TimothyW553 Dec 2, 2025
fc13a3a
revert DeltaSQLConfV2.scala
TimothyW553 Dec 2, 2025
7ccf4aa
remove npe tests
TimothyW553 Dec 2, 2025
3be445e
remove more npe
TimothyW553 Dec 2, 2025
2ada816
Centralize utils
TimothyW553 Dec 4, 2025
3344671
fmt
TimothyW553 Dec 4, 2025
1028015
FMT
TimothyW553 Dec 4, 2025
5cddf21
Address comments - remove unnecessary wrappers and add logging
TimothyW553 Dec 4, 2025
73c9ecd
Address comments - remove unnecessary wrappers and add logging
TimothyW553 Dec 4, 2025
5cd8807
Add wrapper for map
TimothyW553 Dec 4, 2025
ab7f0fd
use SharedSparkSession
TimothyW553 Dec 4, 2025
37631c0
fix tests
TimothyW553 Dec 4, 2025
ca68661
fix tests
TimothyW553 Dec 4, 2025
e56a59f
annotate constant params
TimothyW553 Dec 5, 2025
5fc21cd
annotate constant params
TimothyW553 Dec 5, 2025
2db986b
Rename and follow naming conventions
TimothyW553 Dec 8, 2025
7e405bc
Rename and follow naming conventions
TimothyW553 Dec 8, 2025
28a6259
update context
TimothyW553 Dec 9, 2025
ad3ef53
Add catalog-managed snapshot wireframe stubs
TimothyW553 Dec 1, 2025
c03c260
Adjust wireframe manager to always throw for table changes
TimothyW553 Dec 1, 2025
178d73b
Keep catalog factory/adapter unwired in skeleton
TimothyW553 Dec 1, 2025
8e11f49
fmt
TimothyW553 Dec 1, 2025
8b27c4d
import
TimothyW553 Dec 1, 2025
61ad41f
fmt
TimothyW553 Dec 1, 2025
15066b4
clean up tests
TimothyW553 Dec 2, 2025
9fe6cf9
Update interfaces to better reflect CCv2 spec
TimothyW553 Dec 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.spark.snapshot;

import static java.util.Objects.requireNonNull;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.catalog.CatalogTable;

/**
* Factory for creating {@link DeltaSnapshotManager} instances.
*
* <p>This factory provides two creation methods:
*
* <ul>
* <li>{@link #fromPath} - Creates a {@link PathBasedSnapshotManager} for filesystem-based Delta
* tables
* <li>{@link #fromCatalogTable} - Creates snapshot manager from catalog metadata, automatically
* selecting {@link CatalogManagedSnapshotManager} for UC tables or falling back to {@link
* PathBasedSnapshotManager}
* </ul>
*
* <p><strong>Example usage:</strong>
*
* <pre>{@code
* // For path-based tables
* DeltaSnapshotManager manager = DeltaSnapshotManagerFactory.fromPath(
* tablePath,
* hadoopConf
* );
*
* // For catalog tables
* DeltaSnapshotManager manager = DeltaSnapshotManagerFactory.fromCatalogTable(
* catalogTable,
* spark,
* hadoopConf
* );
* }</pre>
*/
@Experimental
public final class DeltaSnapshotManagerFactory {

// Utility class - no instances
private DeltaSnapshotManagerFactory() {}

/**
* Creates a path-based snapshot manager for filesystem Delta tables.
*
* <p>Use this when no catalog metadata is available or when you want to work directly with a
* filesystem path.
*
* @param tablePath filesystem path to the Delta table root
* @param hadoopConf Hadoop configuration for the Delta Kernel engine
* @return PathBasedSnapshotManager instance
* @throws NullPointerException if tablePath or hadoopConf is null
*/
public static DeltaSnapshotManager fromPath(String tablePath, Configuration hadoopConf) {
requireNonNull(tablePath, "tablePath is null");
requireNonNull(hadoopConf, "hadoopConf is null");
return new PathBasedSnapshotManager(tablePath, hadoopConf);
}

/**
* Creates a snapshot manager from catalog table metadata.
*
* <p>Wire-up is intentionally deferred; this skeleton method currently throws until implemented
* in a follow-up PR.
*
* @throws UnsupportedOperationException always, until UC wiring is added
*/
public static DeltaSnapshotManager fromCatalogTable(
CatalogTable catalogTable, SparkSession spark, Configuration hadoopConf) {
requireNonNull(catalogTable, "catalogTable is null");
requireNonNull(spark, "spark is null");
requireNonNull(hadoopConf, "hadoopConf is null");
throw new UnsupportedOperationException("UC catalog wiring not implemented in skeleton");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.spark.snapshot;

import io.delta.storage.commit.GetCommitsResponse;
import java.util.Optional;

/**
 * Adapter contract for catalog-managed Delta tables.
 *
 * <p>Implementations are thin, protocol-aligned bridges to a catalog's commit coordinator API:
 * they fetch commit metadata and nothing more. They know nothing about Delta snapshots or Kernel
 * internals.
 *
 * <p>{@link CatalogManagedSnapshotManager} consumes this interface to retrieve commits, then
 * assembles Delta snapshots and commit ranges via Kernel's TableManager APIs.
 *
 * <p>Each catalog gets its own implementation (e.g., UnityCatalogAdapter, GlueCatalogAdapter);
 * sharing this interface lets the snapshot manager stay catalog-agnostic.
 */
public interface ManagedCatalogAdapter extends AutoCloseable {

  /**
   * Returns the unique identifier for this table in the catalog.
   *
   * @return the catalog-assigned table identifier
   */
  String getTableId();

  /**
   * Returns the storage path for this table.
   *
   * @return the filesystem path to the Delta table root
   */
  String getTablePath();

  /**
   * Fetches commits from the catalog's commit coordinator.
   *
   * <p>This is the one method every adapter must implement: it calls the catalog's API for the
   * list of ratified commits falling inside the requested version range.
   *
   * @param startVersion the starting version (inclusive), typically 0 for initial load
   * @param endVersion optional ending version (inclusive); if empty, returns up to latest
   * @return response containing the list of commits and the latest ratified table version
   */
  GetCommitsResponse getCommits(long startVersion, Optional<Long> endVersion);

  /**
   * Returns the latest ratified table version known to the catalog.
   *
   * <p>For catalog-managed tables this is the highest version the coordinator has ratified. A
   * return value of -1 means the catalog has no registered commits yet (possible when version 0
   * exists but has not been ratified).
   *
   * <p>The default implementation issues an open-ended {@link #getCommits} call and reads the
   * latest version off the response; override when the catalog offers a cheaper dedicated API.
   *
   * @return the latest version ratified by the catalog, or -1 if none registered
   */
  default long getLatestRatifiedVersion() {
    GetCommitsResponse response = getCommits(0, Optional.empty());
    return response.getLatestTableVersion();
  }

  /** Releases adapter resources; unlike {@link AutoCloseable#close()}, never throws checked. */
  @Override
  void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.spark.snapshot.unitycatalog;

import static java.util.Objects.requireNonNull;

/**
* Table information for Unity Catalog managed tables.
*
* <p>This POJO encapsulates all the information needed to interact with a Unity Catalog table
* without requiring Spark dependencies.
*/
public final class UCTableInfo {
private final String tableId;
private final String tablePath;
private final String ucUri;
private final String ucToken;

public UCTableInfo(String tableId, String tablePath, String ucUri, String ucToken) {
this.tableId = requireNonNull(tableId, "tableId is null");
this.tablePath = requireNonNull(tablePath, "tablePath is null");
this.ucUri = requireNonNull(ucUri, "ucUri is null");
this.ucToken = requireNonNull(ucToken, "ucToken is null");
}

public String getTableId() {
return tableId;
}

public String getTablePath() {
return tablePath;
}

public String getUcUri() {
return ucUri;
}

public String getUcToken() {
return ucToken;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.spark.snapshot.unitycatalog;

import static java.util.Objects.requireNonNull;

import io.delta.kernel.spark.utils.CatalogTableUtils;
import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient;
import java.util.Map;
import java.util.Optional;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.catalog.CatalogTable;
import org.apache.spark.sql.delta.coordinatedcommits.UCCatalogConfig;
import org.apache.spark.sql.delta.coordinatedcommits.UCCommitCoordinatorBuilder$;

/**
 * Helper for deriving Unity Catalog table information from Spark catalog metadata.
 *
 * <p>Keeps the Spark-facing logic in one place so that {@link UCManagedSnapshotManager} can be
 * created without Spark when table info is supplied directly via {@link UCTableInfo}.
 */
public final class UCUtils {

  /** Static helpers only - not instantiable. */
  private UCUtils() {}

  /**
   * Derives Unity Catalog table information from Spark catalog table metadata.
   *
   * @param catalogTable Spark catalog table metadata
   * @param spark SparkSession used to look up Unity Catalog endpoint configuration
   * @return table info when the table is UC-managed, empty otherwise
   * @throws IllegalArgumentException if the table is UC-managed but its configuration is invalid
   */
  public static Optional<UCTableInfo> extractTableInfo(
      CatalogTable catalogTable, SparkSession spark) {
    requireNonNull(catalogTable, "catalogTable is null");
    requireNonNull(spark, "spark is null");

    if (!CatalogTableUtils.isUnityCatalogManagedTable(catalogTable)) {
      return Optional.empty();
    }

    String ucTableId = extractUCTableId(catalogTable);
    String location = extractTablePath(catalogTable);

    // A fully-qualified identifier is required so we know which UC catalog to talk to.
    scala.Option<String> maybeCatalog = catalogTable.identifier().catalog();
    if (maybeCatalog.isEmpty()) {
      throw new IllegalArgumentException(
          "Unable to determine Unity Catalog for table "
              + catalogTable.identifier()
              + ": catalog name is missing. Use a fully-qualified table name with an explicit "
              + "catalog (e.g., catalog.schema.table).");
    }
    String catalogName = maybeCatalog.get();

    // Resolve the UC endpoint and token for that catalog from the Spark session configuration.
    scala.Option<UCCatalogConfig> maybeConfig =
        UCCommitCoordinatorBuilder$.MODULE$.getCatalogConfigMap(spark).get(catalogName);
    if (maybeConfig.isEmpty()) {
      throw new IllegalArgumentException(
          "Cannot create UC client for table "
              + catalogTable.identifier()
              + ": Unity Catalog configuration not found for catalog '"
              + catalogName
              + "'.");
    }

    UCCatalogConfig ucConfig = maybeConfig.get();
    return Optional.of(new UCTableInfo(ucTableId, location, ucConfig.uri(), ucConfig.token()));
  }

  /** Reads the UC table id from the table's storage properties; fails if absent or blank. */
  private static String extractUCTableId(CatalogTable catalogTable) {
    Map<String, String> storageProps =
        scala.jdk.javaapi.CollectionConverters.asJava(catalogTable.storage().properties());

    // TODO: UC constants should be consolidated in a shared location (future PR)
    String tableId = storageProps.get(UCCommitCoordinatorClient.UC_TABLE_ID_KEY);
    if (tableId == null || tableId.isEmpty()) {
      throw new IllegalArgumentException(
          "Cannot extract ucTableId from table " + catalogTable.identifier());
    }
    return tableId;
  }

  /** Returns the table's storage location as a string; fails if the location is unset. */
  private static String extractTablePath(CatalogTable catalogTable) {
    if (catalogTable.location() == null) {
      throw new IllegalArgumentException(
          "Cannot extract table path: location is null for table " + catalogTable.identifier());
    }
    return catalogTable.location().toString();
  }
}
Loading
Loading