Skip to content

Introduce selection vector repartitioning #15423

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 23 commits into
base: main
Choose a base branch
from

Conversation

goldmedal
Copy link
Contributor

@goldmedal goldmedal commented Mar 25, 2025

Which issue does this PR close?

Rationale for this change

It's a pre-work of #15382 and #15383.

What changes are included in this PR?

  • Add a config to control the hash partitioning mode: datafusion.optimizer.prefer_hash_selection_vector_partitioning
        /// When set to true, the physical plan optimizer will prefer HashSelectionVectorPartitioning for RepartitionExec
        /// over HashPartitioning. HashSelectionVectorPartitioning can work without data copying.
        pub prefer_hash_selection_vector_partitioning_agg: bool, default = false
  • Add a new physical hash partitioning to change the behavior of RepartitionExec
    /// Allocate rows based on a hash of one or more expressions and the specified number
    /// of partitions with a selection vector column.
    ///
    /// The column is a boolean column called `__selection` that is used to filter out rows
    /// that should not be included in the partition. `true` means the row should be included
    /// and `false` means the row should be excluded.
    HashSelectionVector(Vec<Arc<dyn PhysicalExpr>>, usize),
  • Add HashPartitionMode to decide the hash partition behavior when planning physical plans.
/// The mode of hash partitioning
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum HashPartitionMode {
    /// Hash partitioning with a selection vector. See [Partitioning::HashSelectionVector]
    SelectionVector,
    /// The default hash partitioning
    HashPartitioned,
}

Are these changes tested?

Are there any user-facing changes?

@Dandandan
Copy link
Contributor

This starts to look nice @goldmedal

@github-actions github-actions bot added physical-expr Changes to the physical-expr crates optimizer Optimizer rules core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) common Related to common crate proto Related to proto crate labels Mar 26, 2025
@goldmedal goldmedal force-pushed the feat/selection-vector-partition branch from 87a84f9 to 3a151dd Compare March 27, 2025 16:39
@github-actions github-actions bot added the documentation Improvements or additions to documentation label Mar 27, 2025
@goldmedal
Copy link
Contributor Author

After rebasing to the main, the hash join no longer uses RepartitionExec. Remove the test by 3a151dd

@goldmedal
Copy link
Contributor Author

#15339 It looks like the join plan is being changed.

@goldmedal goldmedal marked this pull request as ready for review March 27, 2025 17:54
@goldmedal goldmedal requested a review from Dandandan March 27, 2025 17:54
@goldmedal
Copy link
Contributor Author

also c.c. @zebsme

@Dandandan
Copy link
Contributor

Dandandan commented Mar 28, 2025

#15339 It looks like the join plan is being changed.

You should be able to get the test back by also setting datafusion.optimizer.hash_join_single_partition_threshold to 0 / a low value.

@goldmedal
Copy link
Contributor Author

You should be able to get the test back by also setting datafusion.optimizer.hash_join_single_partition_threshold to 0 / a low value.

Thanks. It works. I also added the test for the normal hash partition for RepartitionExec.

@goldmedal
Copy link
Contributor Author

I'm working on HashAggregate goldmedal#3 based on this PR.
I found we shouldn't use only one config, prefer_hash_selection_vector_partitioning, to control whether to use the selection vector or not. For testing and benchmark convenience, it's better to split it into two configs, one for hash join and another for HashAggregate.

Copy link
Contributor

@zebsme zebsme left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your contribution! @goldmedal

@Dandandan
Copy link
Contributor

I'm working on HashAggregate goldmedal#3 based on this PR. I found we shouldn't use only one config, prefer_hash_selection_vector_partitioning, to control whether to use the selection vector or not. For testing and benchmark convenience, it's better to split it into two configs, one for hash join and another for HashAggregate.

Makes sense to me 👍

@goldmedal goldmedal marked this pull request as draft March 29, 2025 17:42
Comment on lines +1438 to +1439
# TODO: The selection vector partitioning should be used for the hash join.
# After fix https://github.com/apache/datafusion/issues/15382
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't implement the planner for the hash join to avoid making this PR huge and complex. I think #15382 will implement the required parts.

@berkaysynnada
Copy link
Contributor

Isn't it always better partitioning on this selection vectors in case of hash-rep 🤔 What is the reason of keeping the old strategy ?

@2010YOUY01
Copy link
Contributor

Isn't it always better partitioning on this selection vectors in case of hash-rep 🤔 What is the reason of keeping the old strategy ?

I think to support this selection vector, the executors need to be updated to interpret an additional metadata column. However, since executors are part of the public interface that some downstream projects might depend on directly, it's better to ensure backward compatibility.

@goldmedal
Copy link
Contributor Author

I think to support this selection vector, the executors need to be updated to interpret an additional metadata column. However, since executors are part of the public interface that some downstream projects might depend on directly, it's better to ensure backward compatibility.

I agree with @2010YOUY01. It would be a breaking change for RepartitionExec.
After this PR, we still have the following-up works for #15382 and #15383. Maybe we can use the selection vector by default, but I also prefer to keep the original strategy for backward compatibility for a while.

@goldmedal
Copy link
Contributor Author

goldmedal commented Mar 30, 2025

The CI failure isn't related to this PR. It could be fixed by #15493

@goldmedal goldmedal force-pushed the feat/selection-vector-partition branch from b28bf32 to c6b33da Compare March 30, 2025 11:35
@2010YOUY01 2010YOUY01 requested a review from Copilot March 31, 2025 11:27
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR introduces selection vector repartitioning to enable efficient hash partitioning without data copying. Key changes include:

  • Adding a configuration option (prefer_hash_selection_vector_partitioning_agg) to control partitioning mode.
  • Introducing a new physical hash partitioning variant (HashSelectionVector) and updating the related enums and expressions.
  • Updating repartitioning, join, aggregate, optimizer and test code to support the new repartitioning mode.

Reviewed Changes

Copilot reviewed 64 out of 64 changed files in this pull request and generated no comments.

Show a summary per file
File Description
datafusion/physical-plan/src/windows/window_agg_exec.rs Updated distribution logic to include the new HashPartitionMode parameter.
datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs Updated repartitioning to use the new HashPartitionMode parameter.
datafusion/physical-plan/src/repartition/mod.rs Added support for HashSelectionVector in repartitioning and updated tests.
datafusion/physical-plan/src/joins/* Updated join repartitioning to propagate the HashPartitionMode parameter.
datafusion/physical-plan/src/aggregates/* and datafusion/physical-plan/src/aggregates/mod.rs Adjusted aggregate mode pattern matches to account for the new mode parameter.
datafusion/physical-optimizer/src/* Updated optimizer and output requirements handling to include the new mode.
datafusion/physical-expr/src/partitioning.rs Enhanced display and partition count functions to support HashSelectionVector.
datafusion/core/* Updated tests and planner behavior to reflect the new repartitioning mechanism.
datafusion/common/src/config.rs Added new config flag to control hash selection vector partitioning in aggregates.
Comments suppressed due to low confidence (1)

datafusion/physical-plan/src/repartition/mod.rs:391

  • Using unwrap() here may lead to a panic if RecordBatch creation fails. Consider propagating the error using the '?' operator or handling the error explicitly.
let batch = RecordBatch::try_new_with_options(Arc::clone(&schema), columns, &options).unwrap();

@goldmedal
Copy link
Contributor Author

I did the benchmarks for HashAggregate #15383 (comment)
It seems that HashAggregate is slower in the selection vector mode 🤔

@alamb alamb added the performance Make DataFusion faster label Mar 31, 2025
@alamb alamb mentioned this pull request Apr 7, 2025
12 tasks
@goldmedal goldmedal force-pushed the feat/selection-vector-partition branch from c6b33da to d10ec22 Compare April 13, 2025 15:08
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
common Related to common crate core Core DataFusion crate documentation Improvements or additions to documentation optimizer Optimizer rules performance Make DataFusion faster physical-expr Changes to the physical-expr crates proto Related to proto crate sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add selection vector repartitioning
6 participants