Improve performance of `first_value` by implementing special GroupsAccumulator #15266
Conversation
@@ -179,6 +292,423 @@ impl AggregateUDFImpl for FirstValue {
    }
}

struct FirstGroupsAccumulator<T>
Suggested change:

- struct FirstGroupsAccumulator<T>
+ struct FirstPrimitiveGroupsAccumulator<T>

?
done
let mut ordering_buf = Vec::with_capacity(self.ordering_req.len());

for (group_idx, idx) in self
(Just took a quick look, please correct me if I'm wrong)

Inside this function, it seems to:
- 'compress' the current input batch with get_filtered_min_of_each_group() (if there are multiple entries for the same group, only keep the smallest one according to the specified order)
- update the global state for the minimal value corresponding to all seen groups

Why is it split into two steps instead of directly updating the global state?
> Inside this function, it seems to:
> 1. 'compress' the current input batch with get_filtered_min_of_each_group() (if there are multiple entries for the same group, only keep the smallest one according to the specified order)
> 2. update the global state for the minimal value corresponding to all seen groups

Yes. You are right.

> Why is it split into two steps instead of directly updating the global state?

According to this:

> Returns the first element in an aggregation group according to the requested ordering

The reason for splitting it into two steps is that it performs better when cardinality is low.
benchmark sql: `select l_shipmode, first_value(l_partkey order by l_orderkey, l_linenumber, l_comment, l_suppkey, l_tax) from 'benchmarks/data/tpch_sf10/lineitem' group by l_shipmode;`

| version | time |
|---|---|
| main | 0.979s |
| this PR | 0.86s |
| without get_filtered_min_of_each_group | 1.25s |
extract_row_at_idx_to_buf has a relatively high overhead. We first call get_filtered_min_of_each_group to avoid the problem where extract_row_at_idx_to_buf would be called multiple times when the same group_idx occurs repeatedly in a batch.

At first, I implemented it without the pre-aggregation step, but the performance actually got worse. (At that time, I added the benchmark in datafusion/core/benches/aggregate_query_sql.rs, and the performance degraded from 3.9ms to 7ms.) 😂
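The two-phase update discussed above can be sketched with a simplified std-only model (hypothetical names; plain `i64` columns stand in for the Arrow arrays and the row comparator of the real accumulator). Phase 1 "compresses" the batch to at most one candidate row per group, so the expensive per-row extraction runs at most once per group per batch; phase 2 merges the candidates into the global state.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

// Simplified model of the two-phase update_batch. `global` maps
// group_idx -> (smallest order key seen so far, its value).
fn update_batch(
    group_indices: &[usize],
    order_keys: &[i64], // stand-in for the ORDER BY columns
    values: &[i64],     // stand-in for the aggregated column
    global: &mut HashMap<usize, (i64, i64)>,
) {
    // Phase 1: keep only the row with the smallest order key per group
    // within this batch (the get_filtered_min_of_each_group idea).
    let mut batch_min: HashMap<usize, usize> = HashMap::new(); // group -> row idx
    for (row, &g) in group_indices.iter().enumerate() {
        let e = batch_min.entry(g).or_insert(row);
        if order_keys[row] < order_keys[*e] {
            *e = row;
        }
    }
    // Phase 2: merge at most one candidate per group into the global state.
    for (g, row) in batch_min {
        let cand = (order_keys[row], values[row]);
        match global.entry(g) {
            Entry::Occupied(mut o) => {
                if cand.0 < o.get().0 {
                    o.insert(cand);
                }
            }
            Entry::Vacant(v) => {
                v.insert(cand);
            }
        }
    }
}
```

With low cardinality (few groups, many rows per group), phase 1 collapses most of the batch before the more expensive merge runs.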
// Once we see the first value, we set the `is_sets[group_idx]` flag
is_sets: BooleanBufferBuilder,
// null_builder[group_idx] == false => vals[group_idx] is null
null_builder: BooleanBufferBuilder,
Should we use NullState for that?
No. NullState does not pass NULL values to value_fn (see this). However, we cannot filter out values[0] == null in update_batch once we add RESPECT NULLS support.
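A tiny model of the point above, using `Option<i64>` in place of a nullable Arrow column: with RESPECT NULLS the first row wins even if it is NULL, so a helper that only forwards non-null values (as NullState's value_fn path does) would silently compute the IGNORE NULLS semantics instead.

```rust
// RESPECT NULLS: the first row wins even if it is NULL.
fn first_respect_nulls(vals: &[Option<i64>]) -> Option<Option<i64>> {
    vals.first().copied()
}

// What a null-skipping path effectively computes: the first *non-null* row,
// i.e. the IGNORE NULLS semantics.
fn first_ignore_nulls(vals: &[Option<i64>]) -> Option<Option<i64>> {
    vals.iter().copied().find(|v| v.is_some())
}
```

The two disagree exactly when the group's first value is NULL, which is why the accumulator must see nulls itself.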
if self.is_sets.len() < new_size {
    self.is_sets.append_n(new_size - self.is_sets.len(), false);
}
I think you can use .resize?
Yes. Using .resize is a better approach. 56b11c4
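For illustration, the same grow-to-size pattern with a std `Vec<bool>` as a stand-in for arrow's BooleanBufferBuilder: a single `resize` padded with `false` replaces the length-check-plus-append_n pair. The guard is kept because a bare resize would also truncate when `new_size` is smaller, which the original code never does.

```rust
// Grow the per-group flag buffer to `new_size`, padding with `false`;
// a no-op when the buffer is already large enough.
fn ensure_size(flags: &mut Vec<bool>, new_size: usize) {
    if flags.len() < new_size {
        flags.resize(new_size, false);
    }
}
```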
Thank you for the nice work, the implementation is very readable.

An alternative, I think, is to store Row format for the order keys instead of Vec<ScalarValue> to accelerate comparison; however, I don't think using Rows will be faster: here the number of comparisons for each value is less than in sorting, so the row conversion overhead might dominate.
There is only one thing I want to make sure of: is the following null group case covered by existing tests? If not, we should include it.

`select first(a order by b) group by c, d;`

| a | b | c | d |
|---|---|---|---|
| _ | _ | null | null |
| _ | _ | 1 | null |
| _ | _ | null | 1 |
{
    fn update_batch(
        &mut self,
        values_with_orderings: &[ArrayRef],
nit: At first, I thought the input is ordered because of the name of this argument; perhaps we can use values_and_order_cols?

Also, we can add a comment with an example, e.g. for first_value(a order by b), values_and_order_cols will be [a, b].
Done. Also added a test. 297de26
    continue;
}

if !result.contains_key(&group_idx)
This could be optimized to 1 lookup (e.g. using HashMap::entry). It looks in your profile like this is a hot function.
(Now it's 3 lookups worst case)
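The suggestion can be sketched with std's entry API (hypothetical helper; `is_better` stands in for the row comparator): a single vacant/occupied match replaces the contains_key + get + insert triple, so the hash of `group_idx` is computed once per row.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

// One hash lookup via `entry` instead of contains_key + get + insert.
// `result` maps group_idx -> row index of the current per-batch minimum;
// `is_better(a, b)` means "row a sorts before row b".
fn upsert_min<F: Fn(usize, usize) -> bool>(
    result: &mut HashMap<usize, usize>,
    group_idx: usize,
    idx_in_val: usize,
    is_better: F,
) {
    match result.entry(group_idx) {
        Entry::Vacant(v) => {
            v.insert(idx_in_val); // first row seen for this group
        }
        Entry::Occupied(mut o) => {
            if is_better(idx_in_val, *o.get()) {
                o.insert(idx_in_val);
            }
        }
    }
}
```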
if !result.contains_key(&group_idx)
    || comparator
        .compare(*result.get(&group_idx).unwrap(), idx_in_val)
Does this really happen? idx_in_val is increasing monotonically using enumerate?
Could you clarify what specific scenario you're referring to with "Does this really happen?" Are you concerned about:

- a. idx_in_val decreasing (being smaller than a previous value) within the loop using enumerate()?
- b. result[group_idx] increasing monotonically?

For a, as I understand it, idx_in_val would only potentially decrease if group_indices.len() > usize::MAX.
For b, yes: in the fuzz test, result[group_idx] will increase.
I meant idx_in_val is a strictly increasing number from enumerate, so it seems a previous idx is never greater than a new value, so the case should never be hit (so I would expect the code not to be there) / always be false.
In some cases, comparator.compare(*result.get(&group_idx).unwrap(), idx_in_val) may return true. It compares the "values" of the wrapped columns at the given indices, and the array may not be sorted by the order by fields. We can verify that by adding assert!(false); in L564: the fuzz test fails after adding it.
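The point can be reproduced with a toy stand-in for the comparator over the ORDER BY columns: `enumerate` yields strictly increasing indices, but the comparison is on the column values *at* those indices, and a batch is not sorted by the ORDER BY columns, so the "replace the stored candidate" branch is reachable.

```rust
// `idx` increases monotonically, but `order_key[idx]` does not: the branch
// replacing the stored candidate fires whenever a later row carries a
// smaller order key (e.g. order_key = [5, 2] flips at idx 1).
fn min_row_by_order_key(order_key: &[i64]) -> Option<usize> {
    let mut best: Option<usize> = None;
    for idx in 0..order_key.len() {
        match best {
            Some(b) if order_key[idx] >= order_key[b] => {} // keep current candidate
            _ => best = Some(idx), // first row, or a later-but-smaller row
        }
    }
    best
}
```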
    vals: &PrimitiveArray<T>,
    is_set_arr: Option<&BooleanArray>,
) -> Result<HashMap<usize, usize>> {
    let mut result = HashMap::with_capacity(orderings.len()); // group_idx -> idx_in_orderings
I am wondering if we can remove the use of a hashmap here... It shouldn't be needed to perform it like this?
Are you concerned about performance overhead from using HashMap? We could also make this function return (group_idx_to_idx_in_orderings: Vec<usize>, mask: BooleanBufferBuilder) and check if there's a performance improvement by running the benchmark SQLs. Returning a BooleanBufferBuilder is because this function contains filtering logic.
But if total_num_groups is large, group_idx_to_idx_in_orderings: Vec<usize> may consume lots of memory...
Yes, I think a large portion of the current overhead comes from the use of HashMap.

Thinking about it more, I think the most efficient approach would probably be directly changing the accumulator state rather than going via hashmap creation.
After adding min_of_each_group_buf: (Vec<usize>, BooleanBufferBuilder) to FirstPrimitiveGroupsAccumulator, it runs slightly faster.

| benchmark sql | d63 | 44a |
|---|---|---|
| select id2, id4, first_value(v1 order by id2, id4) as r2 from '~/h2o_100m.parquet' group by id2, id4; | 7s | 6.83s |
| select l_shipmode, first_value(l_partkey order by l_orderkey, l_linenumber, l_comment, l_suppkey, l_tax) from 'benchmarks/data/tpch_sf10/lineitem' group by l_shipmode; | 0.86s | 0.79s |

I haven't been following the recent conversations regarding hashmap optimization, but I also feel that if a pre-aggregate step is needed to make the low-cardinality case run faster, there might be some inefficiency inside the global-state update implementation.
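A sketch of the buffer-reuse idea above (field and method names are hypothetical; `Vec<bool>` stands in for the BooleanBufferBuilder mask): size the scratch pair once to total_num_groups, record per-group candidates during a batch, and clear only the mask between batches instead of allocating a fresh HashMap each time. The trade-off noted earlier still applies: memory is proportional to total_num_groups rather than to the number of groups touched by the batch.

```rust
// Scratch buffers reused across batches; only `seen` is reset per batch.
struct MinOfEachGroupBuf {
    row_of_group: Vec<usize>, // candidate row per group (valid only if seen)
    seen: Vec<bool>,          // did the current batch touch this group?
}

impl MinOfEachGroupBuf {
    fn new(total_num_groups: usize) -> Self {
        Self {
            row_of_group: vec![0; total_num_groups],
            seen: vec![false; total_num_groups],
        }
    }

    // Record `row` for `group` if it beats the stored candidate according
    // to the (stand-in) order key column.
    fn observe(&mut self, group: usize, row: usize, order_key: &[i64]) {
        if !self.seen[group] || order_key[row] < order_key[self.row_of_group[group]] {
            self.seen[group] = true;
            self.row_of_group[group] = row;
        }
    }

    // O(groups) reset of the mask; the index buffer can be left stale
    // because it is only read where `seen` is true.
    fn clear(&mut self) {
        self.seen.iter_mut().for_each(|s| *s = false);
    }
}
```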
…ccumulator` (apache#15266)

* Improve speed of first_value by implementing special GroupsAccumulator
* rename and other improvements
* `append_n` -> `resize`
* address comment
* use HashMap::entry
* remove hashMap in get_filtered_min_of_each_group
Which issue does this PR close?
Part of #13998
Rationale for this change
select id2, id4, first_value(v1 order by id2, id4) as r2 from '~/h2o_100m.parquet' group by id2, id4;
select l_shipmode, first_value(l_partkey order by l_orderkey, l_linenumber, l_comment, l_suppkey, l_tax) from 'benchmarks/data/tpch_sf10/lineitem' group by l_shipmode;
What changes are included in this PR?
FirstGroupsAccumulator
Are these changes tested?
Yes. Added a new unit test and a fuzz test.
Are there any user-facing changes?