Skip to content

Remove CoalescePartitions insertion from HashJoinExec #15476

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Mar 29, 2025

Conversation

ctsk
Copy link
Contributor

@ctsk ctsk commented Mar 28, 2025

Which issue does this PR close?

  • Closes #.

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

Copy link
Contributor

@Dandandan Dandandan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch

Copy link
Contributor

@berkaysynnada berkaysynnada left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great, it works :) thank you @ctsk

@ctsk
Copy link
Contributor Author

ctsk commented Mar 28, 2025

Note that this does break for users of HashJoinExec that

  • Use the CollectLeft mode, with >1 partition on the build side AND
  • Construct their physical plan without running EnforceDistribution

@comphead
Copy link
Contributor

Note that this does break for users of HashJoinExec that

  • Use the CollectLeft mode, with >1 partition on the build side AND
  • Construct their physical plan without running EnforceDistribution

Thanks @ctsk what exactly is broken?

@ctsk
Copy link
Contributor Author

ctsk commented Mar 28, 2025

Before this PR, if someone hand-wired a CollectLeft HashJoin where the left child has more than one output partition, the HashJoin would automatically add a CoalescePartitions exec. This behaviour never triggers for plans that are constructed by datafusion, because the EnforceDistribution pass makes sure that that CoalescePartitions exist.

After this PR, if someone hand-wires a CollectLeft HashJoin and does not use the EnforceDistribution pass and provides a left child that has more than 1 output partition, the resulting plan, when executed will return a wrong result (because the hash table will only built on partition 0).

Now that I've written it out. I believe it is strictly better to return an internal_err! if one constructs such a plan and tries to execute it rather than return a wrong result.

@ctsk
Copy link
Contributor Author

ctsk commented Mar 28, 2025

I've amended the PR so that Executionplan::execute fails if one tries to execute such a problematic plan.

@Dandandan Dandandan merged commit 7e0738a into apache:main Mar 29, 2025
27 checks passed
@Dandandan
Copy link
Contributor

Thanks @ctsk

@alamb
Copy link
Contributor

alamb commented Mar 30, 2025

This PR appears to have caused CI failures for some reason so @goldmedal has a PR to revert it:

alamb pushed a commit that referenced this pull request Mar 30, 2025
@ctsk
Copy link
Contributor Author

ctsk commented Mar 30, 2025

Sorry about that! Thanks for tracking it down @goldmedal.

@goldmedal
Copy link
Contributor

This PR appears to have caused CI failures for some reason so @goldmedal has a PR to revert it:

It can be reproduced in the local by the following steps:

  • Checkout to the commit in the man branch: git checkout 7e0738a979054e95d1935d6e3e58b27385679031
  • run the datafame example: cargo run --package datafusion-examples --example dataframe

@ctsk
Copy link
Contributor Author

ctsk commented Apr 1, 2025

Alright, so what went wrong here is that for CollectLeft joins, the left ExecutionPlan gets executed for every input partition.

This was not caught by the tests 2 weeks ago, because most plans can have a partition executed multiple times - but repartition can not. I suspect there is no case where a CollectLeft join is used and a RepartitionExec is under the build side.

@berkaysynnada
Copy link
Contributor

This was not caught by the tests 2 weeks ago, because most plans can have a partition executed multiple times - but repartition can not.

As the signature of execute() is not self consuming, what I believe is all execute()'s should be idempotent. If repartition breaks this, should we create a ticket for that?

I suspect there is no case where a CollectLeft join is used and a RepartitionExec is under the build side.

Even if this case is not generated in tests at all, we should be prepared for that.

Alright, so what went wrong here is that for CollectLeft joins, the left ExecutionPlan gets executed for every input partition.

The issue is now CollectLeft should always receive only 1 partition, by EnforceDistribution, right? Then, we can re-apply these changes

@ctsk
Copy link
Contributor Author

ctsk commented Apr 2, 2025

Alright, so what went wrong here is that for CollectLeft joins, the left ExecutionPlan gets executed for every input partition.

The issue is now CollectLeft should always receive only 1 partition, by EnforceDistribution, right? Then, we can re-apply these changes

I typo'd there - I meant to say for every output partition.
The bug is not in the removal of the CoalescePartitions, but the changes that I did prior to that (lifting the execute of the build side out of the OnceFut - this commit: da06eff)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants