[HUDI-5482] Nulls should be counted in the value count stats for mor table (#7482)

* The behavior is kept in line with COW Parquet file stats, where a column's value count includes nulls.
boneanxs authored and nsivabalan committed Apr 12, 2023
1 parent 1e56e46 commit 826385f
Showing 2 changed files with 51 additions and 3 deletions.
@@ -159,6 +159,8 @@ class ColumnStats {
       final Object fieldVal = convertValueForSpecificDataTypes(field.schema(), genericRecord.get(field.name()), false);
       final Schema fieldSchema = getNestedFieldSchemaFromWriteSchema(genericRecord.getSchema(), field.name());
 
+      colStats.valueCount++;
+
       if (fieldVal != null && canCompare(fieldSchema)) {
         // Set the min value of the field
         if (colStats.minValue == null
@@ -170,8 +172,6 @@ class ColumnStats {
         if (colStats.maxValue == null || ConvertingGenericData.INSTANCE.compare(fieldVal, colStats.maxValue, fieldSchema) > 0) {
           colStats.maxValue = fieldVal;
         }
-
-        colStats.valueCount++;
       } else {
         colStats.nullCount++;
       }
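The effect of moving the increment above the null check is easiest to see on a small input. Below is a minimal, self-contained Java sketch of the counting logic; `SimpleColumnStats` and `collectStats` are illustrative stand-ins, not the actual Hudi classes. For the values {"v1", "v2", null, "v4"} the fixed ordering yields valueCount = 4 and nullCount = 1 (previously valueCount would have been 3), matching the Parquet convention in which a column's value count includes nulls.

```java
import java.util.Arrays;
import java.util.List;

public class ColumnStatsSketch {

  // Illustrative stand-in for the per-column stats accumulator.
  static class SimpleColumnStats {
    long valueCount;
    long nullCount;
    String minValue;
    String maxValue;
  }

  static SimpleColumnStats collectStats(List<String> values) {
    SimpleColumnStats colStats = new SimpleColumnStats();
    for (String fieldVal : values) {
      // Fixed behavior: every record contributes to valueCount, null or not,
      // so valueCount == nullCount + number of non-null values.
      colStats.valueCount++;

      if (fieldVal != null) {
        if (colStats.minValue == null || fieldVal.compareTo(colStats.minValue) < 0) {
          colStats.minValue = fieldVal;
        }
        if (colStats.maxValue == null || fieldVal.compareTo(colStats.maxValue) > 0) {
          colStats.maxValue = fieldVal;
        }
      } else {
        colStats.nullCount++;
      }
    }
    return colStats;
  }

  public static void main(String[] args) {
    SimpleColumnStats stats = collectStats(Arrays.asList("v1", "v2", null, "v4"));
    System.out.println("valueCount = " + stats.valueCount); // 4 (was 3 before the fix)
    System.out.println("nullCount  = " + stats.nullCount);  // 1
  }
}
```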
@@ -38,7 +38,7 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue
 import org.junit.jupiter.api._
 import org.junit.jupiter.api.condition.DisabledIf
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{Arguments, MethodSource, ValueSource}
+import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource, ValueSource}
 
 import java.math.BigInteger
 import java.sql.{Date, Timestamp}
@@ -129,6 +129,54 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
       saveMode = SaveMode.Append)
   }
 
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testMetadataColumnStatsIndexValueCount(tableType: HoodieTableType): Unit = {
+    val metadataOpts = Map(
+      HoodieMetadataConfig.ENABLE.key -> "true",
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
+    )
+
+    val commonOpts = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
+      RECORDKEY_FIELD.key -> "c1",
+      PRECOMBINE_FIELD.key -> "c1",
+      HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+    ) ++ metadataOpts
+
+    val schema = StructType(StructField("c1", IntegerType, false) :: StructField("c2", StringType, true) :: Nil)
+    val inputDF = spark.createDataFrame(
+      spark.sparkContext.parallelize(Seq(Row(1, "v1"), Row(2, "v2"), Row(3, null), Row(4, "v4"))),
+      schema)
+
+    inputDF
+      .sort("c1", "c2")
+      .write
+      .format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+
+    val metadataConfig = HoodieMetadataConfig.newBuilder()
+      .fromProperties(toProperties(metadataOpts))
+      .build()
+
+    val columnStatsIndex = new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient)
+    columnStatsIndex.loadTransposed(Seq("c2"), false) { transposedDF =>
+      val result = transposedDF.select("valueCount", "c2_nullCount")
+        .collect().head
+
+      assertTrue(result.getLong(0) == 4)
+      assertTrue(result.getLong(1) == 1)
+    }
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = Array(true, false))
   def testMetadataColumnStatsIndexPartialProjection(shouldReadInMemory: Boolean): Unit = {
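With this convention the two stats are enough for a reader to recover the non-null count and to recognize all-null or no-null columns in a file, which is what lets MOR file slices participate in data skipping the same way COW Parquet footers do. Below is a minimal sketch of such reader-side checks, using the counts asserted by the test above; the variable names and the pruning decisions are illustrative, not the actual Hudi API.

```java
public class StatsReaderSketch {
  public static void main(String[] args) {
    // Values as asserted by the test above for column c2.
    long valueCount = 4L; // includes nulls after the fix
    long nullCount = 1L;

    long nonNullCount = valueCount - nullCount;  // 3
    boolean allNull = nullCount == valueCount;   // would let a reader prune a "c2 IS NOT NULL" filter
    boolean noNulls = nullCount == 0;            // would let a reader prune a "c2 IS NULL" filter
    System.out.println(nonNullCount + " " + allNull + " " + noNulls); // 3 false false
  }
}
```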
