diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/utils/CatalogTableUtils.java b/kernel-spark/src/main/java/io/delta/kernel/spark/utils/CatalogTableUtils.java
new file mode 100644
index 00000000000..9c56003476c
--- /dev/null
+++ b/kernel-spark/src/main/java/io/delta/kernel/spark/utils/CatalogTableUtils.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.spark.utils;
+
+import static java.util.Objects.requireNonNull;
+
+import io.delta.kernel.internal.tablefeatures.TableFeatures;
+import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+
+/**
+ * Utility helpers for inspecting Delta-related metadata persisted on Spark {@link CatalogTable}
+ * instances by Unity Catalog.
+ *
+ * <p>Unity Catalog marks catalog-managed tables via feature flags stored in table storage
+ * properties. This helper centralises the logic for interpreting those properties so the Kernel
+ * connector can decide when to use catalog-owned (CCv2) behaviour.
+ *
+ * <ul>
+ *   <li>{@link #isCatalogManaged(CatalogTable)} checks whether either {@code
+ *       delta.feature.catalogManaged} or {@code delta.feature.catalogOwned-preview} is set to
+ *       {@code supported}, signalling that a catalog manages the table.
+ *   <li>{@link #isUnityCatalogManagedTable(CatalogTable)} additionally verifies the presence of
+ *       the Unity Catalog table identifier ({@link UCCommitCoordinatorClient#UC_TABLE_ID_KEY}) to
+ *       confirm that the table is backed by Unity Catalog.
+ * </ul>
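+ *
+ * <p>Illustrative usage (a minimal sketch; the helper method name below is hypothetical, and how
+ * the {@link CatalogTable} is resolved from the catalog is not shown):
+ *
+ * <pre>{@code
+ * // Decide whether to take the catalog-owned (CCv2) commit path for a resolved table.
+ * static boolean shouldUseCatalogOwnedCommits(CatalogTable table) {
+ *   return CatalogTableUtils.isUnityCatalogManagedTable(table);
+ * }
+ * }</pre>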
+ */
+public final class CatalogTableUtils {
+ /**
+ * Property key for the catalog-managed table feature, built from {@link
+ * TableFeatures#SET_TABLE_FEATURE_SUPPORTED_PREFIX} and {@link
+ * TableFeatures#CATALOG_MANAGED_RW_FEATURE} ({@code delta.feature.catalogManaged}).
+ */
+ static final String FEATURE_CATALOG_MANAGED =
+ TableFeatures.SET_TABLE_FEATURE_SUPPORTED_PREFIX
+ + TableFeatures.CATALOG_MANAGED_RW_FEATURE.featureName();
+
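+ /**
+ * Property key for the preview variant of the catalog-managed feature ({@code
+ * delta.feature.catalogOwned-preview}).
+ */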
+ static final String FEATURE_CATALOG_OWNED_PREVIEW =
+ TableFeatures.SET_TABLE_FEATURE_SUPPORTED_PREFIX + "catalogOwned-preview";
+ private static final String SUPPORTED = TableFeatures.SET_TABLE_FEATURE_SUPPORTED_VALUE;
+
+ private CatalogTableUtils() {}
+
+ /**
+ * Checks whether any catalog manages this table via CCv2 semantics.
+ *
+ * @param table Spark {@link CatalogTable} descriptor
+ * @return {@code true} when either catalog feature flag is set to {@code supported}
+ */
+ public static boolean isCatalogManaged(CatalogTable table) {
+ requireNonNull(table, "table is null");
+ Map<String, String> storageProperties = getStorageProperties(table);
+ return isCatalogManagedFeatureEnabled(storageProperties, FEATURE_CATALOG_MANAGED)
+ || isCatalogManagedFeatureEnabled(storageProperties, FEATURE_CATALOG_OWNED_PREVIEW);
+ }
+
+ /**
+ * Checks whether the table is Unity Catalog managed.
+ *
+ * @param table Spark {@link CatalogTable} descriptor
+ * @return {@code true} when the table is catalog managed and contains the UC identifier
+ */
+ public static boolean isUnityCatalogManagedTable(CatalogTable table) {
+ requireNonNull(table, "table is null");
+ Map<String, String> storageProperties = getStorageProperties(table);
+ boolean isUCBacked = storageProperties.containsKey(UCCommitCoordinatorClient.UC_TABLE_ID_KEY);
+ return isUCBacked && isCatalogManaged(table);
+ }
+
+ /**
+ * Checks whether the given feature key is enabled in the table properties.
+ *
+ * @param tableProperties The table properties
+ * @param featureKey The feature key
+ * @return {@code true} when the feature key is set to {@code supported}
+ */
+ private static boolean isCatalogManagedFeatureEnabled(
+ Map<String, String> tableProperties, String featureKey) {
+ requireNonNull(tableProperties, "tableProperties is null");
+ requireNonNull(featureKey, "featureKey is null");
+ String featureValue = tableProperties.get(featureKey);
+ if (featureValue == null) {
+ return false;
+ }
+ return featureValue.equalsIgnoreCase(SUPPORTED);
+ }
+
+ /**
+ * Returns the catalog storage properties published with a {@link CatalogTable}.
+ *
+ * @param table Spark {@link CatalogTable} descriptor
+ * @return Java map view of the storage properties, never null
+ */
+ private static Map<String, String> getStorageProperties(CatalogTable table) {
+ requireNonNull(table, "table is null");
+ if (table.storage() == null) {
+ return Collections.emptyMap();
+ }
+ Map<String, String> javaStorageProperties = ScalaUtils.toJavaMap(table.storage().properties());
+ return javaStorageProperties == null ? Collections.emptyMap() : javaStorageProperties;
+ }
+}
diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/utils/ScalaUtils.java b/kernel-spark/src/main/java/io/delta/kernel/spark/utils/ScalaUtils.java
index 3832dbd5fb3..9de26b09760 100644
--- a/kernel-spark/src/main/java/io/delta/kernel/spark/utils/ScalaUtils.java
+++ b/kernel-spark/src/main/java/io/delta/kernel/spark/utils/ScalaUtils.java
@@ -15,10 +15,12 @@
*/
package io.delta.kernel.spark.utils;
+import java.util.Collections;
import java.util.Map;
import scala.Tuple2;
import scala.collection.immutable.Map$;
import scala.collection.mutable.Builder;
+import scala.jdk.javaapi.CollectionConverters;
public final class ScalaUtils {
public static scala.collection.immutable.Map<String, String> toScalaMap(
@@ -35,4 +37,15 @@ public static scala.collection.immutable.Map<String, String> toScalaMap(
}
return b.result();
}
+
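+ /**
+ * Converts an immutable Scala map to a Java {@link Map}. A {@code null} input yields {@code
+ * null}, an empty input yields {@link Collections#emptyMap()}, and any other input is converted
+ * via {@code CollectionConverters.asJava}.
+ */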
+ public static Map<String, String> toJavaMap(
+ scala.collection.immutable.Map<String, String> scalaMap) {
+ if (scalaMap == null) {
+ return null;
+ }
+ if (scalaMap.isEmpty()) {
+ return Collections.emptyMap();
+ }
+ return CollectionConverters.asJava(scalaMap);
+ }
}
diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/utils/CatalogTableUtilsTest.java b/kernel-spark/src/test/java/io/delta/kernel/spark/utils/CatalogTableUtilsTest.java
new file mode 100644
index 00000000000..24e6593890a
--- /dev/null
+++ b/kernel-spark/src/test/java/io/delta/kernel/spark/utils/CatalogTableUtilsTest.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.spark.utils;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+import org.junit.jupiter.api.Test;
+
+/** Tests for {@link CatalogTableUtils}. */
+class CatalogTableUtilsTest {
+
+ @Test
+ void testIsCatalogManaged_CatalogFlagEnabled_ReturnsTrue() {
+ CatalogTable table =
+ catalogTable(
+ Collections.emptyMap(), Map.of(CatalogTableUtils.FEATURE_CATALOG_MANAGED, "supported"));
+
+ assertTrue(
+ CatalogTableUtils.isCatalogManaged(table), "Catalog-managed flag should enable detection");
+ }
+
+ @Test
+ void testIsCatalogManaged_PreviewFlagEnabled_ReturnsTrue() {
+ CatalogTable table =
+ catalogTable(
+ Collections.emptyMap(),
+ Map.of(CatalogTableUtils.FEATURE_CATALOG_OWNED_PREVIEW, "SuPpOrTeD"));
+
+ assertTrue(
+ CatalogTableUtils.isCatalogManaged(table),
+ "Preview flag should enable detection ignoring case");
+ }
+
+ @Test
+ void testIsCatalogManaged_NoFlags_ReturnsFalse() {
+ CatalogTable table = catalogTable(Collections.emptyMap(), Collections.emptyMap());
+
+ assertFalse(
+ CatalogTableUtils.isCatalogManaged(table), "No catalog flags should disable detection");
+ }
+
+ @Test
+ void testIsUnityCatalogManaged_FlagAndIdPresent_ReturnsTrue() {
+ CatalogTable table =
+ catalogTable(
+ Collections.emptyMap(),
+ Map.of(
+ CatalogTableUtils.FEATURE_CATALOG_MANAGED,
+ "supported",
+ UCCommitCoordinatorClient.UC_TABLE_ID_KEY,
+ "abc-123"));
+
+ assertTrue(
+ CatalogTableUtils.isUnityCatalogManagedTable(table),
+ "Unity Catalog detection should require flag and identifier");
+ }
+
+ @Test
+ void testIsUnityCatalogManaged_MissingId_ReturnsFalse() {
+ CatalogTable table =
+ catalogTable(
+ Collections.emptyMap(), Map.of(CatalogTableUtils.FEATURE_CATALOG_MANAGED, "supported"));
+
+ assertFalse(
+ CatalogTableUtils.isUnityCatalogManagedTable(table),
+ "Missing table identifier should break Unity detection");
+ }
+
+ @Test
+ void testIsUnityCatalogManaged_PreviewFlagMissingId_ReturnsFalse() {
+ CatalogTable table =
+ catalogTable(
+ Collections.emptyMap(),
+ Map.of(CatalogTableUtils.FEATURE_CATALOG_OWNED_PREVIEW, "supported"));
+
+ assertFalse(
+ CatalogTableUtils.isUnityCatalogManagedTable(table),
+ "Preview flag without ID should not be considered Unity managed");
+ }
+
+ @Test
+ void testIsCatalogManaged_NullTable_ThrowsException() {
+ assertThrows(
+ NullPointerException.class,
+ () -> CatalogTableUtils.isCatalogManaged(null),
+ "Null table should throw NullPointerException");
+ }
+
+ @Test
+ void testIsUnityCatalogManaged_NullTable_ThrowsException() {
+ assertThrows(
+ NullPointerException.class,
+ () -> CatalogTableUtils.isUnityCatalogManagedTable(null),
+ "Null table should throw NullPointerException");
+ }
+
+ @Test
+ void testIsCatalogManaged_NullStorage_ReturnsFalse() {
+ CatalogTable table = catalogTableWithNullStorage(Collections.emptyMap());
+
+ assertFalse(
+ CatalogTableUtils.isCatalogManaged(table),
+ "Null storage should not be considered catalog managed");
+ }
+
+ @Test
+ void testIsUnityCatalogManaged_NullStorage_ReturnsFalse() {
+ CatalogTable table = catalogTableWithNullStorage(Collections.emptyMap());
+
+ assertFalse(
+ CatalogTableUtils.isUnityCatalogManagedTable(table),
+ "Null storage should not be considered Unity managed");
+ }
+
+ @Test
+ void testIsCatalogManaged_NullStorageProperties_ReturnsFalse() {
+ CatalogTable table = catalogTableWithNullStorageProperties(Collections.emptyMap());
+
+ assertFalse(
+ CatalogTableUtils.isCatalogManaged(table),
+ "Null storage properties should not be considered catalog managed");
+ }
+
+ @Test
+ void testIsUnityCatalogManaged_NullStorageProperties_ReturnsFalse() {
+ CatalogTable table = catalogTableWithNullStorageProperties(Collections.emptyMap());
+
+ assertFalse(
+ CatalogTableUtils.isUnityCatalogManagedTable(table),
+ "Null storage properties should not be considered Unity managed");
+ }
+
+ private static CatalogTable catalogTable(
+ Map<String, String> properties, Map<String, String> storageProperties) {
+ return CatalogTableTestUtils$.MODULE$.catalogTableWithProperties(properties, storageProperties);
+ }
+
+ private static CatalogTable catalogTableWithNullStorage(Map<String, String> properties) {
+ return CatalogTableTestUtils$.MODULE$.catalogTableWithNullStorage(properties);
+ }
+
+ private static CatalogTable catalogTableWithNullStorageProperties(
+ Map<String, String> properties) {
+ return CatalogTableTestUtils$.MODULE$.catalogTableWithNullStorageProperties(properties);
+ }
+}
diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/utils/ScalaUtilsTest.java b/kernel-spark/src/test/java/io/delta/kernel/spark/utils/ScalaUtilsTest.java
new file mode 100644
index 00000000000..67e71e1e4d7
--- /dev/null
+++ b/kernel-spark/src/test/java/io/delta/kernel/spark/utils/ScalaUtilsTest.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.spark.utils;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Collections;
+import java.util.Map;
+import org.junit.jupiter.api.Test;
+
+class ScalaUtilsTest {
+
+ @Test
+ void testToJavaMap_NullInput_ReturnsNull() {
+ assertNull(ScalaUtils.toJavaMap(null), "Null scala maps should return null");
+ }
+
+ @Test
+ void testToJavaMap_EmptyInput_ReturnsEmptyMap() {
+ scala.collection.immutable.Map<String, String> emptyScalaMap =
+ ScalaUtils.toScalaMap(Collections.emptyMap());
+
+ Map<String, String> javaMap = ScalaUtils.toJavaMap(emptyScalaMap);
+
+ assertTrue(javaMap.isEmpty(), "Empty scala maps should convert to empty java maps");
+ }
+
+ @Test
+ void testToJavaMap_PopulatedInput_PreservesEntries() {
+ scala.collection.immutable.Map<String, String> scalaMap =
+ ScalaUtils.toScalaMap(Map.of("foo", "bar"));
+
+ Map<String, String> javaMap = ScalaUtils.toJavaMap(scalaMap);
+
+ assertEquals(Map.of("foo", "bar"), javaMap, "Scala map entries should be preserved");
+ }
+}
diff --git a/kernel-spark/src/test/scala/io/delta/kernel/spark/utils/CatalogTableTestUtils.scala b/kernel-spark/src/test/scala/io/delta/kernel/spark/utils/CatalogTableTestUtils.scala
new file mode 100644
index 00000000000..4f80f346bb8
--- /dev/null
+++ b/kernel-spark/src/test/scala/io/delta/kernel/spark/utils/CatalogTableTestUtils.scala
@@ -0,0 +1,91 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.spark.utils
+
+import scala.collection.immutable.Seq
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Helpers for constructing [[CatalogTable]] instances inside Java tests.
+ *
+ * Spark's [[CatalogTable]] is defined in Scala and its constructor signature shifts between Spark
+ * releases. Centralising the construction in Scala keeps the kernel tests insulated from those
+ * binary changes and saves Java tests from manually wiring the many optional parameters.
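+ *
+ * For example, the Java tests in this change construct a catalog-managed table roughly as
+ * follows (Java, illustrative only):
+ *
+ * {{{
+ * CatalogTable table = CatalogTableTestUtils$.MODULE$.catalogTableWithProperties(
+ *   Collections.emptyMap(),
+ *   Map.of(CatalogTableUtils.FEATURE_CATALOG_MANAGED, "supported"));
+ * }}}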
+ */
+object CatalogTableTestUtils {
+
+ def catalogTableWithProperties(
+ properties: java.util.Map[String, String],
+ storageProperties: java.util.Map[String, String]): CatalogTable = {
+ val scalaProps = ScalaUtils.toScalaMap(properties)
+ val scalaStorageProps = ScalaUtils.toScalaMap(storageProperties)
+
+ CatalogTable(
+ identifier = TableIdentifier("tbl"),
+ tableType = CatalogTableType.MANAGED,
+ storage = CatalogStorageFormat(
+ locationUri = None,
+ inputFormat = None,
+ outputFormat = None,
+ serde = None,
+ compressed = false,
+ properties = scalaStorageProps),
+ schema = new StructType(),
+ provider = None,
+ partitionColumnNames = Seq.empty,
+ bucketSpec = None,
+ properties = scalaProps)
+ }
+
+ def catalogTableWithNullStorage(
+ properties: java.util.Map[String, String]): CatalogTable = {
+ val scalaProps = ScalaUtils.toScalaMap(properties)
+
+ CatalogTable(
+ identifier = TableIdentifier("tbl"),
+ tableType = CatalogTableType.MANAGED,
+ storage = null,
+ schema = new StructType(),
+ provider = None,
+ partitionColumnNames = Seq.empty,
+ bucketSpec = None,
+ properties = scalaProps)
+ }
+
+ def catalogTableWithNullStorageProperties(
+ properties: java.util.Map[String, String]): CatalogTable = {
+ val scalaProps = ScalaUtils.toScalaMap(properties)
+
+ CatalogTable(
+ identifier = TableIdentifier("tbl"),
+ tableType = CatalogTableType.MANAGED,
+ storage = CatalogStorageFormat(
+ locationUri = None,
+ inputFormat = None,
+ outputFormat = None,
+ serde = None,
+ compressed = false,
+ properties = null),
+ schema = new StructType(),
+ provider = None,
+ partitionColumnNames = Seq.empty,
+ bucketSpec = None,
+ properties = scalaProps)
+ }
+}