Skip to content

Commit f1f7c88

Browse files
committed
test: reproducer of SanityCheck failure after EnforceSorting removes the coalesce added by EnforceDistribution
1 parent 8d2c2c4 commit f1f7c88

File tree

2 files changed

+100
-6
lines changed

2 files changed

+100
-6
lines changed

datafusion/core/tests/physical_optimizer/enforce_distribution.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ fn column_stats() -> Vec<ColumnStatistics> {
188188
]
189189
}
190190

191-
fn parquet_exec_with_stats() -> Arc<ParquetExec> {
191+
pub(crate) fn parquet_exec_with_stats() -> Arc<ParquetExec> {
192192
let mut statistics = Statistics::new_unknown(&schema());
193193
statistics.num_rows = Precision::Inexact(10);
194194
statistics.column_statistics = column_stats();
@@ -267,7 +267,7 @@ fn csv_exec_multiple_sorted(output_ordering: Vec<LexOrdering>) -> Arc<CsvExec> {
267267
)
268268
}
269269

270-
fn projection_exec_with_alias(
270+
pub(crate) fn projection_exec_with_alias(
271271
input: Arc<dyn ExecutionPlan>,
272272
alias_pairs: Vec<(String, String)>,
273273
) -> Arc<dyn ExecutionPlan> {

datafusion/core/tests/physical_optimizer/enforce_sorting.rs

Lines changed: 98 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,18 @@
1717

1818
use std::sync::Arc;
1919

20+
use crate::physical_optimizer::enforce_distribution::{
21+
parquet_exec_with_stats, projection_exec_with_alias,
22+
};
2023
use crate::physical_optimizer::test_utils::{
2124
aggregate_exec, bounded_window_exec, bounded_window_exec_non_set_monotonic,
2225
bounded_window_exec_with_partition, check_integrity, coalesce_batches_exec,
2326
coalesce_partitions_exec, create_test_schema, create_test_schema2,
2427
create_test_schema3, create_test_schema4, filter_exec, global_limit_exec,
2528
hash_join_exec, limit_exec, local_limit_exec, memory_exec, parquet_exec,
26-
repartition_exec, sort_exec, sort_expr, sort_expr_options, sort_merge_join_exec,
27-
sort_preserving_merge_exec, spr_repartition_exec, stream_exec_ordered, union_exec,
28-
RequirementsTestExec,
29+
repartition_exec, schema, sort_exec, sort_expr, sort_expr_options,
30+
sort_merge_join_exec, sort_preserving_merge_exec, spr_repartition_exec,
31+
stream_exec_ordered, union_exec, RequirementsTestExec,
2932
};
3033

3134
use datafusion_physical_plan::displayable;
@@ -35,11 +38,15 @@ use datafusion_common::Result;
3538
use datafusion_expr::JoinType;
3639
use datafusion_physical_expr::expressions::{col, Column, NotExpr};
3740
use datafusion_physical_optimizer::PhysicalOptimizerRule;
38-
use datafusion_physical_expr::Partitioning;
41+
use datafusion_physical_expr::{Partitioning};
3942
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
4043
use datafusion_physical_optimizer::enforce_sorting::{EnforceSorting,PlanWithCorrespondingCoalescePartitions,PlanWithCorrespondingSort,parallelize_sorts,ensure_sorting};
4144
use datafusion_physical_optimizer::enforce_sorting::replace_with_order_preserving_variants::{replace_with_order_preserving_variants,OrderPreservationContext};
4245
use datafusion_physical_optimizer::enforce_sorting::sort_pushdown::{SortPushDown, assign_initial_requirements, pushdown_sorts};
46+
use datafusion_physical_optimizer::sanity_checker::SanityCheckPlan;
47+
use datafusion_physical_plan::aggregates::{
48+
AggregateExec, AggregateMode, PhysicalGroupBy,
49+
};
4350
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
4451
use datafusion_physical_plan::repartition::RepartitionExec;
4552
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
@@ -2099,6 +2106,93 @@ async fn test_commutativity() -> Result<()> {
20992106
Ok(())
21002107
}
21012108

2109+
/// Test helper: wraps `input` in an `AggregateExec` that runs in
/// `AggregateMode::SinglePartitioned`.
///
/// Each `(column, alias)` pair in `alias_pairs` becomes one group-by
/// expression: `column` is resolved against `input.schema()` and paired with
/// `alias`. Two empty vectors are passed to `AggregateExec::try_new`
/// (presumably the aggregate expressions and their filters — confirm against
/// the `try_new` signature).
///
/// Panics via `unwrap` if a column name cannot be resolved or if
/// `AggregateExec::try_new` returns an error.
fn single_partition_aggregate(
2110+
input: Arc<dyn ExecutionPlan>,
2111+
alias_pairs: Vec<(String, String)>,
2112+
) -> Arc<dyn ExecutionPlan> {
2113+
// NOTE(review): this is the shared test `schema()`, not `input.schema()`;
// confirm that is the schema the aggregate is meant to receive.
let schema = schema();
2114+
// Build the (expression, alias) pairs for the group-by clause.
let group_by = alias_pairs
2115+
.iter()
2116+
.map(|(column, alias)| (col(column, &input.schema()).unwrap(), alias.to_string()))
2117+
.collect::<Vec<_>>();
2118+
let group_by = PhysicalGroupBy::new_single(group_by);
2119+
2120+
Arc::new(
2121+
AggregateExec::try_new(
2122+
AggregateMode::SinglePartitioned,
2123+
group_by,
2124+
vec![],
2125+
vec![],
2126+
input,
2127+
schema,
2128+
)
2129+
.unwrap(),
2130+
)
2131+
}
2132+
2133+
/// Reproducer for a plan that passes `SanityCheckPlan` before `EnforceSorting`
/// runs and fails it afterwards.
///
/// The test proceeds in three phases:
/// 1. Build `SortExec -> AggregateExec(SinglePartitioned) -> CoalescePartitionsExec
///    -> ProjectionExec -> UnionExec -> 2 x ParquetExec` and assert that the
///    sanity check accepts it.
/// 2. Run `EnforceSorting` and assert that the optimized plan no longer
///    contains the `CoalescePartitionsExec` and has a
///    `SortPreservingMergeExec` on top instead.
/// 3. Run `SanityCheckPlan` on the optimized plan and assert that it returns
///    an error mentioning the unsatisfied distribution requirement.
#[tokio::test]
2134+
async fn test_preserve_needed_coalesce() -> Result<()> {
2135+
// Input to EnforceSorting, from our test case: a projection over a union of
// two parquet scans, coalesced below the aggregate.
2136+
let plan = projection_exec_with_alias(
2137+
union_exec(vec![parquet_exec_with_stats(); 2]),
2138+
vec![
2139+
("a".to_string(), "a".to_string()),
2140+
("b".to_string(), "value".to_string()),
2141+
],
2142+
);
2143+
let plan = Arc::new(CoalescePartitionsExec::new(plan));
2144+
let schema = schema();
2145+
// Sort on column "a", resolved against the shared test schema.
let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
2146+
expr: col("a", &schema).unwrap(),
2147+
options: SortOptions::default(),
2148+
}]);
2149+
// Group by "a" (aliased to "a1") in a single-partitioned aggregate, then sort.
let plan: Arc<dyn ExecutionPlan> =
2150+
single_partition_aggregate(plan, vec![("a".to_string(), "a1".to_string())]);
2151+
let plan = sort_exec(sort_key, plan);
2152+
2153+
// Starting plan: as in our test case.
2154+
assert_eq!(
2155+
get_plan_string(&plan),
2156+
vec![
2157+
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
2158+
" AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
2159+
" CoalescePartitionsExec",
2160+
" ProjectionExec: expr=[a@0 as a, b@1 as value]",
2161+
" UnionExec",
2162+
" ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
2163+
" ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
2164+
],
2165+
);
2166+
2167+
// The starting plan must pass the sanity check before optimization.
let checker = SanityCheckPlan::new().optimize(plan.clone(), &Default::default());
2168+
assert!(checker.is_ok());
2169+
2170+
// EnforceSorting will remove the coalesce, and add an SPM further up (above the aggregate).
2171+
let optimizer = EnforceSorting::new();
2172+
let optimized = optimizer.optimize(plan, &Default::default())?;
2173+
assert_eq!(
2174+
get_plan_string(&optimized),
2175+
vec![
2176+
"SortPreservingMergeExec: [a@0 ASC]",
2177+
" SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
2178+
" AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
2179+
" ProjectionExec: expr=[a@0 as a, b@1 as value]",
2180+
" UnionExec",
2181+
" ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
2182+
" ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
2183+
],
2184+
);
2185+
2186+
// Plan is now invalid: the sanity check is expected to return an error whose
// message reports the unmet hash-partitioning requirement on the child.
2187+
let checker = SanityCheckPlan::new();
2188+
let err = checker
2189+
.optimize(optimized, &Default::default())
2190+
.unwrap_err();
2191+
assert!(err.message().contains(" does not satisfy distribution requirements: HashPartitioned[[a@0]]). Child-0 output partitioning: UnknownPartitioning(2)"));
2192+
2193+
Ok(())
2194+
}
2195+
21022196
#[tokio::test]
21032197
async fn test_coalesce_propagate() -> Result<()> {
21042198
let schema = create_test_schema()?;

0 commit comments

Comments
 (0)