Skip to content

Commit af2856a

Browse files
committed
test: demonstrate that we lose repartitioning once column estimates are used and the test scenario's forcing of round-robin (rr) repartitioning is removed
1 parent 01c23a3 commit af2856a

File tree

1 file changed

+108
-46
lines changed

1 file changed

+108
-46
lines changed

datafusion/core/src/physical_optimizer/enforce_distribution.rs

Lines changed: 108 additions & 46 deletions
Original file line number | Diff line number | Diff line change
@@ -1433,7 +1433,7 @@ pub(crate) mod tests {
14331433
use datafusion_physical_optimizer::output_requirements::OutputRequirements;
14341434

14351435
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
1436-
use datafusion_common::ScalarValue;
1436+
use datafusion_common::{ColumnStatistics, ScalarValue};
14371437
use datafusion_expr::{AggregateUDF, Operator};
14381438
use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
14391439
use datafusion_physical_expr::{
@@ -1550,6 +1550,25 @@ pub(crate) mod tests {
15501550
]))
15511551
}
15521552

1553+
fn int64_stats() -> ColumnStatistics {
1554+
ColumnStatistics {
1555+
null_count: Precision::Absent,
1556+
max_value: Precision::Exact(1_000_000.into()),
1557+
min_value: Precision::Exact(0.into()),
1558+
distinct_count: Precision::Absent,
1559+
}
1560+
}
1561+
1562+
fn column_stats() -> Vec<ColumnStatistics> {
1563+
vec![
1564+
int64_stats(), // a
1565+
int64_stats(), // b
1566+
int64_stats(), // c
1567+
ColumnStatistics::default(),
1568+
ColumnStatistics::default(),
1569+
]
1570+
}
1571+
15531572
fn parquet_exec() -> Arc<ParquetExec> {
15541573
parquet_exec_with_sort(vec![])
15551574
}
@@ -1566,6 +1585,20 @@ pub(crate) mod tests {
15661585
.build_arc()
15671586
}
15681587

1588+
fn parquet_exec_with_stats() -> Arc<ParquetExec> {
1589+
let mut statistics = Statistics::new_unknown(&schema());
1590+
statistics.num_rows = Precision::Inexact(10);
1591+
statistics.column_statistics = column_stats();
1592+
1593+
let config =
1594+
FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema())
1595+
.with_file(PartitionedFile::new("x".to_string(), 10000))
1596+
.with_statistics(statistics);
1597+
assert_eq!(config.statistics.num_rows, Precision::Inexact(10));
1598+
1599+
ParquetExec::builder(config).build_arc()
1600+
}
1601+
15691602
fn parquet_exec_multiple() -> Arc<ParquetExec> {
15701603
parquet_exec_multiple_sorted(vec![])
15711604
}
@@ -1870,6 +1903,10 @@ pub(crate) mod tests {
18701903
};
18711904

18721905
($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr, $PREFER_EXISTING_UNION: expr) => {
1906+
assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, $TARGET_PARTITIONS, $REPARTITION_FILE_SCANS, $REPARTITION_FILE_MIN_SIZE, $PREFER_EXISTING_UNION, 1);
1907+
};
1908+
1909+
($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr, $PREFER_EXISTING_UNION: expr, $BATCH_SIZE: expr) => {
18731910
let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();
18741911

18751912
let mut config = ConfigOptions::new();
@@ -1879,7 +1916,12 @@ pub(crate) mod tests {
18791916
config.optimizer.prefer_existing_sort = $PREFER_EXISTING_SORT;
18801917
config.optimizer.prefer_existing_union = $PREFER_EXISTING_UNION;
18811918
// Batch size is supplied by the caller; a small value (the default of 1) triggers RoundRobin in tests
1882-
config.execution.batch_size = 1;
1919+
config.execution.batch_size = $BATCH_SIZE;
1920+
1921+
// This triggers the use of column statistics estimates in the repartition calculation.
1922+
// Without this setting, the testing of `get_repartition_requirement_status` misses
1923+
// several branches.
1924+
config.execution.use_row_number_estimates_to_optimize_partitioning = true;
18831925

18841926
// NOTE: These tests verify the joint `EnforceDistribution` + `EnforceSorting` cascade
18851927
// because they were written prior to the separation of `BasicEnforcement` into
@@ -3548,9 +3590,25 @@ pub(crate) mod tests {
35483590
}
35493591
}
35503592

3593+
macro_rules! assert_optimized_without_forced_roundrobin {
3594+
($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr) => {
3595+
assert_optimized!(
3596+
$EXPECTED_LINES,
3597+
$PLAN,
3598+
$FIRST_ENFORCE_DIST,
3599+
false,
3600+
10,
3601+
false,
3602+
1024,
3603+
false,
3604+
100
3605+
);
3606+
};
3607+
}
3608+
35513609
#[test]
35523610
fn repartitions_for_extension_node_with_aggregate_after_union() -> Result<()> {
3553-
let union = union_exec(vec![parquet_exec(); 2]);
3611+
let union = union_exec(vec![parquet_exec_with_stats(); 2]);
35543612
let plan =
35553613
aggregate_exec_with_alias(union, vec![("a".to_string(), "a1".to_string())]);
35563614
let plan: Arc<dyn ExecutionPlan> = Arc::new(MyExtensionNode::new(plan));
@@ -3563,23 +3621,22 @@ pub(crate) mod tests {
35633621
"MyExtensionNode",
35643622
"CoalescePartitionsExec",
35653623
"AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
3566-
"RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
3624+
"RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=2",
35673625
"AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
3568-
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
35693626
"UnionExec",
35703627
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
35713628
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
35723629
];
35733630

3574-
assert_optimized!(expected, plan.clone(), true);
3575-
assert_optimized!(expected, plan.clone(), false);
3631+
assert_optimized_without_forced_roundrobin!(expected, plan.clone(), true);
3632+
assert_optimized_without_forced_roundrobin!(expected, plan.clone(), false);
35763633

35773634
Ok(())
35783635
}
35793636

35803637
#[test]
35813638
fn repartitions_for_aggregate_after_sorted_union() -> Result<()> {
3582-
let union = union_exec(vec![parquet_exec(); 2]);
3639+
let union = union_exec(vec![parquet_exec_with_stats(); 2]);
35833640
let schema = schema();
35843641
let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
35853642
expr: col("a", &schema).unwrap(),
@@ -3593,45 +3650,48 @@ pub(crate) mod tests {
35933650
let checker = check_plan_sanity(plan.clone(), &Default::default());
35943651
assert!(checker.is_ok());
35953652

3596-
// it still repartitions
3653+
// does not repartition on the first run
35973654
let expected_after_first_run = &[
35983655
"AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted",
3599-
"SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]",
3600-
"RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
36013656
"AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted",
3602-
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
36033657
"SortPreservingMergeExec: [a@0 ASC]",
36043658
"UnionExec",
36053659
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
36063660
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
36073661
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
36083662
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
36093663
];
3610-
assert_optimized!(expected_after_first_run, plan.clone(), true);
3664+
assert_optimized_without_forced_roundrobin!(
3665+
expected_after_first_run,
3666+
plan.clone(),
3667+
true
3668+
);
36113669

3670+
// does repartition on the second run
36123671
let expected_after_second_run = &[
36133672
"AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted",
36143673
"SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]",
3615-
"RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
3674+
"RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=2",
36163675
"SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]",
36173676
"AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted",
3618-
"SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", // adds another sort
3619-
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
3620-
"SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]", // removes the SPM
36213677
"UnionExec",
36223678
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
36233679
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
36243680
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
36253681
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
36263682
];
3627-
assert_optimized!(expected_after_second_run, plan.clone(), false);
3683+
assert_optimized_without_forced_roundrobin!(
3684+
expected_after_second_run,
3685+
plan.clone(),
3686+
false
3687+
);
36283688

36293689
Ok(())
36303690
}
36313691

36323692
#[test]
3633-
fn repartition_for_aggregate_after_sorted_union_projection() -> Result<()> {
3634-
let union = union_exec(vec![parquet_exec(); 2]);
3693+
fn does_not_repartition_for_aggregate_after_sorted_union_projection() -> Result<()> {
3694+
let union = union_exec(vec![parquet_exec_with_stats(); 2]);
36353695
let projection = projection_exec_with_alias(
36363696
union,
36373697
vec![
@@ -3652,45 +3712,47 @@ pub(crate) mod tests {
36523712
let checker = check_plan_sanity(plan.clone(), &Default::default());
36533713
assert!(checker.is_ok());
36543714

3655-
// it still repartitions
3715+
// does not repartition on the first run
36563716
let expected_after_first_run = &[
36573717
"AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted",
3658-
"SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]",
3659-
"RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
36603718
"AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted",
3661-
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
36623719
"SortPreservingMergeExec: [a@0 ASC]",
36633720
"SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
36643721
"ProjectionExec: expr=[a@0 as a, b@1 as value]",
36653722
"UnionExec",
36663723
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
36673724
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
36683725
];
3669-
assert_optimized!(expected_after_first_run, plan.clone(), true);
3726+
assert_optimized_without_forced_roundrobin!(
3727+
expected_after_first_run,
3728+
plan.clone(),
3729+
true
3730+
);
36703731

3732+
// does not repartition on the second run
36713733
let expected_after_second_run = &[
36723734
"AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted",
3673-
"SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]",
3674-
"RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
3675-
"SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]", // adds another sort
36763735
"AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted",
3677-
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
3678-
// removes the SPM
36793736
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
3680-
"CoalescePartitionsExec", // adds the coalesce
3737+
"CoalescePartitionsExec",
36813738
"ProjectionExec: expr=[a@0 as a, b@1 as value]",
36823739
"UnionExec",
36833740
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
36843741
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
36853742
];
3686-
assert_optimized!(expected_after_second_run, plan.clone(), false);
3743+
assert_optimized_without_forced_roundrobin!(
3744+
expected_after_second_run,
3745+
plan.clone(),
3746+
false
3747+
);
36873748

36883749
Ok(())
36893750
}
36903751

36913752
#[test]
3692-
fn repartition_for_aggregate_sum_after_sorted_union_projection() -> Result<()> {
3693-
let union = union_exec(vec![parquet_exec(); 2]);
3753+
fn does_not_repartition_for_aggregate_sum_after_sorted_union_projection() -> Result<()>
3754+
{
3755+
let union = union_exec(vec![parquet_exec_with_stats(); 2]);
36943756
let projection = projection_exec_with_alias(
36953757
union,
36963758
vec![
@@ -3715,41 +3777,41 @@ pub(crate) mod tests {
37153777
let checker = check_plan_sanity(plan.clone(), &Default::default());
37163778
assert!(checker.is_ok());
37173779

3718-
// it still repartitions
3780+
// does not repartition on the first run
37193781
let expected_after_first_run = &[
37203782
"MyExtensionNode",
3721-
"CoalescePartitionsExec",
37223783
"AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[sum], ordering_mode=Sorted",
3723-
"SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]",
3724-
"RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
37253784
"AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[sum], ordering_mode=Sorted",
3726-
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
37273785
"SortPreservingMergeExec: [a@0 ASC]",
37283786
"SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
37293787
"ProjectionExec: expr=[a@0 as a, b@1 as b]",
37303788
"UnionExec",
37313789
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
37323790
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
37333791
];
3734-
assert_optimized!(expected_after_first_run, plan.clone(), true);
3792+
assert_optimized_without_forced_roundrobin!(
3793+
expected_after_first_run,
3794+
plan.clone(),
3795+
true
3796+
);
37353797

3798+
// does not repartition on the second run
37363799
let expected_after_second_run = &[
37373800
"MyExtensionNode",
3738-
"CoalescePartitionsExec",
37393801
"AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[sum], ordering_mode=Sorted",
3740-
"SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]",
3741-
"RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
3742-
"SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]",
37433802
"AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[sum], ordering_mode=Sorted",
3744-
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
37453803
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
37463804
"CoalescePartitionsExec",
37473805
"ProjectionExec: expr=[a@0 as a, b@1 as b]",
37483806
"UnionExec",
37493807
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
37503808
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
37513809
];
3752-
assert_optimized!(expected_after_second_run, plan.clone(), false);
3810+
assert_optimized_without_forced_roundrobin!(
3811+
expected_after_second_run,
3812+
plan.clone(),
3813+
false
3814+
);
37533815

37543816
Ok(())
37553817
}

0 commit comments

Comments
 (0)