Skip to content

Commit 43cc181

Browse files
authored
fix: pipeline max_threads should use max width of Pipes. (#18837)
1 parent 1b75e7e commit 43cc181

File tree

4 files changed

+8
-5
lines changed

4 files changed

+8
-5
lines changed

src/query/pipeline/core/src/pipeline.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ impl Pipeline {
227227
new_sinks.push_back((index, idx));
228228
}
229229
}
230-
230+
self.max_threads = self.max_threads.max(new_sinks.len());
231231
self.sinks = new_sinks;
232232
}
233233

@@ -246,13 +246,11 @@ impl Pipeline {
246246
}
247247

248248
pub fn set_max_threads(&mut self, max_threads: usize) {
249-
let mut max_pipe_size = 0;
250249
let sinks = self.graph.externals(Direction::Outgoing).count();
251250
let sources = self.graph.externals(Direction::Incoming).count();
252251

253-
max_pipe_size = std::cmp::max(max_pipe_size, sinks);
254-
max_pipe_size = std::cmp::max(max_pipe_size, sources);
255-
self.max_threads = std::cmp::min(max_pipe_size, max_threads);
252+
self.max_threads = std::cmp::max(self.max_threads, std::cmp::max(sinks, sources));
253+
self.max_threads = std::cmp::min(self.max_threads, max_threads);
256254
}
257255

258256
pub fn get_max_threads(&self) -> usize {

src/query/service/src/pipelines/executor/queries_pipeline_executor.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use databend_common_exception::ErrorCode;
2727
use databend_common_exception::Result;
2828
use fastrace::func_path;
2929
use fastrace::prelude::*;
30+
use log::info;
3031
use log::warn;
3132
use parking_lot::Mutex;
3233

@@ -105,6 +106,7 @@ impl QueriesPipelineExecutor {
105106

106107
fn execute_threads(self: &Arc<Self>, threads: usize) -> Vec<ThreadJoinHandle<Result<()>>> {
107108
let mut thread_join_handles = Vec::with_capacity(threads);
109+
info!(num = threads; "starting worker threads");
108110

109111
for thread_num in 0..threads {
110112
let this = self.clone();

src/query/service/src/pipelines/executor/query_pipeline_executor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,7 @@ impl QueryPipelineExecutor {
343343

344344
fn execute_threads(self: &Arc<Self>, threads: usize) -> Vec<ThreadJoinHandle<Result<()>>> {
345345
let mut thread_join_handles = Vec::with_capacity(threads);
346+
info!(num = threads; "starting worker threads");
346347

347348
for thread_num in 0..threads {
348349
let this = self.clone();

src/query/sql/src/planner/plans/copy_into_table.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,8 @@ pub struct CopyIntoTablePlan {
139139
// query may be Some even if is_transform=false
140140
pub is_transform: bool,
141141

142+
// control by setting `enable_distributed_copy`
143+
// set in optimizer
142144
pub enable_distributed: bool,
143145

144146
pub files_collected: bool,

0 commit comments

Comments
 (0)