diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala index 4f0611bcdb1..df39f2fff51 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala @@ -25,6 +25,8 @@ import scala.collection.JavaConverters._ import scala.collection.immutable.ListMap import scala.collection.mutable +import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient.UC_TABLE_ID_KEY +import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient.UC_TABLE_ID_KEY_OLD import org.apache.spark.sql.delta.skipping.clustering.ClusteredTableUtils import org.apache.spark.sql.delta.skipping.clustering.temp.{ClusterBy, ClusterBySpec} import org.apache.spark.sql.delta.skipping.clustering.temp.{ClusterByTransform => TempClusterByTransform} @@ -42,7 +44,6 @@ import org.apache.spark.sql.delta.stats.StatisticsCollection import org.apache.spark.sql.delta.tablefeatures.DropFeature import org.apache.spark.sql.delta.util.{Utils => DeltaUtils} import org.apache.spark.sql.delta.util.PartitionUtils -import org.apache.spark.sql.util.ScalaExtensions._ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException @@ -378,11 +379,12 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension if (DeltaSourceUtils.isDeltaDataSourceName(getProvider(properties))) { // TODO: we should extract write options from table properties for all the cases. We // can remove the UC check when we have confidence. - val respectOptions = isUnityCatalog || properties.containsKey("test.simulateUC") - val (props, writeOptions) = if (respectOptions) { + val isUC = isUnityCatalog || properties.containsKey("test.simulateUC") + val (props, writeOptions) = if (isUC) { val (props, writeOptions) = getTablePropsAndWriteOptions(properties) expandTableProps(props, writeOptions, spark.sessionState.conf) props.remove("test.simulateUC") + translateUCTableIdProperty(props) (props, writeOptions) } else { (properties, Map.empty[String, String]) @@ -602,6 +604,18 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension } } + /** + * The UC table ID property was renamed from an old name. In a transition period we need to + * translate the old UC table ID property name set by caller to new one. And in case both the new + * and old properties are set, remove the old one. Later in UC server it might throw error if it + * sees both. + * TODO: clean up once callers are migrated. + */ + private def translateUCTableIdProperty(props: util.Map[String, String]): Unit = { + val oldTableIdProperty = Option(props.remove(UC_TABLE_ID_KEY_OLD)) + oldTableIdProperty.foreach(props.putIfAbsent(UC_TABLE_ID_KEY, _)) + } + /** * A staged delta table, which creates a HiveMetaStore entry and appends data if this was a * CTAS/RTAS command. We have a ugly way of using this API right now, but it's the best way to diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaDDLSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaDDLSuite.scala index a4fd227dabf..9a4a1080a02 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaDDLSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaDDLSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.delta // scalastyle:off import.ordering.noEmptyLine import scala.collection.JavaConverters._ +import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient.UC_TABLE_ID_KEY +import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient.UC_TABLE_ID_KEY_OLD import org.apache.spark.sql.delta.schema.InvariantViolationException import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.test.DeltaSQLCommandTest @@ -133,6 +135,47 @@ class DeltaDDLSuite extends DeltaDDLTestBase with SharedSparkSession } } } + + test("CREATE TABLE should translate old property `ucTableId` to `io.unitycatalog.tableId`") { + for (withBothNewAndOldProperty <- Seq(false, true)) { + withTempDir { dir => + withTable("t") { + val path = dir.getCanonicalPath + + if (withBothNewAndOldProperty) { + // Create table with old and new property key using test.simulateUC to simulate Unity + // Catalog + sql(s""" + |CREATE TABLE t (id INT) USING delta LOCATION '$path' + |TBLPROPERTIES ( + | test.simulateUC=true, + | '$UC_TABLE_ID_KEY_OLD' = 'some-other-id', + | '$UC_TABLE_ID_KEY' = 'correct-table-id' + |) + |""".stripMargin) + } else { + // Create table with old property key using test.simulateUC to simulate Unity Catalog + sql(s""" + |CREATE TABLE t (id INT) USING delta LOCATION '$path' + |TBLPROPERTIES ( + | test.simulateUC=true, + | '$UC_TABLE_ID_KEY_OLD' = 'correct-table-id' + |) + |""".stripMargin) + } + + val deltaLog = DeltaLog.forTable(spark, TableIdentifier("t")) + val properties = deltaLog.snapshot.getProperties + + // Verify the new table id is present with the value from the old key + assert(properties.contains(UC_TABLE_ID_KEY), + s"New table id key '$UC_TABLE_ID_KEY' should be present in table properties") + assert(properties(UC_TABLE_ID_KEY) == "correct-table-id", + s"New table id key '$UC_TABLE_ID_KEY' should have value 'correct-table-id'") + } + } + } + } } diff --git a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCCommitCoordinatorClient.java b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCCommitCoordinatorClient.java index dde56241cec..5259753c605 100644 --- a/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCCommitCoordinatorClient.java +++ b/storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCCommitCoordinatorClient.java @@ -76,6 +76,8 @@ public UCCommitCoordinatorClient(Map conf, UCClient ucClient) { * Key for identifying Unity Catalog table ID in `delta.coordinatedCommits.tableConf{-preview}`. */ final static public String UC_TABLE_ID_KEY = "io.unitycatalog.tableId"; + // Previously this key was ucTableId. It was later renamed. + final static public String UC_TABLE_ID_KEY_OLD = "ucTableId"; /** * Key for identifying Unity Catalog metastore ID in