Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import scala.collection.JavaConverters._
import scala.collection.immutable.ListMap
import scala.collection.mutable

import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient.UC_TABLE_ID_KEY
import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient.UC_TABLE_ID_KEY_OLD
import org.apache.spark.sql.delta.skipping.clustering.ClusteredTableUtils
import org.apache.spark.sql.delta.skipping.clustering.temp.{ClusterBy, ClusterBySpec}
import org.apache.spark.sql.delta.skipping.clustering.temp.{ClusterByTransform => TempClusterByTransform}
Expand All @@ -42,7 +44,6 @@ import org.apache.spark.sql.delta.stats.StatisticsCollection
import org.apache.spark.sql.delta.tablefeatures.DropFeature
import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
import org.apache.spark.sql.delta.util.PartitionUtils
import org.apache.spark.sql.util.ScalaExtensions._
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkException
Expand Down Expand Up @@ -378,11 +379,12 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension
if (DeltaSourceUtils.isDeltaDataSourceName(getProvider(properties))) {
// TODO: we should extract write options from table properties for all the cases. We
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will invite @cloud-fan to help co-review this PR, since he left this TODO comment.

// can remove the UC check when we have confidence.
val respectOptions = isUnityCatalog || properties.containsKey("test.simulateUC")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What was the original intention behind test.simulateUC? It seems to be for testing purposes, but do you know more about the context?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's just for testing purposes; it is used in test("CREATE TABLE with OPTIONS").
I also used it in the new test I added.

val (props, writeOptions) = if (respectOptions) {
val isUC = isUnityCatalog || properties.containsKey("test.simulateUC")
val (props, writeOptions) = if (isUC) {
val (props, writeOptions) = getTablePropsAndWriteOptions(properties)
expandTableProps(props, writeOptions, spark.sessionState.conf)
props.remove("test.simulateUC")
translateUCTableIdProperty(props)
(props, writeOptions)
} else {
(properties, Map.empty[String, String])
Expand Down Expand Up @@ -602,6 +604,18 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension
}
}

/**
* The UC table ID property key was renamed. During the transition period, translate the old key
* set by callers to the new one. If both the old and the new keys are set, drop the old one and
* keep the value of the new key, because the UC server might throw an error if it sees both.
* TODO: clean up once callers are migrated.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to state which versions this needs to be kept for, for compatibility reasons?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Primarily for UC-Spark 0.3.1. But it doesn't exist yet, so I hesitated to put the specific version number in the comment.

*/
private def translateUCTableIdProperty(props: util.Map[String, String]): Unit = {
  // `remove` always drops the legacy key and returns its previous value (null when it was not
  // set), so wrapping the result in Option skips the copy when there is nothing to migrate.
  Option(props.remove(UC_TABLE_ID_KEY_OLD)).foreach { legacyTableId =>
    // An explicitly provided new key wins; the legacy value is only used as a fallback.
    props.putIfAbsent(UC_TABLE_ID_KEY, legacyTableId)
  }
}

/**
* A staged delta table, which creates a HiveMetaStore entry and appends data if this was a
* CTAS/RTAS command. We have a ugly way of using this API right now, but it's the best way to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package org.apache.spark.sql.delta
// scalastyle:off import.ordering.noEmptyLine
import scala.collection.JavaConverters._

import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient.UC_TABLE_ID_KEY
import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorClient.UC_TABLE_ID_KEY_OLD
import org.apache.spark.sql.delta.schema.InvariantViolationException
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
Expand Down Expand Up @@ -133,6 +135,47 @@ class DeltaDDLSuite extends DeltaDDLTestBase with SharedSparkSession
}
}
}

test("CREATE TABLE should translate old property `ucTableId` to `io.unitycatalog.tableId`") {
  // Two scenarios are covered:
  //  - false: only the old key is set, so its value must be carried over to the new key.
  //  - true: both keys are set, so the value of the new key must win.
  // In both scenarios the old key must not remain in the table properties.
  for (withBothNewAndOldProperty <- Seq(false, true)) {
    withTempDir { dir =>
      withTable("t") {
        val path = dir.getCanonicalPath

        if (withBothNewAndOldProperty) {
          // Create table with old and new property key using test.simulateUC to simulate Unity
          // Catalog
          sql(s"""
            |CREATE TABLE t (id INT) USING delta LOCATION '$path'
            |TBLPROPERTIES (
            |  test.simulateUC=true,
            |  '$UC_TABLE_ID_KEY_OLD' = 'some-other-id',
            |  '$UC_TABLE_ID_KEY' = 'correct-table-id'
            |)
            |""".stripMargin)
        } else {
          // Create table with old property key using test.simulateUC to simulate Unity Catalog
          sql(s"""
            |CREATE TABLE t (id INT) USING delta LOCATION '$path'
            |TBLPROPERTIES (
            |  test.simulateUC=true,
            |  '$UC_TABLE_ID_KEY_OLD' = 'correct-table-id'
            |)
            |""".stripMargin)
        }

        val deltaLog = DeltaLog.forTable(spark, TableIdentifier("t"))
        val properties = deltaLog.snapshot.getProperties

        // Verify the new table id key is present with the expected value: taken from the old
        // key when only the old key was set, or from the new key when both were set.
        assert(properties.contains(UC_TABLE_ID_KEY),
          s"New table id key '$UC_TABLE_ID_KEY' should be present in table properties")
        assert(properties(UC_TABLE_ID_KEY) == "correct-table-id",
          s"New table id key '$UC_TABLE_ID_KEY' should have value 'correct-table-id'")

        // Verify the old key was dropped during translation, so both keys are never stored
        // together.
        assert(!properties.contains(UC_TABLE_ID_KEY_OLD),
          s"Old table id key '$UC_TABLE_ID_KEY_OLD' should have been removed from table " +
            s"properties (withBothNewAndOldProperty=$withBothNewAndOldProperty)")
      }
    }
  }
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ public UCCommitCoordinatorClient(Map<String, String> conf, UCClient ucClient) {
* Key for identifying Unity Catalog table ID in `delta.coordinatedCommits.tableConf{-preview}`.
*/
final static public String UC_TABLE_ID_KEY = "io.unitycatalog.tableId";
/**
 * Legacy name of {@link #UC_TABLE_ID_KEY}: the key used to be {@code ucTableId} before it was
 * renamed. Kept so that properties still set under the old name can be recognized.
 */
final static public String UC_TABLE_ID_KEY_OLD = "ucTableId";

/**
* Key for identifying Unity Catalog metastore ID in
Expand Down
Loading