Skip to content

Commit b7d3c03

Browse files
committed
fix: special case to not remove the needed coalesce
1 parent f1f7c88 commit b7d3c03

File tree

3 files changed

+18
-13
lines changed

3 files changed

+18
-13
lines changed

datafusion/core/tests/physical_optimizer/enforce_sorting.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2174,21 +2174,20 @@ async fn test_preserve_needed_coalesce() -> Result<()> {
21742174
get_plan_string(&optimized),
21752175
vec![
21762176
"SortPreservingMergeExec: [a@0 ASC]",
2177-
" SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
2177+
" SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
21782178
" AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
2179-
" ProjectionExec: expr=[a@0 as a, b@1 as value]",
2180-
" UnionExec",
2181-
" ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
2182-
" ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
2179+
" CoalescePartitionsExec",
2180+
" ProjectionExec: expr=[a@0 as a, b@1 as value]",
2181+
" UnionExec",
2182+
" ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
2183+
" ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
21832184
],
21842185
);
21852186

2186-
// Plan is now invalid.
2187+
// Plan is valid.
21872188
let checker = SanityCheckPlan::new();
2188-
let err = checker
2189-
.optimize(optimized, &Default::default())
2190-
.unwrap_err();
2191-
assert!(err.message().contains(" does not satisfy distribution requirements: HashPartitioned[[a@0]]). Child-0 output partitioning: UnknownPartitioning(2)"));
2189+
let checker = checker.optimize(optimized, &Default::default());
2190+
assert!(checker.is_ok());
21922191

21932192
Ok(())
21942193
}

datafusion/physical-optimizer/src/enforce_sorting/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ use crate::enforce_sorting::sort_pushdown::{
4747
assign_initial_requirements, pushdown_sorts, SortPushDown,
4848
};
4949
use crate::utils::{
50-
add_sort_above, add_sort_above_with_check, is_coalesce_partitions, is_limit,
51-
is_repartition, is_sort, is_sort_preserving_merge, is_union, is_window,
50+
add_sort_above, add_sort_above_with_check, is_aggregation, is_coalesce_partitions,
51+
is_limit, is_repartition, is_sort, is_sort_preserving_merge, is_union, is_window,
5252
};
5353
use crate::PhysicalOptimizerRule;
5454

@@ -518,7 +518,7 @@ fn remove_bottleneck_in_subplan(
518518
) -> Result<PlanWithCorrespondingCoalescePartitions> {
519519
let plan = &requirements.plan;
520520
let children = &mut requirements.children;
521-
if is_coalesce_partitions(&children[0].plan) {
521+
if is_coalesce_partitions(&children[0].plan) && !is_aggregation(plan) {
522522
// We can safely use the 0th index since we have a `CoalescePartitionsExec`.
523523
let mut new_child_node = children[0].children.swap_remove(0);
524524
while new_child_node.plan.output_partitioning() == plan.output_partitioning()

datafusion/physical-optimizer/src/utils.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use std::sync::Arc;
1919

2020
use datafusion_physical_expr::LexRequirement;
2121
use datafusion_physical_expr_common::sort_expr::LexOrdering;
22+
use datafusion_physical_plan::aggregates::AggregateExec;
2223
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
2324
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
2425
use datafusion_physical_plan::repartition::RepartitionExec;
@@ -105,3 +106,8 @@ pub fn is_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
105106
pub fn is_limit(plan: &Arc<dyn ExecutionPlan>) -> bool {
106107
plan.as_any().is::<GlobalLimitExec>() || plan.as_any().is::<LocalLimitExec>()
107108
}
109+
110+
// Checks whether the given operator is a [`AggregateExec`].
111+
pub fn is_aggregation(plan: &Arc<dyn ExecutionPlan>) -> bool {
112+
plan.as_any().is::<AggregateExec>()
113+
}

0 commit comments

Comments
 (0)