Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement GroupsAccumulator for count(DISTINCT) aggr #15324

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from

Conversation

waynexia
Copy link
Member

Signed-off-by: Ruihang Xia [email protected]

Which issue does this PR close?

Related to #5472

Rationale for this change

Implement group accumulator for distinct count aggr fn. In hits.parquet dataset from clickbench, it can gain ~5x performance improve for query like select "RegionID", COUNT("UserID"), COUNT(DISTINCT "UserID") as u FROM hits GROUP BY "RegionID" ORDER BY u DESC LIMIT 10;:

After:

> select "RegionID", COUNT("UserID"), COUNT(DISTINCT "UserID") as u FROM hits GROUP BY "RegionID" ORDER BY u DESC LIMIT 10;
+----------+--------------------+---------+
| RegionID | count(hits.UserID) | u       |
+----------+--------------------+---------+
| 229      | 18295832           | 2845673 |
| 2        | 6687587            | 1081016 |
| 208      | 4261812            | 831676  |
| 169      | 3320229            | 604583  |
| 184      | 1755192            | 322661  |
| 158      | 1318059            | 307152  |
| 34       | 1792369            | 299479  |
| 55       | 1426901            | 286525  |
| 107      | 1516690            | 272448  |
| 42       | 1542717            | 243181  |
+----------+--------------------+---------+
10 row(s) fetched. 
Elapsed 1.941 seconds.

Before:

Elapsed 11.828 seconds.

For queries with only one distinct count (like q5 from clickbench), optimize rule single_distinct_to_groupby will rewrite the distinct column to group by column, which avoids the need for this group accumulator. For scenarios exceeding that rule, this group accumulator can improve a lot.

What changes are included in this PR?

implement GroupsAccumulator for distinct count.

Are these changes tested?

yes

Are there any user-facing changes?

no

@github-actions github-actions bot added sqllogictest SQL Logic Tests (.slt) functions Changes to functions implementation labels Mar 19, 2025
Signed-off-by: Ruihang Xia <[email protected]>
@waynexia waynexia requested a review from korowa March 19, 2025 22:26
Signed-off-by: Ruihang Xia <[email protected]>
@korowa
Copy link
Contributor

korowa commented Mar 20, 2025

Thank you @waynexia, I'm planning to check it out at most tomorrow.

I have a question in advance before reviewing -- have you been considering to implement groups accumulator for specialized cases of DistinctCountAccumulator (primitive/native types and bytes)?

I'm asking because, as for me, it looks a bit odd (though I haven't rechecked performance results, and perhaps GroupsAccumulatorAdapter introduces some insane overhead), that switching from native Rust types to ScalarValue still gives x5 faster execution, while groups accumulator in this case, if I'm not mistaken, does basically the same as the GroupsAccumulatorAdapter -- storing separate states (hashsets) in the vector is already implemented in the adapter.

@2010YOUY01
Copy link
Contributor

Thank you @waynexia, I'm planning to check it out at most tomorrow.

I have a question in advance before reviewing -- have you been considering to implement groups accumulator for specialized cases of DistinctCountAccumulator (primitive/native types and bytes)?

I'm asking because, as for me, it looks a bit odd (though I haven't rechecked performance results, and perhaps GroupsAccumulatorAdapter introduces some insane overhead), that switching from native Rust types to ScalarValue still gives x5 faster execution, while groups accumulator in this case, if I'm not mistaken, does basically the same as the GroupsAccumulatorAdapter -- storing separate states (hashsets) in the vector is already implemented in the adapter.

Here are my thoughts on why 5X given implementations are very similar:
GroupsAccumulator for median() in #13681 also achieved 5X speed-up, we found 2X is due to old Accumulator implementation has a inefficient vec<ScalarValue> <---> ListArray conversion when state() and merge_batch() is called, and the remaining 2X difference I believe is due to inefficiency in adaptor, or extra memory allocation for many outer state struct. Perhaps Accumulator for DistinctCount is also not implementing this list conversion efficiently 🤔

#[derive(Debug)]
pub struct DistinctCountGroupsAccumulator {
/// One HashSet per group to track distinct values
distinct_sets: Vec<HashSet<ScalarValue, RandomState>>,
Copy link
Contributor

@Dandandan Dandandan Mar 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if a single HashSet<(u64, ScalarValue), RandomState>> (i.e. also index by group id rather than create a new HashSet per group) might be faster? It will use less memory and intuitively should be more cache friendly.

@@ -752,10 +761,245 @@ impl Accumulator for DistinctCountAccumulator {
}
}

/// GroupsAccumulator for COUNT DISTINCT operations
#[derive(Debug)]
pub struct DistinctCountGroupsAccumulator {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a follow-up, this could be specialized for types as well (e.g. PrimitveDistinctCountGroupsAccumulator)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also using the HashTable API would probably give some further gains
https://docs.rs/hashbrown/latest/hashbrown/struct.HashTable.html

Copy link
Contributor

@Dandandan Dandandan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks really nice 🚀 gave some hints for further performance improvements

@Dandandan
Copy link
Contributor

It would be worthwhile to run the clickbench_extended benchmarks as well (./bench.sh run clickbench_extended)

@alamb
Copy link
Contributor

alamb commented Mar 21, 2025

It would be worthwhile to run the clickbench_extended benchmarks as well (./bench.sh run clickbench_extended)

I am running the benchmarks now and will report

BTW if there is no existing coverage we can add the one from this description into clickbench_extended perhaps

@alamb alamb added the performance Make DataFusion faster label Mar 21, 2025
@alamb
Copy link
Contributor

alamb commented Mar 21, 2025

🤔 my measurements show Q3 getting quite a bit slower. I will rerun to test

Comparing main_base and count-distinct-group
--------------------
Benchmark clickbench_extended.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query        ┃  main_base ┃ count-distinct-group ┃       Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 0     │  1978.13ms │            1940.74ms │    no change │
│ QQuery 1     │   736.56ms │             753.84ms │    no change │
│ QQuery 2     │  1432.89ms │            4533.83ms │ 3.16x slower │
│ QQuery 3     │   706.18ms │             716.03ms │    no change │
│ QQuery 4     │  1492.33ms │            1498.56ms │    no change │
│ QQuery 5     │ 17251.84ms │           17220.01ms │    no change │
└──────────────┴────────────┴──────────────────────┴──────────────┘

@waynexia
Copy link
Member Author

Ahh, I reproduced the same result. And I also observed a regression on q0:

Query Before (ms) After (ms)
Q0 1407.04 2013.42
Q2 681.78 1742.08

(BTW, how do you get the comparison output

@alamb
Copy link
Contributor

alamb commented Mar 21, 2025

(BTW, how do you get the comparison output

I am using bench.sh with bench.sh compare

@waynexia
Copy link
Member Author

I'll look into the regression this weekend. I suspect the reason is the improvement from grouping is way less than specialized non-group accumulator 🤔

Copy link
Contributor

@Dandandan Dandandan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets try to get no regressions in the extended benchmark first

@waynexia
Copy link
Member Author

Quick update: made some progress, but still need a few days to refine it. I've implemented a primitive aggregator and it does work

Signed-off-by: Ruihang Xia <[email protected]>
Signed-off-by: Ruihang Xia <[email protected]>
Signed-off-by: Ruihang Xia <[email protected]>
Signed-off-by: Ruihang Xia <[email protected]>
@waynexia
Copy link
Member Author

waynexia commented Mar 25, 2025

I (am very excited!) just realized we may have overcomplicated things: we specialize in array types to compute hashes and store the value, but we neither need a dedicated hash function (wrapped as xxx set in previous implementation) nor need to store the origin value. We only need to do two things for count(distinct) accumulator -- compute and maintain a hashset.

Thus I tried another way to rewrite this aggregator, use a uniform accumulator for all types. Do one dispatch for each update to dispatch the actual hash implementation (and this can be eliminated by extracting a type parameter for accumulator). Throw the origin value and only store the hashes in state. This can not only save memory, but also gain a good performance:

Query Before (ms) After (ms) Notes
Q0 1046.3 430.4 no group-by
Q1 243.7 200.5 no group-by
Q2 441.8 327.9 110 groups
Count + Count Distinct 11.828 seconds 0.515 seconds 9000 groups

p.s. I changed a machine to run them
p.p.s I didn't use bench.sh compare because it seems not to support selecting test case from help text

Some follow-up things:

  • Make a type parameter for the new general accumulator's implementation, if needed (consider our compile time is quite slow... one dispatch per array seems acceptable)
  • Use RawTable to further optimize the states, and reduce another hash over u64 hash values
  • Maybe remove PrimitiveDistinctCountAccumulator and similar implementations? They are not used by us after this patch, but they are part of our public API

})
Ok(Box::new(DistinctCountAccumulator {
values: HashSet::default(),
random_state: RandomState::with_seeds(1, 2, 3, 4),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we only store hashes now, we need a fixed random state for reproducible hash across different accumulators. But I don't know how to choose a group of proper seeds...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm... I think we can't store only the hashes as it won't account for hash collisions (two values mapping to the same value). There is not a seed that prevents that (as the number of possible values in e.g. a u64 is much smaller than e.g. a string with 20 bytes)

@alamb alamb marked this pull request as draft March 26, 2025 21:23
@alamb
Copy link
Contributor

alamb commented Mar 26, 2025

i think this is still a work in progress, so marking it as a draft to clean up the review queue

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
functions Changes to functions implementation performance Make DataFusion faster sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants