Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.spark.utils;

import static java.util.Objects.requireNonNull;

import io.delta.kernel.internal.tablefeatures.TableFeatures;
import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient;
import java.util.Collections;
import java.util.Map;
import org.apache.spark.sql.catalyst.catalog.CatalogTable;

/**
* Utility helpers for inspecting Delta-related metadata persisted on Spark {@link CatalogTable}
* instances by Unity Catalog.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should assume or default to Unity - eh?

*
* <p>Unity Catalog marks catalog-managed tables via feature flags stored in table storage
* properties. This helper centralises the logic for interpreting those properties so the Kernel
* connector can decide when to use catalog-owned (CCv2) behaviour.
*
* <ul>
* <li>{@link #isCatalogManaged(CatalogTable)} checks whether either {@code
* delta.feature.catalogManaged} or {@code delta.feature.catalogOwned-preview} is set to
* {@code supported}, signalling that a catalog manages the table.
* <li>{@link #isUnityCatalogManagedTable(CatalogTable)} additionally verifies the presence of the
* Unity Catalog table identifier ({@link UCCommitCoordinatorClient#UC_TABLE_ID_KEY}) to
* confirm that the table is backed by Unity Catalog.
* </ul>
*/
public final class CatalogTableUtils {
/**
* Storage-property key for the catalog-managed feature flag. Built by appending the name of
* {@link TableFeatures#CATALOG_MANAGED_RW_FEATURE} to {@link
* TableFeatures#SET_TABLE_FEATURE_SUPPORTED_PREFIX}; expected to resolve to {@code
* delta.feature.catalogManaged}.
*/
static final String FEATURE_CATALOG_MANAGED =
TableFeatures.SET_TABLE_FEATURE_SUPPORTED_PREFIX
+ TableFeatures.CATALOG_MANAGED_RW_FEATURE.featureName();

/**
* Storage-property key for the preview variant of the flag: the same prefix followed by the
* literal {@code catalogOwned-preview}; expected to resolve to {@code
* delta.feature.catalogOwned-preview}.
*/
static final String FEATURE_CATALOG_OWNED_PREVIEW =
TableFeatures.SET_TABLE_FEATURE_SUPPORTED_PREFIX + "catalogOwned-preview";
/** Value a feature key must carry (matched ignoring case) for the feature to count as enabled. */
private static final String SUPPORTED = TableFeatures.SET_TABLE_FEATURE_SUPPORTED_VALUE;

/** Not instantiable: this class only exposes static helpers. */
private CatalogTableUtils() {}

/**
* Checks whether any catalog manages this table via CCv2 semantics.
*
* @param table Spark {@link CatalogTable} descriptor
* @return {@code true} when either catalog feature flag is set to {@code supported}
*/
public static boolean isCatalogManaged(CatalogTable table) {
requireNonNull(table, "table is null");
Map<String, String> storageProperties = getStorageProperties(table);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QQ: is storageProperties always not null? Given the method returns a boolean, shall we simply return false if storageProperties is null?

Copy link
Collaborator Author

@TimothyW553 TimothyW553 Nov 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, that's a good idea! I've updated it so that if table.storage().properties() is null we return an empty map. That is less code than checking for null at each call site, and an empty map results in false anyway.

return isCatalogManagedFeatureEnabled(storageProperties, FEATURE_CATALOG_MANAGED)
|| isCatalogManagedFeatureEnabled(storageProperties, FEATURE_CATALOG_OWNED_PREVIEW);
}

/**
 * Tells whether Unity Catalog manages the given table.
 *
 * <p>The table qualifies only when its storage properties carry the Unity Catalog table
 * identifier ({@link UCCommitCoordinatorClient#UC_TABLE_ID_KEY}) and {@link
 * #isCatalogManaged(CatalogTable)} also holds.
 *
 * @param table Spark {@link CatalogTable} descriptor; must not be null
 * @return {@code true} when the UC identifier is present and the table is catalog managed
 * @throws NullPointerException if {@code table} is null
 */
public static boolean isUnityCatalogManagedTable(CatalogTable table) {
  requireNonNull(table, "table is null");
  if (!getStorageProperties(table).containsKey(UCCommitCoordinatorClient.UC_TABLE_ID_KEY)) {
    // No Unity Catalog table id: the feature flags do not need to be consulted.
    return false;
  }
  return isCatalogManaged(table);
}

/**
* Checks whether the given feature key is enabled in the table properties.
*
* @param tableProperties The table properties
* @param featureKey The feature key
* @return {@code true} when the feature key is set to {@code supported}
*/
private static boolean isCatalogManagedFeatureEnabled(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a blocker for this PR, but I don't like this approach of embedding so much UC-specific context deep inside this v2 connector. @huan233usc, please think about better abstractions for this. Maybe something like a CatalogTableWithManagedCommits extends Catalog that has the additional context (in a generic way) of whether the catalog impl supports catalog-managed commits.

Copy link
Collaborator Author

@TimothyW553 TimothyW553 Nov 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, makes sense. For extensibility and to support other catalogs, we definitely don't want to couple this connector so tightly to UC specifics; overall we want to keep Delta catalog-agnostic. I will keep that in mind and think about how we can build better abstractions for this.

Map<String, String> tableProperties, String featureKey) {
requireNonNull(tableProperties, "tableProperties is null");
requireNonNull(featureKey, "featureKey is null");
String featureValue = tableProperties.get(featureKey);
if (featureValue == null) {
return false;
}
return featureValue.equalsIgnoreCase(SUPPORTED);
}

/**
 * Extracts the storage properties attached to a {@link CatalogTable} as a Java map.
 *
 * <p>An empty map is returned when the table has no storage descriptor or when the conversion
 * of its properties yields {@code null}, so callers never need a null check.
 *
 * @param table Spark {@link CatalogTable} descriptor; must not be null
 * @return the storage properties as a Java map, never null
 * @throws NullPointerException if {@code table} is null
 */
private static Map<String, String> getStorageProperties(CatalogTable table) {
  requireNonNull(table, "table is null");
  if (table.storage() != null) {
    Map<String, String> converted = ScalaUtils.toJavaMap(table.storage().properties());
    if (converted != null) {
      return converted;
    }
  }
  return Collections.emptyMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
*/
package io.delta.kernel.spark.utils;

import java.util.Collections;
import java.util.Map;
import scala.Tuple2;
import scala.collection.immutable.Map$;
import scala.collection.mutable.Builder;
import scala.jdk.javaapi.CollectionConverters;

public final class ScalaUtils {
public static scala.collection.immutable.Map<String, String> toScalaMap(
Expand All @@ -35,4 +37,15 @@ public static scala.collection.immutable.Map<String, String> toScalaMap(
}
return b.result();
}

/**
 * Converts an immutable Scala map into a Java {@link Map}.
 *
 * @param scalaMap the Scala map to convert; may be null
 * @return {@code null} for a null input, {@link Collections#emptyMap()} for an empty input,
 *     otherwise the result of {@link CollectionConverters#asJava} applied to {@code scalaMap}
 */
public static Map<String, String> toJavaMap(
    scala.collection.immutable.Map<String, String> scalaMap) {
  if (scalaMap == null) {
    return null;
  }
  return scalaMap.isEmpty() ? Collections.emptyMap() : CollectionConverters.asJava(scalaMap);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.spark.utils;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient;
import java.util.Collections;
import java.util.Map;
import org.apache.spark.sql.catalyst.catalog.CatalogTable;
import org.junit.jupiter.api.Test;

/**
 * Unit tests for {@link CatalogTableUtils}: feature-flag detection, Unity Catalog identification,
 * and handling of null tables, null storage, and null storage properties.
 */
class CatalogTableUtilsTest {

  /** Feature-flag value used by the fixtures that enable a flag with the canonical spelling. */
  private static final String SUPPORTED = "supported";

  // ---- isCatalogManaged -------------------------------------------------------------------

  @Test
  void testIsCatalogManaged_CatalogFlagEnabled_ReturnsTrue() {
    boolean managed =
        CatalogTableUtils.isCatalogManaged(
            tableWithStorageProperties(
                Map.of(CatalogTableUtils.FEATURE_CATALOG_MANAGED, SUPPORTED)));

    assertTrue(managed, "Catalog-managed flag should enable detection");
  }

  @Test
  void testIsCatalogManaged_PreviewFlagEnabled_ReturnsTrue() {
    // Mixed-case value: the flag comparison is expected to ignore case.
    boolean managed =
        CatalogTableUtils.isCatalogManaged(
            tableWithStorageProperties(
                Map.of(CatalogTableUtils.FEATURE_CATALOG_OWNED_PREVIEW, "SuPpOrTeD")));

    assertTrue(managed, "Preview flag should enable detection ignoring case");
  }

  @Test
  void testIsCatalogManaged_NoFlags_ReturnsFalse() {
    boolean managed =
        CatalogTableUtils.isCatalogManaged(tableWithStorageProperties(Collections.emptyMap()));

    assertFalse(managed, "No catalog flags should disable detection");
  }

  // ---- isUnityCatalogManagedTable ---------------------------------------------------------

  @Test
  void testIsUnityCatalogManaged_FlagAndIdPresent_ReturnsTrue() {
    Map<String, String> flagAndId =
        Map.of(
            CatalogTableUtils.FEATURE_CATALOG_MANAGED,
            SUPPORTED,
            UCCommitCoordinatorClient.UC_TABLE_ID_KEY,
            "abc-123");

    boolean unityManaged =
        CatalogTableUtils.isUnityCatalogManagedTable(tableWithStorageProperties(flagAndId));

    assertTrue(unityManaged, "Unity Catalog detection should require flag and identifier");
  }

  @Test
  void testIsUnityCatalogManaged_MissingId_ReturnsFalse() {
    boolean unityManaged =
        CatalogTableUtils.isUnityCatalogManagedTable(
            tableWithStorageProperties(
                Map.of(CatalogTableUtils.FEATURE_CATALOG_MANAGED, SUPPORTED)));

    assertFalse(unityManaged, "Missing table identifier should break Unity detection");
  }

  @Test
  void testIsUnityCatalogManaged_PreviewFlagMissingId_ReturnsFalse() {
    boolean unityManaged =
        CatalogTableUtils.isUnityCatalogManagedTable(
            tableWithStorageProperties(
                Map.of(CatalogTableUtils.FEATURE_CATALOG_OWNED_PREVIEW, SUPPORTED)));

    assertFalse(unityManaged, "Preview flag without ID should not be considered Unity managed");
  }

  // ---- null table -------------------------------------------------------------------------

  @Test
  void testIsCatalogManaged_NullTable_ThrowsException() {
    assertThrows(
        NullPointerException.class,
        () -> CatalogTableUtils.isCatalogManaged(null),
        "Null table should throw NullPointerException");
  }

  @Test
  void testIsUnityCatalogManaged_NullTable_ThrowsException() {
    assertThrows(
        NullPointerException.class,
        () -> CatalogTableUtils.isUnityCatalogManagedTable(null),
        "Null table should throw NullPointerException");
  }

  // ---- null storage / null storage properties ---------------------------------------------

  @Test
  void testIsCatalogManaged_NullStorage_ReturnsFalse() {
    boolean managed = CatalogTableUtils.isCatalogManaged(tableWithNullStorage());

    assertFalse(managed, "Null storage should not be considered catalog managed");
  }

  @Test
  void testIsUnityCatalogManaged_NullStorage_ReturnsFalse() {
    boolean unityManaged = CatalogTableUtils.isUnityCatalogManagedTable(tableWithNullStorage());

    assertFalse(unityManaged, "Null storage should not be considered Unity managed");
  }

  @Test
  void testIsCatalogManaged_NullStorageProperties_ReturnsFalse() {
    boolean managed = CatalogTableUtils.isCatalogManaged(tableWithNullStorageProperties());

    assertFalse(managed, "Null storage properties should not be considered catalog managed");
  }

  @Test
  void testIsUnityCatalogManaged_NullStorageProperties_ReturnsFalse() {
    boolean unityManaged =
        CatalogTableUtils.isUnityCatalogManagedTable(tableWithNullStorageProperties());

    assertFalse(unityManaged, "Null storage properties should not be considered Unity managed");
  }

  // ---- fixtures ---------------------------------------------------------------------------

  /** Builds a table with empty table properties and the given storage properties. */
  private static CatalogTable tableWithStorageProperties(Map<String, String> storageProperties) {
    return CatalogTableTestUtils$.MODULE$.catalogTableWithProperties(
        Collections.emptyMap(), storageProperties);
  }

  /** Builds a table with empty table properties via the null-storage test factory. */
  private static CatalogTable tableWithNullStorage() {
    return CatalogTableTestUtils$.MODULE$.catalogTableWithNullStorage(Collections.emptyMap());
  }

  /** Builds a table with empty table properties via the null-storage-properties test factory. */
  private static CatalogTable tableWithNullStorageProperties() {
    return CatalogTableTestUtils$.MODULE$.catalogTableWithNullStorageProperties(
        Collections.emptyMap());
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.spark.utils;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.Collections;
import java.util.Map;
import org.junit.jupiter.api.Test;

/** Tests for {@link ScalaUtils#toJavaMap}. */
class ScalaUtilsTest {

  @Test
  void testToJavaMap_NullInput_ReturnsNull() {
    Map<String, String> converted = ScalaUtils.toJavaMap(null);

    assertNull(converted, "Null scala maps should return null");
  }

  @Test
  void testToJavaMap_EmptyInput_ReturnsEmptyMap() {
    scala.collection.immutable.Map<String, String> noEntries =
        ScalaUtils.toScalaMap(Collections.emptyMap());

    boolean resultIsEmpty = ScalaUtils.toJavaMap(noEntries).isEmpty();

    assertTrue(resultIsEmpty, "Empty scala maps should convert to empty java maps");
  }

  @Test
  void testToJavaMap_PopulatedInput_PreservesEntries() {
    Map<String, String> expected = Map.of("foo", "bar");

    // Round-trip through the Scala representation and back.
    Map<String, String> converted = ScalaUtils.toJavaMap(ScalaUtils.toScalaMap(expected));

    assertEquals(expected, converted, "Scala map entries should be preserved");
  }
}
Loading
Loading