Skip to content

Commit e640d67

Browse files
committed
fix: fetch is missed in the EnsureSorting (apache#14192)
* fix: fetch is missed in the EnsureSorting * fix conflict * resolve comments from alamb * update
1 parent 1bc5476 commit e640d67

File tree

3 files changed

+13
-6
lines changed

3 files changed

+13
-6
lines changed

Diff for: datafusion/core/src/physical_optimizer/enforce_sorting.rs

-2
Original file line numberDiff line numberDiff line change
@@ -186,13 +186,11 @@ impl PhysicalOptimizerRule for EnforceSorting {
186186
)
187187
})
188188
.data()?;
189-
190189
// Execute a top-down traversal to exploit sort push-down opportunities
191190
// missed by the bottom-up traversal:
192191
let mut sort_pushdown = SortPushDown::new_default(updated_plan.plan);
193192
assign_initial_requirements(&mut sort_pushdown);
194193
let adjusted = pushdown_sorts(sort_pushdown)?;
195-
196194
adjusted
197195
.plan
198196
.transform_up(|plan| Ok(Transformed::yes(replace_with_partial_sort(plan)?)))

Diff for: datafusion/core/src/physical_optimizer/sort_pushdown.rs

+7-3
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,9 @@ pub fn assign_initial_requirements(node: &mut SortPushDown) {
6464
for (child, requirement) in node.children.iter_mut().zip(reqs) {
6565
child.data = ParentRequirements {
6666
ordering_requirement: requirement,
67-
fetch: None,
67+
// If the parent has a fetch value, assign it to the children
68+
// Or use the fetch value of the child.
69+
fetch: child.plan.fetch(),
6870
};
6971
}
7072
}
@@ -95,14 +97,16 @@ fn pushdown_sorts_helper(
9597
.equivalence_properties()
9698
.ordering_satisfy_requirement(parent_reqs);
9799
if is_sort(plan) {
100+
let sort_fetch = plan.fetch();
98101
let required_ordering = plan
99102
.output_ordering()
100103
.map(PhysicalSortRequirement::from_sort_exprs)
101104
.unwrap_or_default();
102105
if !satisfy_parent {
103106
// Make sure this `SortExec` satisfies parent requirements:
104107
let sort_reqs = requirements.data.ordering_requirement.unwrap_or_default();
105-
let fetch = requirements.data.fetch;
108+
// It's possible current plan (`SortExec`) has a fetch value.
109+
let fetch = requirements.data.fetch.or(sort_fetch);
106110
requirements = requirements.children.swap_remove(0);
107111
requirements = add_sort_above(requirements, sort_reqs, fetch);
108112
};
@@ -112,7 +116,7 @@ fn pushdown_sorts_helper(
112116
if let Some(adjusted) =
113117
pushdown_requirement_to_children(&child.plan, &required_ordering)?
114118
{
115-
let fetch = child.plan.fetch();
119+
let fetch = sort_fetch.or_else(|| child.plan.fetch());
116120
for (grand_child, order) in child.children.iter_mut().zip(adjusted) {
117121
grand_child.data = ParentRequirements {
118122
ordering_requirement: order,

Diff for: datafusion/sqllogictest/test_files/topk.slt

+6-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,12 @@ select * from topk order by x desc limit 3;
4949
8
5050
5
5151

52-
52+
query I
53+
select * from (select * from topk limit 8) order by x limit 3;
54+
----
55+
0
56+
1
57+
2
5358

5459

5560
statement ok

0 commit comments

Comments
 (0)