Skip to content

Commit a2203f0

Browse files
committed
feat: scan wait for runtime filter build ready
1 parent 6b115af commit a2203f0

File tree

2 files changed

+30
-17
lines changed

2 files changed

+30
-17
lines changed

src/query/service/src/physical_plans/physical_table_scan.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -206,9 +206,6 @@ impl IPhysicalPlan for TableScan {
206206
fn build_pipeline2(&self, builder: &mut PipelineBuilder) -> Result<()> {
207207
let table = builder.ctx.build_table_from_source_plan(&self.source)?;
208208
builder.ctx.set_partitions(self.source.parts.clone())?;
209-
builder
210-
.ctx
211-
.set_wait_runtime_filter(self.scan_id, builder.contain_sink_processor);
212209

213210
if builder.ctx.get_settings().get_enable_prune_pipeline()? {
214211
if let Some(prune_pipeline) = table.build_prune_pipeline(

src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,9 @@ pub struct DeserializeDataTransform {
7777

7878
base_block_ids: Option<Scalar>,
7979
cached_runtime_filter: Option<Vec<BloomRuntimeFilterRef>>,
80-
// for merge_into target build.
8180
need_reserve_block_info: bool,
8281
need_wait_runtime_filter: bool,
83-
runtime_filter_ready: Option<Arc<RuntimeFilterReady>>,
82+
runtime_filter_ready: Vec<Arc<RuntimeFilterReady>>,
8483
}
8584

8685
#[derive(Clone)]
@@ -103,8 +102,7 @@ impl DeserializeDataTransform {
103102
virtual_reader: Arc<Option<VirtualColumnReader>>,
104103
) -> Result<ProcessorPtr> {
105104
let scan_progress = ctx.get_scan_progress();
106-
let need_wait_runtime_filter =
107-
!ctx.get_cluster().is_empty() && ctx.get_wait_runtime_filter(plan.scan_id);
105+
let need_wait_runtime_filter = !ctx.get_cluster().is_empty();
108106

109107
let mut src_schema: DataSchema = (block_reader.schema().as_ref()).into();
110108
if let Some(virtual_reader) = virtual_reader.as_ref() {
@@ -142,7 +140,7 @@ impl DeserializeDataTransform {
142140
cached_runtime_filter: None,
143141
need_reserve_block_info,
144142
need_wait_runtime_filter,
145-
runtime_filter_ready: None,
143+
runtime_filter_ready: vec![],
146144
})))
147145
}
148146

@@ -204,8 +202,8 @@ impl DeserializeDataTransform {
204202
}
205203
self.need_wait_runtime_filter = false;
206204
let runtime_filter_ready = self.ctx.get_runtime_filter_ready(self.scan_id);
207-
if runtime_filter_ready.len() == 1 {
208-
self.runtime_filter_ready = Some(runtime_filter_ready[0].clone());
205+
if !runtime_filter_ready.is_empty() {
206+
self.runtime_filter_ready = runtime_filter_ready;
209207
true
210208
} else {
211209
false
@@ -389,14 +387,32 @@ impl Processor for DeserializeDataTransform {
389387

390388
#[async_backtrace::framed]
391389
async fn async_process(&mut self) -> Result<()> {
392-
let runtime_filter_ready = self.runtime_filter_ready.as_mut().unwrap();
393-
let mut rx = runtime_filter_ready.runtime_filter_watcher.subscribe();
394-
if (*rx.borrow()).is_some() {
395-
return Ok(());
390+
use std::time::Duration;
391+
392+
use databend_common_base::base::tokio::time::timeout;
393+
394+
let timeout_duration = Duration::from_secs(30);
395+
396+
for runtime_filter_ready in &self.runtime_filter_ready {
397+
let mut rx = runtime_filter_ready.runtime_filter_watcher.subscribe();
398+
if (*rx.borrow()).is_some() {
399+
continue;
400+
}
401+
402+
match timeout(timeout_duration, rx.changed()).await {
403+
Ok(Ok(())) => {}
404+
Ok(Err(_)) => {
405+
return Err(ErrorCode::TokioError("watcher's sender is dropped"));
406+
}
407+
Err(_) => {
408+
log::warn!(
409+
"Runtime filter wait timeout after {:?} for scan_id: {}",
410+
timeout_duration,
411+
self.scan_id
412+
);
413+
}
414+
}
396415
}
397-
rx.changed()
398-
.await
399-
.map_err(|_| ErrorCode::TokioError("watcher's sender is dropped"))?;
400416
Ok(())
401417
}
402418
}

0 commit comments

Comments
 (0)