Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.delta.kernel.spark.read.SparkScanBuilder;
import io.delta.kernel.spark.snapshot.DeltaSnapshotManager;
import io.delta.kernel.spark.snapshot.PathBasedSnapshotManager;
import io.delta.kernel.spark.unity.UnityCatalogClientFactory;
import io.delta.kernel.spark.utils.SchemaUtils;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -58,6 +59,7 @@ public class SparkTable implements Table, SupportsRead {
private final Column[] columns;
private final Transform[] partitionTransforms;
private final Optional<CatalogTable> catalogTable;
private final Optional<UnityCatalogClientFactory.UnityCatalogClient> unityCatalogClient;

/**
* Creates a SparkTable from a filesystem path without a catalog table.
Expand Down Expand Up @@ -118,6 +120,9 @@ private SparkTable(
Optional<CatalogTable> catalogTable) {
this.identifier = requireNonNull(identifier, "identifier is null");
this.catalogTable = catalogTable;
this.unityCatalogClient =
catalogTable.flatMap(
table -> UnityCatalogClientFactory.create(SparkSession.active(), identifier, table));
// Merge options: file system options from catalog + user options (user takes precedence)
// This follows the same pattern as DeltaTableV2 in delta-spark
Map<String, String> merged = new HashMap<>();
Expand Down Expand Up @@ -197,6 +202,10 @@ public Optional<CatalogTable> getCatalogTable() {
return catalogTable;
}

/**
 * Returns the Unity Catalog client handle backing this table, or {@link Optional#empty()} when
 * the table is not Unity Catalog managed (e.g. a purely path-based table with no catalog entry).
 *
 * <p>The handle is resolved once in the constructor from the catalog table metadata and the
 * active Spark session's {@code spark.sql.catalog.*} configuration.
 */
public Optional<UnityCatalogClientFactory.UnityCatalogClient> getUnityCatalogClient() {
return unityCatalogClient;
}

@Override
public String name() {
return identifier.name();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
package io.delta.kernel.spark.unity;

import static java.util.Objects.requireNonNull;

import io.delta.kernel.spark.utils.CatalogTableUtils;
import io.delta.kernel.spark.utils.ScalaUtils;
import io.delta.storage.commit.uccommitcoordinator.UCClient;
import io.delta.storage.commit.uccommitcoordinator.UCTokenBasedRestClient;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.catalog.CatalogTable;
import org.apache.spark.sql.connector.catalog.Identifier;

/**
* Factory for constructing Unity Catalog clients from Spark session catalog configuration.
*
* <p>The logic mirrors the config resolution performed by {@code
* org.apache.spark.sql.delta.coordinatedcommits.UCCommitCoordinatorBuilder}, ensuring the connector
* honours the same semantics across V1 and Kernel-backed paths.
*/
public final class UnityCatalogClientFactory {

  private static final String SPARK_SQL_CATALOG_PREFIX = "spark.sql.catalog.";
  private static final String URI_SUFFIX = "uri";
  private static final String TOKEN_SUFFIX = "token";
  private static final String WAREHOUSE_SUFFIX = "warehouse";
  private static final String UNITY_CATALOG_CONNECTOR_CLASS =
      "io.unitycatalog.spark.UCSingleCatalog";

  private UnityCatalogClientFactory() {}

  // Pluggable so tests can substitute a fake UC client. Declared volatile because the
  // test hooks below may swap it from a different thread than the one resolving tables.
  private static volatile BiFunction<String, String, UCClient> clientBuilder =
      UCTokenBasedRestClient::new;

  /**
   * Creates a Unity Catalog client for the provided catalog table if it is Unity Catalog managed.
   *
   * <p>Connector resolution walks every {@code spark.sql.catalog.<name>} entry whose connector
   * class is {@code io.unitycatalog.spark.UCSingleCatalog} and picks one via
   * {@link #selectCatalogConfig}. The returned {@link UnityCatalogClient} owns the underlying
   * {@link UCClient}; callers are responsible for closing it.
   *
   * @param spark active Spark session supplying the catalog configuration
   * @param identifier table identifier supplied by Spark DSv2
   * @param catalogTable catalog metadata for the table
   * @return optional Unity Catalog client details; empty when the table is not UC managed
   * @throws IllegalStateException when the table is UC managed but no connector is configured,
   *     the configured connector cannot be resolved unambiguously, or its config is invalid
   */
  public static Optional<UnityCatalogClient> create(
      SparkSession spark, Identifier identifier, CatalogTable catalogTable) {
    requireNonNull(spark, "spark session is null");
    requireNonNull(identifier, "identifier is null");
    requireNonNull(catalogTable, "catalogTable is null");

    if (!CatalogTableUtils.isUnityCatalogManagedTable(catalogTable)) {
      return Optional.empty();
    }

    Map<String, String> sparkConf = ScalaUtils.toJavaMap(spark.conf().getAll());

    List<CatalogConfig> unityCatalogs = collectUnityCatalogConfigs(sparkConf);
    if (unityCatalogs.isEmpty()) {
      throw new IllegalStateException(
          "Unity Catalog table detected but no Unity Catalog connectors are configured.");
    }

    Optional<String> requestedCatalogName = extractCatalogName(identifier);
    CatalogConfig selectedCatalog =
        selectCatalogConfig(requestedCatalogName, unityCatalogs, identifier.name());

    UCClient ucClient = clientBuilder.apply(selectedCatalog.uri, selectedCatalog.token);
    return Optional.of(
        new UnityCatalogClient(selectedCatalog.name, ucClient, selectedCatalog.warehouse));
  }

  /**
   * Resolves which configured Unity Catalog connector serves the table.
   *
   * <p>Resolution order:
   *
   * <ol>
   *   <li>a connector whose name matches the identifier's catalog component (case-insensitive);
   *   <li>the sole configured connector — even when a requested name did not match, mirroring the
   *       V1 coordinator's single-catalog fallback;
   *   <li>otherwise the request is ambiguous or unresolvable and we fail fast.
   * </ol>
   */
  private static CatalogConfig selectCatalogConfig(
      Optional<String> requestedCatalogName, List<CatalogConfig> unityCatalogs, String tableName) {
    if (requestedCatalogName.isPresent()) {
      Optional<CatalogConfig> match =
          unityCatalogs.stream()
              .filter(config -> config.name.equalsIgnoreCase(requestedCatalogName.get()))
              .findFirst();
      if (match.isPresent()) {
        return match.get();
      }
    }

    // Single-connector fallback, shared by the matched-name-missing and no-name cases.
    // (The original duplicated this branch in both paths.)
    if (unityCatalogs.size() == 1) {
      return unityCatalogs.get(0);
    }

    if (requestedCatalogName.isPresent()) {
      throw new IllegalStateException(
          String.format(
              Locale.ROOT,
              "Unable to locate Unity Catalog connector '%s' for table '%s'.",
              requestedCatalogName.get(),
              tableName));
    }

    throw new IllegalStateException(
        String.format(
            Locale.ROOT,
            "Multiple Unity Catalog connectors configured (%s) but table '%s' does not carry a "
                + "catalog-qualified identifier.",
            listCatalogNames(unityCatalogs),
            tableName));
  }

  /**
   * Scans the Spark conf for {@code spark.sql.catalog.*} entries and returns a validated config
   * for every catalog registered with the Unity Catalog connector class.
   *
   * <p>{@code spark.sql.catalog.<name>} (no property suffix) names the connector class itself;
   * {@code spark.sql.catalog.<name>.<prop>} carries per-catalog properties. {@code uri} and
   * {@code token} are required and trimmed; {@code warehouse} is optional.
   */
  private static List<CatalogConfig> collectUnityCatalogConfigs(Map<String, String> sparkConf) {
    Map<String, CatalogProperties> catalogProperties = new HashMap<>();

    for (Map.Entry<String, String> entry : sparkConf.entrySet()) {
      String key = entry.getKey();
      if (!key.startsWith(SPARK_SQL_CATALOG_PREFIX)) {
        continue;
      }
      String remainder = key.substring(SPARK_SQL_CATALOG_PREFIX.length());
      int dotIndex = remainder.indexOf('.');
      String catalogName;
      String propertyKey = null;
      if (dotIndex == -1) {
        catalogName = remainder;
      } else {
        catalogName = remainder.substring(0, dotIndex);
        propertyKey = remainder.substring(dotIndex + 1);
      }

      CatalogProperties properties =
          catalogProperties.computeIfAbsent(catalogName, CatalogProperties::new);
      if (propertyKey == null) {
        properties.connectorClass = entry.getValue();
      } else if (URI_SUFFIX.equals(propertyKey)) {
        properties.uri = entry.getValue();
      } else if (TOKEN_SUFFIX.equals(propertyKey)) {
        properties.token = entry.getValue();
      } else if (WAREHOUSE_SUFFIX.equals(propertyKey)) {
        properties.warehouse = Optional.ofNullable(entry.getValue()).filter(v -> !v.isEmpty());
      }
    }

    List<CatalogConfig> unityCatalogs = new ArrayList<>();
    for (CatalogProperties properties : catalogProperties.values()) {
      // Only catalogs explicitly registered with the UC connector class participate.
      if (!UNITY_CATALOG_CONNECTOR_CLASS.equals(properties.connectorClass)) {
        continue;
      }

      String uri = requireTrimmed(properties.uri, properties.name, URI_SUFFIX);
      // Validate early so a malformed endpoint fails here, with the catalog name in the
      // message, rather than deep inside the REST client.
      try {
        new URI(uri);
      } catch (URISyntaxException e) {
        throw new IllegalStateException(
            String.format(
                Locale.ROOT,
                "Invalid Unity Catalog URI '%s' configured for catalog '%s'.",
                uri,
                properties.name),
            e);
      }

      String token = requireTrimmed(properties.token, properties.name, TOKEN_SUFFIX);
      unityCatalogs.add(new CatalogConfig(properties.name, uri, token, properties.warehouse));
    }

    return unityCatalogs;
  }

  /** Returns the catalog names as a stable (case-insensitively sorted) comma-joined list. */
  private static String listCatalogNames(List<CatalogConfig> configs) {
    List<String> names = new ArrayList<>(configs.size());
    for (CatalogConfig config : configs) {
      names.add(config.name);
    }
    names.sort(String.CASE_INSENSITIVE_ORDER);
    return String.join(",", names);
  }

  /**
   * Returns {@code value} trimmed, failing with a configuration-key-bearing message when the
   * value is missing or blank.
   */
  private static String requireTrimmed(String value, String catalogName, String propertySuffix) {
    if (value == null) {
      throw new IllegalStateException(
          String.format(
              Locale.ROOT,
              "Missing Unity Catalog configuration '%s%s%s'.",
              SPARK_SQL_CATALOG_PREFIX,
              catalogName,
              propertySuffix.isEmpty() ? "" : "." + propertySuffix));
    }
    String trimmed = value.trim();
    if (trimmed.isEmpty()) {
      throw new IllegalStateException(
          String.format(
              Locale.ROOT,
              "Unity Catalog configuration '%s%s.%s' cannot be empty.",
              SPARK_SQL_CATALOG_PREFIX,
              catalogName,
              propertySuffix));
    }
    return trimmed;
  }

  /**
   * Extracts the catalog component of a DSv2 identifier.
   *
   * <p>NOTE(review): assumes a catalog-qualified table stores the catalog name as the first
   * namespace element; a plain schema-qualified identifier would yield the schema here instead —
   * confirm against how Spark hands identifiers to this connector.
   */
  private static Optional<String> extractCatalogName(Identifier identifier) {
    String[] namespace = identifier.namespace();
    if (namespace != null && namespace.length > 0) {
      return Optional.of(namespace[0]);
    }
    return Optional.empty();
  }

  /**
   * Unity Catalog client handle containing additional metadata required by the connector.
   *
   * <p>Owns the wrapped {@link UCClient}; {@link #close()} releases it.
   */
  public static final class UnityCatalogClient implements AutoCloseable {
    private final String catalogName;
    private final UCClient ucClient;
    private final Optional<String> warehouse;

    UnityCatalogClient(String catalogName, UCClient ucClient, Optional<String> warehouse) {
      this.catalogName = requireNonNull(catalogName, "catalogName is null");
      this.ucClient = requireNonNull(ucClient, "ucClient is null");
      // Treat a null Optional reference defensively as "no warehouse configured".
      this.warehouse = Objects.requireNonNullElseGet(warehouse, Optional::empty);
    }

    public String getCatalogName() {
      return catalogName;
    }

    public UCClient getUcClient() {
      return ucClient;
    }

    public Optional<String> getWarehouse() {
      return warehouse;
    }

    @Override
    public void close() throws Exception {
      ucClient.close();
    }
  }

  /** Mutable accumulator for a catalog's properties while scanning the Spark conf. */
  private static final class CatalogProperties {
    private final String name;
    private String connectorClass;
    private String uri;
    private String token;
    private Optional<String> warehouse = Optional.empty();

    private CatalogProperties(String name) {
      this.name = name;
    }
  }

  /** Immutable, validated connector configuration for a single Unity Catalog. */
  private static final class CatalogConfig {
    private final String name;
    private final String uri;
    private final String token;
    private final Optional<String> warehouse;

    private CatalogConfig(String name, String uri, String token, Optional<String> warehouse) {
      this.name = name;
      this.uri = uri;
      this.token = token;
      this.warehouse = warehouse;
    }
  }

  /** Visible for testing: override the UC client builder. */
  public static void setClientBuilderForTesting(BiFunction<String, String, UCClient> builder) {
    clientBuilder = requireNonNull(builder, "builder is null");
  }

  /** Visible for testing: reset the UC client builder to the default implementation. */
  public static void resetClientBuilderForTesting() {
    clientBuilder = UCTokenBasedRestClient::new;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package io.delta.kernel.spark.utils;

import static java.util.Objects.requireNonNull;

import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient;
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.sql.catalyst.catalog.CatalogTable;

/**
* Utility helpers for inspecting Delta-related metadata persisted on Spark {@link CatalogTable}
* instances by Unity Catalog.
*
* <p>Unity Catalog marks catalog-managed tables via feature flags stored in table storage
* properties. This helper centralises the logic for interpreting those properties so the Kernel
* connector can decide when to use catalog-owned (CCv2) behaviour.
*
* <ul>
* <li>{@link #isCatalogManaged(CatalogTable)} checks whether either {@code
* delta.feature.catalogManaged} or {@code delta.feature.catalogOwned-preview} is set to
* {@code supported}, signalling that a catalog manages the table.
* <li>{@link #isUnityCatalogManagedTable(CatalogTable)} additionally verifies the presence of the
* Unity Catalog table identifier ({@link UCCommitCoordinatorClient#UC_TABLE_ID_KEY}) to
* confirm that the table is backed by Unity Catalog.
* </ul>
*/
public final class CatalogTableUtils {
  static final String FEATURE_CATALOG_MANAGED = "delta.feature.catalogManaged";
  static final String FEATURE_CATALOG_OWNED_PREVIEW = "delta.feature.catalogOwned-preview";
  private static final String SUPPORTED = "supported";

  private CatalogTableUtils() {}

  /**
   * Checks whether <em>any</em> catalog manages this table via CCv2 semantics, i.e. whether the
   * {@code catalogManaged} or {@code catalogOwned-preview} feature flag is set to
   * {@code supported} in the merged table properties.
   */
  public static boolean isCatalogManaged(CatalogTable table) {
    requireNonNull(table, "table is null");
    Map<String, String> storageProperties = getStorageProperties(table);
    return hasCatalogManagedFlag(storageProperties);
  }

  /**
   * Checks whether this table is <em>Unity Catalog</em> managed: it must be catalog managed
   * (see {@link #isCatalogManaged}) and carry a Unity Catalog table identifier
   * ({@link UCCommitCoordinatorClient#UC_TABLE_ID_KEY}).
   */
  public static boolean isUnityCatalogManagedTable(CatalogTable table) {
    requireNonNull(table, "table is null");
    // Merge the properties once and answer both questions from the same map; delegating to
    // isCatalogManaged(table) here would rebuild the merged map a second time.
    Map<String, String> storageProperties = getStorageProperties(table);
    return storageProperties.containsKey(UCCommitCoordinatorClient.UC_TABLE_ID_KEY)
        && hasCatalogManagedFlag(storageProperties);
  }

  /**
   * Returns the table's storage properties merged with its logical table properties so callers
   * can consult both through a single map.
   *
   * <p>Note: on key collisions, {@code table.properties()} overrides
   * {@code table.storage().properties()} (it is copied in last).
   */
  public static Map<String, String> getStorageProperties(CatalogTable table) {
    requireNonNull(table, "table is null");
    Map<String, String> merged = new HashMap<>();
    merged.putAll(ScalaUtils.toJavaMap(table.storage().properties()));
    merged.putAll(ScalaUtils.toJavaMap(table.properties()));
    return merged;
  }

  /**
   * Returns {@code true} when {@code featureKey} is present in {@code tableProperties} with the
   * value {@code supported} (case-insensitive).
   */
  public static boolean isCatalogManagedFeatureEnabled(
      Map<String, String> tableProperties, String featureKey) {
    requireNonNull(tableProperties, "tableProperties is null");
    requireNonNull(featureKey, "featureKey is null");
    String featureValue = tableProperties.get(featureKey);
    if (featureValue == null) {
      return false;
    }
    return featureValue.equalsIgnoreCase(SUPPORTED);
  }

  // True when either CCv2 feature flag is marked 'supported' in the given merged properties.
  private static boolean hasCatalogManagedFlag(Map<String, String> properties) {
    return isCatalogManagedFeatureEnabled(properties, FEATURE_CATALOG_MANAGED)
        || isCatalogManagedFeatureEnabled(properties, FEATURE_CATALOG_OWNED_PREVIEW);
  }
}
Loading
Loading