@@ -883,6 +883,9 @@ case class AlterTableChangeColumnDeltaCommand(

    val transformedSchema = columnChanges.foldLeft(oldSchema)(transformSchemaOnce)

    // Validate clustering columns remain in stats schema after column reordering
    validateClusteringColumnsAfterReordering(sparkSession, txn, columnChanges)

    val newSchemaWithTypeWideningMetadata =
      TypeWideningMetadata.addTypeWideningMetadata(
        txn,
@@ -993,6 +996,40 @@ case class AlterTableChangeColumnDeltaCommand(
    newFieldList.toSeq
  }

  /**
   * Validates that clustering columns remain in the stats schema after column reordering.
   *
   * When a user executes `ALTER TABLE ALTER COLUMN col1 AFTER col2`, every clustering column
   * that was in the stats schema before the reordering must remain in the stats schema after
   * the operation. When DELTA_LIQUID_ALTER_COLUMN_AFTER_STATS_SCHEMA_CHECK is enabled, the
   * validation runs and throws an error if any clustering column would lose stats collection
   * due to position-based indexing. When disabled (the default), no validation is performed
   * and stats collection may follow position-based indexing rules.
   *
   * @param spark The SparkSession
   * @param txn The transaction
   * @param columnChanges The column changes being applied
   */
  private def validateClusteringColumnsAfterReordering(
      spark: SparkSession,
      txn: OptimisticTransaction,
      columnChanges: Seq[DeltaChangeColumnSpec]): Unit = {
    if (!spark.conf.get(
        DeltaSQLConf.DELTA_LIQUID_ALTER_COLUMN_AFTER_STATS_SCHEMA_CHECK)) {
      return
    }
    // Only validate when the table supports clustering and a column position change is
    // requested; the conf check above already handled the disabled case.
    if (ClusteredTableUtils.isSupported(txn.snapshot.protocol) &&
        columnChanges.exists(_.colPosition.isDefined)) {
      val clusteringColumns = ClusteringColumnInfo.extractLogicalNames(txn.snapshot)
      if (clusteringColumns.nonEmpty) {
        // Validate that the prior stats schema is preserved (clustering columns keep stats).
        ClusteredTableUtils.validateClusteringColumnsInStatsSchema(
          txn.snapshot, clusteringColumns)
      }
    }
  }

  /**
   * Given two columns, verify whether replacing the original column with the new column is a valid
   * operation.
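Reviewer note: a minimal, self-contained sketch of the failure mode this validation guards against. With position-based stats indexing, only the first `dataSkippingNumIndexedCols` fields get statistics, so a reorder can push a clustering column out of that prefix. The object name and hard-coded columns below are illustrative only, not Delta code.

```scala
// Standalone sketch: which columns keep stats under position-based indexing
// (first `numIndexedCols` fields), before and after `ALTER COLUMN c1 AFTER c3`.
object PositionBasedStatsSketch extends App {
  def indexedColumns(schema: Seq[String], numIndexedCols: Int): Set[String] =
    schema.take(numIndexedCols).toSet

  val clusteringColumns = Set("c1", "c2")
  val numIndexedCols = 2

  val before = Seq("c1", "c2", "c3", "c4")
  val after = Seq("c2", "c3", "c1", "c4") // result of ALTER COLUMN c1 AFTER c3

  // Before the reorder, both clustering columns sit inside the indexed prefix.
  assert(clusteringColumns.subsetOf(indexedColumns(before, numIndexedCols)))

  // After the reorder, c1 has fallen outside the first two fields, so it would
  // stop getting stats: the case the new check surfaces as an error.
  val missing = clusteringColumns -- indexedColumns(after, numIndexedCols)
  println(s"Clustering columns without stats after reorder: $missing") // Set(c1)
}
```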
@@ -2005,6 +2005,24 @@ trait DeltaSQLConfBase extends DeltaSQLConfUtils {
      .booleanConf
      .createWithDefault(true)

  val DELTA_LIQUID_ALTER_COLUMN_AFTER_STATS_SCHEMA_CHECK =
    buildConf("liquid.alterColumnAfter.statsSchemaCheck")
      .internal()
      .doc(
        """
          |When enabled, validates that clustering columns remain in the stats schema after
          |a user executes `ALTER TABLE ALTER COLUMN col1 AFTER col2`. The validation checks
          |that all clustering columns that were in the stats schema before the column
          |reordering remain in the stats schema after the operation. This ensures that
          |clustering columns continue to have statistics collected even if their position
          |in the table schema changes. When disabled, no validation is performed and stats
          |collection may follow position-based indexing rules (e.g.,
          |`dataSkippingNumIndexedCols`), potentially causing clustering columns to lose
          |stats collection if they move outside the indexed range.
        """.stripMargin)
      .booleanConf
      .createWithDefault(false)

  val DELTA_CHANGE_COLUMN_CHECK_DEPENDENT_EXPRESSIONS_USE_V2 =
    buildConf("changeColumn.checkDependentExpressionsUseV2")
      .internal()
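For reference, a sketch of how the new flag could be toggled in a session. The fully qualified key assumes the usual `spark.databricks.delta.` prefix that `DeltaSQLConf.buildConf` prepends; verify the exact name against your build.

```scala
// Sketch: enabling the check for the current session (key name assumes the
// standard "spark.databricks.delta." prefix added by DeltaSQLConf.buildConf).
spark.conf.set("spark.databricks.delta.liquid.alterColumnAfter.statsSchemaCheck", "true")

// Equivalent SQL form:
spark.sql("SET spark.databricks.delta.liquid.alterColumnAfter.statsSchemaCheck=true")

// Or scoped inside a test, mirroring the suite below:
// withSQLConf(DeltaSQLConf.DELTA_LIQUID_ALTER_COLUMN_AFTER_STATS_SCHEMA_CHECK.key -> "true") { ... }
```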
@@ -825,6 +825,55 @@ trait ClusteredTableDDLSuiteBase
    }
  }

  Seq("true", "false").foreach { checkEnabled =>
    test(s"Alter column after statement with stats schema update - checkEnabled=$checkEnabled") {
      withTable(testTable) {
        withSQLConf(
          DeltaSQLConf.DELTA_LIQUID_ALTER_COLUMN_AFTER_STATS_SCHEMA_CHECK.key -> checkEnabled) {
          val tableSchema = "c1 int, c2 int, c3 int, c4 int"
          val indexedColumns = 2

          testStatsCollectionHelper(
            tableSchema = tableSchema,
            numberOfIndexedCols = indexedColumns) {

            createTableWithStatsColumns(
              "CREATE",
              testTable,
              Seq("c1", "c2"),
              indexedColumns,
              Some(tableSchema))

            // Insert data to ensure stats are collected.
            sql(s"INSERT INTO $testTable VALUES(1, 2, 3, 4), (5, 6, 7, 8)")

            // The first reorder succeeds regardless of the flag: the check validates the
            // pre-ALTER snapshot, where both clustering columns (c1, c2) still have stats.
            sql(s"ALTER TABLE $testTable ALTER COLUMN c1 AFTER c3")

            // Verify the column order changed.
            val (_, snapshot) = DeltaLog.forTableWithSnapshot(spark, TableIdentifier(testTable))
            assert(snapshot.schema.fieldNames.toSeq === Seq("c2", "c3", "c1", "c4"))

            // The second reorder's behavior depends on checkEnabled: c1 is now outside the
            // two indexed columns of the current snapshot.
            if (checkEnabled == "true") {
              // Should fail when validation is enabled.
              val e = intercept[DeltaAnalysisException] {
                sql(s"ALTER TABLE $testTable ALTER COLUMN c2 AFTER c3")
              }
              assert(e.errorClass.contains("DELTA_CLUSTERING_COLUMN_MISSING_STATS"))
            } else {
              // Should succeed when validation is disabled.
              sql(s"ALTER TABLE $testTable ALTER COLUMN c2 AFTER c3")
              val (_, snapshot2) =
                DeltaLog.forTableWithSnapshot(spark, TableIdentifier(testTable))
              assert(snapshot2.schema.fieldNames.toSeq === Seq("c3", "c2", "c1", "c4"))
            }
          }
        }
      }
    }
  }


  test("validate CLONE on clustered table") {
    import testImplicits._
    val srcTable = "SrcTbl"
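To summarize the end-to-end behavior the new test exercises, here is a conceptual sketch of the check itself. The helper below is hypothetical and only models `ClusteredTableUtils.validateClusteringColumnsInStatsSchema`; the real implementation works off the snapshot's stats schema and raises a Delta error class rather than an `IllegalArgumentException`.

```scala
// Hypothetical, simplified model of the new validation (not the Delta API).
object ClusteringStatsCheckSketch extends App {
  import scala.util.Try

  def validateClusteringColumnsInStats(
      clusteringColumns: Seq[String],
      statsColumns: Set[String]): Unit = {
    val missing = clusteringColumns.filterNot(statsColumns.contains)
    if (missing.nonEmpty) {
      // The production check raises DELTA_CLUSTERING_COLUMN_MISSING_STATS instead.
      throw new IllegalArgumentException(
        s"Clustering column(s) ${missing.mkString(", ")} would lose statistics collection")
    }
  }

  // Mirrors the test: after `ALTER COLUMN c1 AFTER c3` with two indexed columns,
  // only c2 and c3 retain stats, so validating clustering columns {c1, c2} fails.
  println(Try(validateClusteringColumnsInStats(Seq("c1", "c2"), Set("c2", "c3"))))
  // => Failure(java.lang.IllegalArgumentException: Clustering column(s) c1 would lose ...)
}
```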