Skip to content

Commit

Permalink
replace SetNull aggregation with "SetNullOnConflict" aggregation to ease handling categorical values within aggregations
Browse files Browse the repository at this point in the history
  • Loading branch information
nmandery committed Jul 11, 2023
1 parent 45b5a3c commit dba6985
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 8 deletions.
2 changes: 1 addition & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

* Nullable columns and "SetNull" aggregation method on columns
* Nullable columns and "SetNullOnConflict" aggregation method on columns to ease handling of categorical values within aggregations
* Work around a bug in the sum aggregation by disabling compaction when sum is used
* Dependency updates

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,14 @@ where
ALIAS_SOURCE_TABLE, col_name, col_name
)
}
AggregationMethod::SetNull => format!("NULL as {}", col_name),
AggregationMethod::SetNullOnConflict => {
format!(
"if(length(groupUniqArray({}.{}))=1,first_value({}.{}), null) as {}",
ALIAS_SOURCE_TABLE, col_name,
ALIAS_SOURCE_TABLE, col_name,
col_name
)
}
}
} else {
format!("{}.{}", ALIAS_SOURCE_TABLE, col_name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ pub enum AggregationMethod {
Max,
Min,
Average,
SetNull,
/// Set to Null when the values to be aggregated differ. Useful for categorical values.
SetNullOnConflict,
// TODO: aggregation method to generate parent resolution for other h3index column
}

Expand All @@ -25,7 +26,7 @@ impl AggregationMethod {
Self::Max => datatype.is_number(),
Self::Min => datatype.is_number(),
Self::Average => datatype.is_number(),
Self::SetNull => nullable,
Self::SetNullOnConflict => nullable,
}
}

Expand All @@ -49,7 +50,7 @@ impl Named for AggregationMethod {
Self::Min => "min",
Self::Sum => "sum",
Self::Average => "average",
Self::SetNull => "setnull",
Self::SetNullOnConflict => "setnullonconflict",
}
}
}
2 changes: 1 addition & 1 deletion crates/ukis_h3cellstorepy/src/clickhouse/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl PyCompactedTableSchemaBuilder {
"max" => AggregationMethod::Max,
"avg" | "average" => AggregationMethod::Average,
"relativetoarea" | "relativetocellarea" => AggregationMethod::RelativeToCellArea,
"setnull" => AggregationMethod::SetNull,
"setnullonconflict" => AggregationMethod::SetNullOnConflict,
_ => {
return Err(PyValueError::new_err(format!(
"Unsupported aggregation method: {}",
Expand Down
6 changes: 4 additions & 2 deletions crates/ukis_h3cellstorepy/tests/clickhouse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def elephant_schema(tableset_name="okavango_delta", temporal_partitioning="month
csb.h3_partitioning(h3_partitioning, **kw)
csb.add_column("is_valid", "UInt8", compression_method=CompressionMethod("gorilla"))
csb.add_aggregated_column("elephant_density", "Float32", "RelativeToCellArea")
csb.add_aggregated_column("some_category", "UInt8", "SetNull", nullable=True)
csb.add_aggregated_column("some_category", "UInt8", "SetNullOnConflict", nullable=True)
schema = csb.build() # raises when the schema is invalid / missing something
assert schema is not None
#print(schema.to_json_string())
Expand Down Expand Up @@ -42,11 +42,13 @@ def setup_elephant_schema_with_data(clickhouse_grpc_endpoint, clickhouse_testdb_
center_point = (20.0, 10.0)
# uncompacted disk
disk = h3.k_ring(h3.geo_to_h3(center_point[1], center_point[0], schema.max_h3_resolution), 10).astype(np.uint64)
cat1 = np.ones(int(len(disk) / 2)) * 23
cat2 = np.ones(len(disk)- len(cat1)) * 12
df = pl.DataFrame({
"h3index": disk,
"is_valid": np.ones(len(disk)),
"elephant_density": np.ones(len(disk)) * 4,
"some_category": np.ones(len(disk)) * 23
"some_category": np.concatenate((cat1, cat2))
})

# write to db - this performs auto-compaction
Expand Down

0 comments on commit dba6985

Please sign in to comment.