-
Notifications
You must be signed in to change notification settings - Fork 1.4k
fix: fetch is missed in the EnsureSorting #14192
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Does this PR fix: |
No, it's another bug, I'll complete the issue summary. |
done |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @xudong963 !
This PR changes the datafusion-testing pin. I think it would be better to avoid that:
I think you can fix it like
git restore datafusion-testng
git commit -m 'restore datafusion-testing'
git reset
Other than that I think the PR is ok to merge, but I also left some suggestions for your consideration
fetch: if let Some(fetch) = child.plan.fetch() { | ||
Some(fetch) | ||
} else { | ||
None | ||
}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this the same as this?
fetch: if let Some(fetch) = child.plan.fetch() { | |
Some(fetch) | |
} else { | |
None | |
}, | |
fetch: child.plan.fetch().clone(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes
let fetch = if let Some(fetch) = requirements.data.fetch.clone() { | ||
Some(fetch) | ||
} else { | ||
// It's possible current plan (`SortExec`) has a fetch value. | ||
sort_fetch | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can express this same thing more concisely
let fetch = if let Some(fetch) = requirements.data.fetch.clone() { | |
Some(fetch) | |
} else { | |
// It's possible current plan (`SortExec`) has a fetch value. | |
sort_fetch | |
}; | |
let fetch = requirements.data.fetch.clone() | |
// It's possible current plan (`SortExec`) has a fetch value. | |
.and(sort_fetch); |
Also it seems strange that we have two fetches here that could be different .
I reverted just this change and the test still passes. By my reading of the code you have fixed the issue above by setting the requirements fetch
.
So maybe this would be better as check that the fetches are the same and an internal error if they aren't 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, they should be the same, checking is enough.
For the suggestion, I think what you mean is requirements.data.fetch.clone().or_else(|| sort_fetch)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a test which has different values
async fn test_remove_unnecessary_sort8() -> Result<()> {
let schema = create_test_schema()?;
let source = memory_exec(&schema);
let input = Arc::new(SortExec::new(
LexOrdering::new(vec![sort_expr("non_nullable_col", &schema)]),
source,
));
let limit = Arc::new(LocalLimitExec::new(input, 2));
let physical_plan = sort_exec(
vec![
sort_expr("non_nullable_col", &schema),
sort_expr("nullable_col", &schema),
],
limit,
);
let expected_input = [
"SortExec: expr=[non_nullable_col@1 ASC, nullable_col@0 ASC], preserve_partitioning=[false]",
" LocalLimitExec: fetch=2",
" SortExec: expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]",
" MemoryExec: partitions=1, partition_sizes=[0]",
];
let expected_optimized = [
"LocalLimitExec: fetch=2",
" SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC, nullable_col@0 ASC], preserve_partitioning=[false]",
" MemoryExec: partitions=1, partition_sizes=[0]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan, true);
Ok(())
}
requirement data fetch is Some(2), but sort doesn't have fetch, is none.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @xudong963 for this PR.
Thanks, @akurmustafa! |
I'll be reviewing this today |
@@ -103,7 +106,8 @@ fn pushdown_sorts_helper( | |||
if !satisfy_parent { | |||
// Make sure this `SortExec` satisfies parent requirements: | |||
let sort_reqs = requirements.data.ordering_requirement.unwrap_or_default(); | |||
let fetch = requirements.data.fetch; | |||
// It's possible current plan (`SortExec`) has a fetch value. | |||
let fetch = requirements.data.fetch.or(sort_fetch); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't we need to take min() instead of or()?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i don't find a case that both requirement fetch and sort fetch are Some and requirement fetch is large than sort fetch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even if there is not a case from the sql API, a plan can be constructed like:
Limit: 10
--Sort: fetch:5
IIUC, that will set sort fetch as 10, but that will be suboptimal. Do you think there is something wrong in taking their min?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The PR looks good BTW. I don't want to block merging.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can address as a follow on PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks all, I opened a follow up PR: #14221
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great to me -- thank you @xudong963 @berkaysynnada and @akurmustafa
Thanks @xudong963 and @berkaysynnada and @berkaysynnada |
* fix: fetch is missed in the EnsureSorting * fix conflict * resolve comments from alamb * update
* fix: fetch is missed in the EnfoceSorting * fix conflict * resolve comments from alamb * update
* fix: fetch is missed in the EnfoceSorting * fix conflict * resolve comments from alamb * update
Which issue does this PR close?
select * from (select * from topk limit 8) order by x limit 3;
will return 8 rows which isn't expected.After debug: I found the fetch(limit 3) in topk will lost during

EnforceSorting
Rationale for this change
fix offset missed during EnforceSorting
What changes are included in this PR?
Are these changes tested?
yes
Are there any user-facing changes?