diff --git a/CHANGES.md b/CHANGES.md index 681c1c5..eb3e057 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -10,7 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased -* Nullable columns and "SetNull" aggregation method on columns +* Nullable columns and "SetNullOnConflict" aggregation method on columns to ease handling categorical values within aggregations * Workaround bug of the sum aggregation by disabling compaction when sum is used * Dependency updates diff --git a/crates/ukis_h3cellstore/src/clickhouse/compacted_tables/insert.rs b/crates/ukis_h3cellstore/src/clickhouse/compacted_tables/insert.rs index 759283c..c6be902 100644 --- a/crates/ukis_h3cellstore/src/clickhouse/compacted_tables/insert.rs +++ b/crates/ukis_h3cellstore/src/clickhouse/compacted_tables/insert.rs @@ -426,7 +426,14 @@ where ALIAS_SOURCE_TABLE, col_name, col_name ) } - AggregationMethod::SetNull => format!("NULL as {}", col_name), + AggregationMethod::SetNullOnConflict => { + format!( + "if(length(groupUniqArray({}.{}))=1,first_value({}.{}), null) as {}", + ALIAS_SOURCE_TABLE, col_name, + ALIAS_SOURCE_TABLE, col_name, + col_name + ) + } } } else { format!("{}.{}", ALIAS_SOURCE_TABLE, col_name) diff --git a/crates/ukis_h3cellstore/src/clickhouse/compacted_tables/schema/agg.rs b/crates/ukis_h3cellstore/src/clickhouse/compacted_tables/schema/agg.rs index 37d4edb..d244c25 100644 --- a/crates/ukis_h3cellstore/src/clickhouse/compacted_tables/schema/agg.rs +++ b/crates/ukis_h3cellstore/src/clickhouse/compacted_tables/schema/agg.rs @@ -13,7 +13,8 @@ pub enum AggregationMethod { Max, Min, Average, - SetNull, + /// set to Null in case different values are to be aggregated. Useful for categorical values + SetNullOnConflict, // TODO: aggregation method to generate parent resolution for other h3index column } @@ -25,7 +26,7 @@ impl AggregationMethod { Self::Max => datatype.is_number(), Self::Min => datatype.is_number(), Self::Average => datatype.is_number(), - Self::SetNull => nullable, + Self::SetNullOnConflict => nullable, } } @@ -49,7 +50,7 @@ impl Named for AggregationMethod { Self::Min => "min", Self::Sum => "sum", Self::Average => "average", - Self::SetNull => "setnull", + Self::SetNullOnConflict => "setnullonconflict", } } } diff --git a/crates/ukis_h3cellstorepy/src/clickhouse/schema.rs b/crates/ukis_h3cellstorepy/src/clickhouse/schema.rs index 3aeefc5..3412b67 100644 --- a/crates/ukis_h3cellstorepy/src/clickhouse/schema.rs +++ b/crates/ukis_h3cellstorepy/src/clickhouse/schema.rs @@ -206,7 +206,7 @@ impl PyCompactedTableSchemaBuilder { "max" => AggregationMethod::Max, "avg" | "average" => AggregationMethod::Average, "relativetoarea" | "relativetocellarea" => AggregationMethod::RelativeToCellArea, - "setnull" => AggregationMethod::SetNull, + "setnullonconflict" => AggregationMethod::SetNullOnConflict, _ => { return Err(PyValueError::new_err(format!( "Unsupported aggregation method: {}", diff --git a/crates/ukis_h3cellstorepy/tests/clickhouse/__init__.py b/crates/ukis_h3cellstorepy/tests/clickhouse/__init__.py index dd92e3b..f2baeb5 100644 --- a/crates/ukis_h3cellstorepy/tests/clickhouse/__init__.py +++ b/crates/ukis_h3cellstorepy/tests/clickhouse/__init__.py @@ -13,7 +13,7 @@ def elephant_schema(tableset_name="okavango_delta", temporal_partitioning="month csb.h3_partitioning(h3_partitioning, **kw) csb.add_column("is_valid", "UInt8", compression_method=CompressionMethod("gorilla")) csb.add_aggregated_column("elephant_density", "Float32", "RelativeToCellArea") - csb.add_aggregated_column("some_category", "UInt8", "SetNull", nullable=True) + csb.add_aggregated_column("some_category", "UInt8", "SetNullOnConflict", nullable=True) schema = csb.build() # raises when the schema is invalid / missing something assert schema is not None #print(schema.to_json_string()) @@ -42,11 +42,13 @@ def setup_elephant_schema_with_data(clickhouse_grpc_endpoint, clickhouse_testdb_ center_point = (20.0, 10.0) # uncompacted disk disk = h3.k_ring(h3.geo_to_h3(center_point[1], center_point[0], schema.max_h3_resolution), 10).astype(np.uint64) + cat1 = np.ones(int(len(disk) / 2)) * 23 + cat2 = np.ones(len(disk)- len(cat1)) * 12 df = pl.DataFrame({ "h3index": disk, "is_valid": np.ones(len(disk)), "elephant_density": np.ones(len(disk)) * 4, - "some_category": np.ones(len(disk)) * 23 + "some_category": np.concatenate((cat1, cat2)) }) # write to db - this performs auto-compaction