diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
index 00f73aa73e7..ff7c1e1dbaf 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
@@ -193,7 +193,7 @@ trait DeltaConfigsBase extends DeltaLogging {
          kv
        case lKey if lKey.startsWith(TableFeatureProtocolUtils.FEATURE_PROP_PREFIX) =>
          // This is a table feature, we should allow it.
-          lKey -> value
+          key -> value
        case lKey if lKey.startsWith("delta.") =>
          Option(entries.get(lKey.stripPrefix("delta."))) match {
            case Some(deltaConfig) if (
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala b/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala
index 0fb07822798..f934071e012 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala
@@ -457,6 +457,9 @@ object TableFeatureProtocolUtils {
   /** Min reader version that supports writer features. */
   val TABLE_FEATURES_MIN_WRITER_VERSION = 7
 
+  /** The table ID property key needed by the catalogManaged table feature. */
+  val UC_TABLE_ID_KEY = "io.unitycatalog.tableId"
+
   /** Get the table property config key for the `feature`. */
   def propertyKey(feature: TableFeature): String = propertyKey(feature.name)
 
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableLike.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableLike.scala
index 753566bf114..b9701d6937a 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableLike.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableLike.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -130,6 +131,12 @@ trait CreateDeltaTableLike extends SQLConfHelper {
     } else {
       table.storage.copy(properties = Map.empty)
     }
+    // These table protocol properties, along with the UC table ID, are needed when the create
+    // table request is sent to the server.
+    val tableProtocolProperties = table.properties.view.filterKeys { k =>
+      TableFeatureProtocolUtils.isTableProtocolProperty(k) ||
+        k == TableFeatureProtocolUtils.UC_TABLE_ID_KEY
+    }
 
     // If we have to update the catalog, use the correct schema and table properties, otherwise
     // empty out the schema and property information
@@ -149,13 +156,13 @@
         // we store the partition columns as regular data columns.
         partitionColumnNames = Nil,
         properties = UpdateCatalog.updatedProperties(snapshot)
-          ++ additionalProperties,
+          ++ additionalProperties ++ tableProtocolProperties,
         storage = storageProps,
         tracksPartitionsInCatalog = true)
     } else {
       table.copy(
         schema = new StructType(),
-        properties = Map.empty,
+        properties = tableProtocolProperties.toMap,
         partitionColumnNames = Nil,
         // Remove write specific options when updating the catalog
         storage = storageProps,
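For illustration, here is a minimal, self-contained sketch of the property filtering that the `CreateDeltaTableLike` change performs. The `isRetainedKey` helper and the literal prefix value are assumptions standing in for `TableFeatureProtocolUtils.isTableProtocolProperty` (which also matches protocol keys such as the min reader/writer version properties); only the `io.unitycatalog.tableId` key mirrors the constant added in this diff.

```scala
object PropertyFilterSketch {
  // Assumed values for illustration only; UcTableIdKey matches the UC_TABLE_ID_KEY constant
  // introduced above, while FeaturePropPrefix is a stand-in for FEATURE_PROP_PREFIX.
  val FeaturePropPrefix = "delta.feature."
  val UcTableIdKey = "io.unitycatalog.tableId"

  // Simplified stand-in for the combined check used in CreateDeltaTableLike:
  // isTableProtocolProperty(k) || k == UC_TABLE_ID_KEY.
  def isRetainedKey(k: String): Boolean =
    k.startsWith(FeaturePropPrefix) || k == UcTableIdKey

  def main(args: Array[String]): Unit = {
    val tableProperties = Map(
      "delta.feature.catalogManaged" -> "supported",
      "io.unitycatalog.tableId"      -> "0000-1111-2222",
      "delta.appendOnly"             -> "true",
      "owner"                        -> "someone")

    // Mirrors `table.properties.view.filterKeys { ... }` in the diff: only protocol-related
    // keys and the UC table ID survive into the catalog entry that is sent to the server.
    val retained = tableProperties.view.filterKeys(isRetainedKey).toMap
    println(retained)
    // Expected: Map(delta.feature.catalogManaged -> supported,
    //               io.unitycatalog.tableId -> 0000-1111-2222)
  }
}
```

The point of keeping these keys in both branches of the catalog update (rather than emptying the properties as before) is that the table feature protocol properties and the UC table ID must travel with the create-table request even when the rest of the schema and property information is stripped.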