Skip to content

Introduce selection vector repartitioning #15423

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 28 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,10 @@ config_namespace! {
/// HashJoin can work more efficiently than SortMergeJoin but consumes more memory
pub prefer_hash_join: bool, default = true

/// When set to true, the physical plan optimizer will prefer HashSelectionVectorPartitioning for HashAggregate
/// over HashPartitioning. HashSelectionVectorPartitioning avoids copying row data during repartitioning.
pub prefer_hash_selection_vector_partitioning_agg: bool, default = false

/// The maximum estimated size in bytes at which one input side of a HashJoin
/// will be collected into a single partition
pub hash_join_single_partition_threshold: usize, default = 1024 * 1024
Expand Down
23 changes: 20 additions & 3 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ use datafusion_expr::{
};
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::expressions::{Column, Literal};
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_expr::{HashPartitionMode, LexOrdering};
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::execution_plan::InvariantLevel;
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
Expand Down Expand Up @@ -741,8 +741,17 @@ impl DefaultPhysicalPlanner {
let updated_aggregates = initial_aggr.aggr_expr().to_vec();

let next_partition_mode = if can_repartition {
let mode = if session_state
.config_options()
.optimizer
.prefer_hash_selection_vector_partitioning_agg
{
HashPartitionMode::SelectionVector
} else {
HashPartitionMode::HashPartitioned
};
// construct a second aggregation with 'AggregateMode::FinalPartitioned'
AggregateMode::FinalPartitioned
AggregateMode::FinalPartitioned(mode)
} else {
// construct a second aggregation, keeping the final column name equal to the
// first aggregation and the expressions corresponding to the respective aggregate
Expand Down Expand Up @@ -804,7 +813,15 @@ impl DefaultPhysicalPlanner {
)
})
.collect::<Result<Vec<_>>>()?;
Partitioning::Hash(runtime_expr, *n)
if session_state
.config_options()
.optimizer
.prefer_hash_selection_vector_partitioning_agg
{
Partitioning::HashSelectionVector(runtime_expr, *n)
} else {
Partitioning::Hash(runtime_expr, *n)
}
}
LogicalPartitioning::DistributeBy(_) => {
return not_impl_err!(
Expand Down
46 changes: 23 additions & 23 deletions datafusion/core/tests/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2514,7 +2514,7 @@ async fn test_count_wildcard_on_sort() -> Result<()> {

assert_snapshot!(
pretty_format_batches(&sql_results).unwrap(),
@r###"
@r"
+---------------+------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------+
Expand All @@ -2527,37 +2527,37 @@ async fn test_count_wildcard_on_sort() -> Result<()> {
| | SortPreservingMergeExec: [count(Int64(1))@2 ASC NULLS LAST] |
| | SortExec: expr=[count(Int64(1))@2 ASC NULLS LAST], preserve_partitioning=[true] |
| | ProjectionExec: expr=[b@0 as b, count(Int64(1))@1 as count(*), count(Int64(1))@1 as count(Int64(1))] |
| | AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[count(Int64(1))] |
| | AggregateExec: mode=FinalPartitioned(HashPartitioned), gby=[b@0 as b], aggr=[count(Int64(1))] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=4 |
| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
| | AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[count(Int64(1))] |
| | DataSourceExec: partitions=1, partition_sizes=[1] |
| | |
+---------------+------------------------------------------------------------------------------------------------------------+
"###
"
);

assert_snapshot!(
pretty_format_batches(&df_results).unwrap(),
@r###"
+---------------+--------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------------------------+
| logical_plan | Sort: count(*) ASC NULLS LAST |
| | Aggregate: groupBy=[[t1.b]], aggr=[[count(Int64(1)) AS count(*)]] |
| | TableScan: t1 projection=[b] |
| physical_plan | SortPreservingMergeExec: [count(*)@1 ASC NULLS LAST] |
| | SortExec: expr=[count(*)@1 ASC NULLS LAST], preserve_partitioning=[true] |
| | AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[count(*)] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=4 |
| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
| | AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[count(*)] |
| | DataSourceExec: partitions=1, partition_sizes=[1] |
| | |
+---------------+--------------------------------------------------------------------------------+
"###
@r"
+---------------+--------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------------------------------------+
| logical_plan | Sort: count(*) ASC NULLS LAST |
| | Aggregate: groupBy=[[t1.b]], aggr=[[count(Int64(1)) AS count(*)]] |
| | TableScan: t1 projection=[b] |
| physical_plan | SortPreservingMergeExec: [count(*)@1 ASC NULLS LAST] |
| | SortExec: expr=[count(*)@1 ASC NULLS LAST], preserve_partitioning=[true] |
| | AggregateExec: mode=FinalPartitioned(HashPartitioned), gby=[b@0 as b], aggr=[count(*)] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=4 |
| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
| | AggregateExec: mode=Partial, gby=[b@0 as b], aggr=[count(*)] |
| | DataSourceExec: partitions=1, partition_sizes=[1] |
| | |
+---------------+--------------------------------------------------------------------------------------------+
"
);
Ok(())
}
Expand Down Expand Up @@ -2870,7 +2870,7 @@ async fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> {
| | HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@1)], projection=[a@0, b@1, count(*)@2, __always_true@4] |
| | DataSourceExec: partitions=1, partition_sizes=[1] |
| | ProjectionExec: expr=[count(Int64(1))@1 as count(*), a@0 as a, true as __always_true] |
| | AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(Int64(1))] |
| | AggregateExec: mode=FinalPartitioned(HashPartitioned), gby=[a@0 as a], aggr=[count(Int64(1))] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 |
| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
Expand Down Expand Up @@ -2927,7 +2927,7 @@ async fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> {
| | HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@1)], projection=[a@0, b@1, count(*)@2, __always_true@4] |
| | DataSourceExec: partitions=1, partition_sizes=[1] |
| | ProjectionExec: expr=[count(*)@1 as count(*), a@0 as a, true as __always_true] |
| | AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(*)] |
| | AggregateExec: mode=FinalPartitioned(HashPartitioned), gby=[a@0 as a], aggr=[count(*)] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 |
| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
Expand Down
Loading
Loading