Skip to content

Revert #15476 to fix the datafusion-examples CI fail #15496

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 30, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 36 additions & 34 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use crate::projection::{
use crate::spill::get_record_batch_memory_size;
use crate::ExecutionPlanProperties;
use crate::{
coalesce_partitions::CoalescePartitionsExec,
common::can_project,
handle_state,
hash_utils::create_hashes,
Expand Down Expand Up @@ -791,44 +792,34 @@ impl ExecutionPlan for HashJoinExec {
);
}

if self.mode == PartitionMode::CollectLeft && left_partitions != 1 {
return internal_err!(
"Invalid HashJoinExec,the output partition count of the left child must be 1 in CollectLeft mode,\
consider using CoalescePartitionsExec"
);
}

let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
let left_fut = match self.mode {
PartitionMode::CollectLeft => {
let left_stream = self.left.execute(0, Arc::clone(&context))?;

self.left_fut.once(|| {
let reservation = MemoryConsumer::new("HashJoinInput")
.register(context.memory_pool());

collect_left_input(
self.random_state.clone(),
left_stream,
on_left.clone(),
join_metrics.clone(),
reservation,
need_produce_result_in_final(self.join_type),
self.right().output_partitioning().partition_count(),
)
})
}
PartitionMode::CollectLeft => self.left_fut.once(|| {
let reservation =
MemoryConsumer::new("HashJoinInput").register(context.memory_pool());
collect_left_input(
None,
self.random_state.clone(),
Arc::clone(&self.left),
on_left.clone(),
Arc::clone(&context),
join_metrics.clone(),
reservation,
need_produce_result_in_final(self.join_type),
self.right().output_partitioning().partition_count(),
)
}),
PartitionMode::Partitioned => {
let left_stream = self.left.execute(partition, Arc::clone(&context))?;

let reservation =
MemoryConsumer::new(format!("HashJoinInput[{partition}]"))
.register(context.memory_pool());

OnceFut::new(collect_left_input(
Some(partition),
self.random_state.clone(),
left_stream,
Arc::clone(&self.left),
on_left.clone(),
Arc::clone(&context),
join_metrics.clone(),
reservation,
need_produce_result_in_final(self.join_type),
Expand Down Expand Up @@ -939,22 +930,36 @@ impl ExecutionPlan for HashJoinExec {

/// Reads the left (build) side of the input, buffering it in memory, to build a
/// hash table (`LeftJoinData`)
#[allow(clippy::too_many_arguments)]
async fn collect_left_input(
partition: Option<usize>,
random_state: RandomState,
left_stream: SendableRecordBatchStream,
left: Arc<dyn ExecutionPlan>,
on_left: Vec<PhysicalExprRef>,
context: Arc<TaskContext>,
metrics: BuildProbeJoinMetrics,
reservation: MemoryReservation,
with_visited_indices_bitmap: bool,
probe_threads_count: usize,
) -> Result<JoinLeftData> {
let schema = left_stream.schema();
let schema = left.schema();

let (left_input, left_input_partition) = if let Some(partition) = partition {
(left, partition)
} else if left.output_partitioning().partition_count() != 1 {
(Arc::new(CoalescePartitionsExec::new(left)) as _, 0)
} else {
(left, 0)
};

// Depending on partition argument load single partition or whole left side in memory
let stream = left_input.execute(left_input_partition, Arc::clone(&context))?;

// This operation performs 2 steps at once:
// 1. creates a [JoinHashMap] of all batches from the stream
// 2. stores the batches in a vector.
let initial = (Vec::new(), 0, metrics, reservation);
let (batches, num_rows, metrics, mut reservation) = left_stream
let (batches, num_rows, metrics, mut reservation) = stream
.try_fold(initial, |mut acc, batch| async {
let batch_size = get_record_batch_memory_size(&batch);
// Reserve memory for incoming batch
Expand Down Expand Up @@ -1650,7 +1655,6 @@ impl EmbeddedProjection for HashJoinExec {
#[cfg(test)]
mod tests {
use super::*;
use crate::coalesce_partitions::CoalescePartitionsExec;
use crate::test::TestMemoryExec;
use crate::{
common, expressions::Column, repartition::RepartitionExec, test::build_table_i32,
Expand Down Expand Up @@ -2101,7 +2105,6 @@ mod tests {
let left =
TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
.unwrap();
let left = Arc::new(CoalescePartitionsExec::new(left));

let right = build_table(
("a1", &vec![1, 2, 3]),
Expand Down Expand Up @@ -2174,7 +2177,6 @@ mod tests {
let left =
TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
.unwrap();
let left = Arc::new(CoalescePartitionsExec::new(left));
let right = build_table(
("a2", &vec![20, 30, 10]),
("b2", &vec![5, 6, 4]),
Expand Down