Skip to content

Commit 2afdc59

Browse files
committed
refactor
1 parent 04c6ce3 commit 2afdc59

37 files changed

+636
-872
lines changed

src/query/catalog/src/runtime_filter_info.rs

Lines changed: 75 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::HashMap;
1516
use std::fmt::Debug;
1617
use std::fmt::Formatter;
1718
use std::sync::atomic::AtomicU64;
@@ -21,6 +22,8 @@ use std::sync::Arc;
2122
use databend_common_base::base::tokio::sync::watch;
2223
use databend_common_base::base::tokio::sync::watch::Receiver;
2324
use databend_common_base::base::tokio::sync::watch::Sender;
25+
use databend_common_exception::ErrorCode;
26+
use databend_common_exception::Result;
2427
use databend_common_expression::Expr;
2528
use xorf::BinaryFuse16;
2629

@@ -71,6 +74,7 @@ pub struct RuntimeFilterBloom {
7174

7275
#[derive(Default)]
7376
pub struct RuntimeFilterStats {
77+
_filter_id: usize,
7478
bloom_time_ns: AtomicU64,
7579
bloom_rows_filtered: AtomicU64,
7680
inlist_min_max_time_ns: AtomicU64,
@@ -98,47 +102,87 @@ impl RuntimeFilterStats {
98102
.fetch_add(partitions_pruned, Ordering::Relaxed);
99103
}
100104

101-
pub fn snapshot(&self) -> RuntimeFilterStatsSnapshot {
102-
RuntimeFilterStatsSnapshot {
103-
bloom_time_ns: self.bloom_time_ns.load(Ordering::Relaxed),
104-
bloom_rows_filtered: self.bloom_rows_filtered.load(Ordering::Relaxed),
105-
inlist_min_max_time_ns: self.inlist_min_max_time_ns.load(Ordering::Relaxed),
106-
min_max_rows_filtered: self.min_max_rows_filtered.load(Ordering::Relaxed),
107-
min_max_partitions_pruned: self.min_max_partitions_pruned.load(Ordering::Relaxed),
108-
}
105+
pub fn bloom_time_ns(&self) -> u64 {
106+
self.bloom_time_ns.load(Ordering::Relaxed)
109107
}
110-
}
111108

112-
#[derive(Default, Clone, Debug)]
113-
pub struct RuntimeFilterStatsSnapshot {
114-
pub bloom_time_ns: u64,
115-
pub bloom_rows_filtered: u64,
116-
pub inlist_min_max_time_ns: u64,
117-
pub min_max_rows_filtered: u64,
118-
pub min_max_partitions_pruned: u64,
119-
}
109+
pub fn bloom_rows_filtered(&self) -> u64 {
110+
self.bloom_rows_filtered.load(Ordering::Relaxed)
111+
}
112+
113+
pub fn inlist_min_max_time_ns(&self) -> u64 {
114+
self.inlist_min_max_time_ns.load(Ordering::Relaxed)
115+
}
120116

121-
#[derive(Clone, Debug)]
122-
pub struct RuntimeFilterReport {
123-
pub filter_id: usize,
124-
pub has_bloom: bool,
125-
pub has_inlist: bool,
126-
pub has_min_max: bool,
127-
pub stats: RuntimeFilterStatsSnapshot,
117+
pub fn min_max_rows_filtered(&self) -> u64 {
118+
self.min_max_rows_filtered.load(Ordering::Relaxed)
119+
}
120+
121+
pub fn min_max_partitions_pruned(&self) -> u64 {
122+
self.min_max_partitions_pruned.load(Ordering::Relaxed)
123+
}
128124
}
129125

130-
pub struct RuntimeFilterReady {
131-
pub runtime_filter_watcher: Sender<Option<()>>,
132-
/// A dummy receiver to make runtime_filter_watcher channel open.
133-
pub _runtime_filter_dummy_receiver: Receiver<Option<()>>,
126+
pub struct RuntimeFilterChannel {
127+
pub sender: Sender<Option<Vec<RuntimeFilterEntry>>>,
128+
/// A dummy receiver that keeps the `sender` channel open.
129+
pub _guard: Receiver<Option<Vec<RuntimeFilterEntry>>>,
134130
}
135131

136-
impl Default for RuntimeFilterReady {
132+
impl Default for RuntimeFilterChannel {
137133
fn default() -> Self {
138134
let (watcher, dummy_receiver) = watch::channel(None);
139135
Self {
140-
runtime_filter_watcher: watcher,
141-
_runtime_filter_dummy_receiver: dummy_receiver,
136+
sender: watcher,
137+
_guard: dummy_receiver,
138+
}
139+
}
140+
}
141+
142+
impl RuntimeFilterChannel {
143+
pub async fn wait_ready(&self) -> Result<Vec<RuntimeFilterEntry>> {
144+
let mut rx = self.sender.subscribe();
145+
if let Some(entries) = (*rx.borrow()).clone() {
146+
return Ok(entries);
147+
}
148+
149+
match rx.changed().await {
150+
Ok(()) => {}
151+
Err(e) => {
152+
return Err(ErrorCode::TokioError(format!(
153+
"runtime filter channel changed error: {}",
154+
e
155+
)));
156+
}
157+
}
158+
let entries = (*rx.borrow()).clone().unwrap_or_default();
159+
Ok(entries)
160+
}
161+
}
162+
163+
#[derive(Default, Clone)]
164+
pub struct RuntimeFilterChannels {
165+
pub channels: HashMap<usize, Arc<RuntimeFilterChannel>>,
166+
}
167+
168+
impl RuntimeFilterChannels {
169+
pub fn register(&mut self, scan_id: usize, channel: Arc<RuntimeFilterChannel>) {
170+
self.channels.insert(scan_id, channel);
171+
}
172+
173+
pub fn publish_all(&self, filters: &HashMap<usize, RuntimeFilterInfo>) -> Result<()> {
174+
for (scan_id, channel) in &self.channels {
175+
let entries = filters
176+
.get(scan_id)
177+
.map(|info| info.filters.clone())
178+
.unwrap_or_default();
179+
channel.sender.send(Some(entries.clone())).map_err(|_| {
180+
ErrorCode::TokioError(format!(
181+
"runtime filter channel closed for scan_id {}",
182+
scan_id
183+
))
184+
})?;
142185
}
186+
Ok(())
143187
}
144188
}

src/query/catalog/src/table_context.rs

Lines changed: 22 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ use databend_common_exception::ErrorCode;
3131
use databend_common_exception::Result;
3232
use databend_common_exception::ResultExt;
3333
use databend_common_expression::BlockThresholds;
34-
use databend_common_expression::Expr;
3534
use databend_common_expression::FunctionContext;
3635
use databend_common_expression::Scalar;
3736
use databend_common_expression::TableSchema;
@@ -67,7 +66,6 @@ use databend_storages_common_table_meta::meta::TableMetaTimestamps;
6766
use databend_storages_common_table_meta::meta::TableSnapshot;
6867
use parking_lot::Mutex;
6968
use parking_lot::RwLock;
70-
use xorf::BinaryFuse16;
7169

7270
use crate::catalog::Catalog;
7371
use crate::cluster_info::Cluster;
@@ -78,10 +76,9 @@ use crate::plan::PartInfoPtr;
7876
use crate::plan::PartStatistics;
7977
use crate::plan::Partitions;
8078
use crate::query_kind::QueryKind;
81-
use crate::runtime_filter_info::RuntimeFilterEntry;
79+
use crate::runtime_filter_info::RuntimeFilterChannel;
8280
use crate::runtime_filter_info::RuntimeFilterInfo;
83-
use crate::runtime_filter_info::RuntimeFilterReady;
84-
use crate::runtime_filter_info::RuntimeFilterReport;
81+
use crate::runtime_filter_info::RuntimeFilterStats;
8582
use crate::session_type::SessionType;
8683
use crate::statistics::data_cache_statistics::DataCacheMetrics;
8784
use crate::table::Table;
@@ -351,35 +348,10 @@ pub trait TableContext: Send + Sync {
351348

352349
fn get_query_profiles(&self) -> Vec<PlanProfile>;
353350

354-
fn set_runtime_filter(&self, _filters: HashMap<usize, RuntimeFilterInfo>) {
355-
unimplemented!()
356-
}
357-
358-
fn set_runtime_filter_ready(&self, table_index: usize, ready: Arc<RuntimeFilterReady>);
359-
360-
fn get_runtime_filter_ready(&self, table_index: usize) -> Vec<Arc<RuntimeFilterReady>>;
361-
362-
fn set_wait_runtime_filter(&self, table_index: usize, need_to_wait: bool);
363-
364-
fn get_wait_runtime_filter(&self, table_index: usize) -> bool;
365-
366-
fn clear_runtime_filter(&self);
367-
368351
fn set_merge_into_join(&self, join: MergeIntoJoin);
369352

370353
fn get_merge_into_join(&self) -> MergeIntoJoin;
371354

372-
fn get_runtime_filters(&self, id: usize) -> Vec<RuntimeFilterEntry>;
373-
374-
fn get_bloom_runtime_filter_with_id(&self, id: usize) -> Vec<(String, BinaryFuse16)>;
375-
376-
fn get_inlist_runtime_filter_with_id(&self, id: usize) -> Vec<Expr<String>>;
377-
378-
fn get_min_max_runtime_filter_with_id(&self, id: usize) -> Vec<Expr<String>>;
379-
380-
fn runtime_filter_reports(&self) -> HashMap<usize, Vec<RuntimeFilterReport>>;
381-
382-
fn has_bloom_runtime_filters(&self, id: usize) -> bool;
383355
fn txn_mgr(&self) -> TxnManagerRef;
384356
fn get_table_meta_timestamps(
385357
&self,
@@ -481,6 +453,26 @@ pub trait TableContext: Send + Sync {
481453
fn merge_pruned_partitions_stats(&self, _other: &HashMap<u32, PartStatistics>) {
482454
unimplemented!()
483455
}
456+
457+
/// Default implementation: always panics via `unimplemented!()`, so an
/// implementor that does not override this method panics when it is called.
///
/// NOTE(review): presumably records stats handles for the given runtime
/// filters — confirm against the concrete implementations.
fn register_runtime_filter_stats(&self, _runtime_filters: &HashMap<usize, RuntimeFilterInfo>) {
458+
unimplemented!()
459+
}
460+
461+
/// Default implementation: always panics via `unimplemented!()`, so an
/// implementor that does not override this method panics when it is called.
///
/// NOTE(review): presumably associates `_channel` with `_scan_id` — confirm
/// against the concrete implementations.
fn register_runtime_filter_channel(
462+
&self,
463+
_scan_id: usize,
464+
_channel: Arc<RuntimeFilterChannel>,
465+
) {
466+
unimplemented!()
467+
}
468+
469+
/// Default implementation: always panics via `unimplemented!()`, so an
/// implementor that does not override this method panics when it is called.
///
/// NOTE(review): presumably returns the channels registered for `_scan_id` —
/// confirm against the concrete implementations.
fn get_runtime_filter_channels(&self, _scan_id: usize) -> Vec<Arc<RuntimeFilterChannel>> {
470+
unimplemented!()
471+
}
472+
473+
/// Default implementation: always panics via `unimplemented!()`, so an
/// implementor that does not override this method panics when it is called.
///
/// NOTE(review): the meaning of the map key is not visible here; callers in
/// this commit look it up by scan id — confirm against the implementations.
fn get_runtime_filter_stats(&self) -> Arc<DashMap<usize, Vec<Arc<RuntimeFilterStats>>>> {
474+
unimplemented!()
475+
}
484476
}
485477

486478
pub type AbortChecker = Arc<dyn CheckAbort + Send + Sync>;

src/query/service/src/interpreters/interpreter_explain.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ impl Interpreter for ExplainInterpreter {
180180
profs: HashMap::new(),
181181
metadata: &metadata,
182182
scan_id_to_runtime_filters: HashMap::new(),
183-
runtime_filter_reports: HashMap::new(),
183+
runtime_filter_stats: Default::default(),
184184
};
185185

186186
let formatter = plan.formatter()?;
@@ -484,7 +484,7 @@ impl ExplainInterpreter {
484484
plan.set_pruning_stats(&mut pruned_partitions_stats);
485485
}
486486

487-
let runtime_filter_reports = self.ctx.runtime_filter_reports();
487+
let runtime_filter_stats = self.ctx.get_runtime_filter_stats();
488488

489489
let result = match self.partial {
490490
true => {
@@ -493,7 +493,7 @@ impl ExplainInterpreter {
493493
profs: query_profiles.clone(),
494494
metadata: &metadata,
495495
scan_id_to_runtime_filters: HashMap::new(),
496-
runtime_filter_reports: runtime_filter_reports.clone(),
496+
runtime_filter_stats: runtime_filter_stats.clone(),
497497
};
498498

499499
let formatter = plan.formatter()?;
@@ -506,7 +506,7 @@ impl ExplainInterpreter {
506506
profs: query_profiles.clone(),
507507
metadata: &metadata,
508508
scan_id_to_runtime_filters: HashMap::new(),
509-
runtime_filter_reports: runtime_filter_reports.clone(),
509+
runtime_filter_stats: runtime_filter_stats.clone(),
510510
};
511511
let formatter = plan.formatter()?;
512512
let format_node = formatter.format(&mut context)?;

src/query/service/src/physical_plans/format/common.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@
1313
// limitations under the License.
1414

1515
use std::collections::HashMap;
16+
use std::sync::Arc;
1617

18+
use dashmap::DashMap;
1719
use databend_common_ast::ast::FormatTreeNode;
1820
use databend_common_base::base::format_byte_size;
1921
use databend_common_base::runtime::profile::get_statistics_desc;
2022
use databend_common_catalog::plan::PartStatistics;
21-
use databend_common_catalog::runtime_filter_info::RuntimeFilterReport;
23+
use databend_common_catalog::runtime_filter_info::RuntimeFilterStats;
2224
use databend_common_expression::DataSchemaRef;
2325
use databend_common_sql::executor::physical_plans::AggregateFunctionDesc;
2426
use databend_common_sql::IndexType;
@@ -33,7 +35,7 @@ pub struct FormatContext<'a> {
3335
pub metadata: &'a Metadata,
3436
pub profs: HashMap<u32, PlanProfile>,
3537
pub scan_id_to_runtime_filters: HashMap<IndexType, Vec<PhysicalRuntimeFilter>>,
36-
pub runtime_filter_reports: HashMap<IndexType, Vec<RuntimeFilterReport>>,
38+
pub runtime_filter_stats: Arc<DashMap<IndexType, Vec<Arc<RuntimeFilterStats>>>>,
3739
}
3840

3941
pub fn pretty_display_agg_desc(desc: &AggregateFunctionDesc, metadata: &Metadata) -> String {

src/query/service/src/physical_plans/format/format_table_scan.rs

Lines changed: 25 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ impl<'a> PhysicalFormat for TableScanFormatter<'a> {
125125
// runtime filters
126126
if let Some(filters) = ctx.scan_id_to_runtime_filters.get(&self.inner.scan_id) {
127127
if !filters.is_empty() {
128-
let reports = ctx.runtime_filter_reports.get(&self.inner.scan_id);
128+
let stats = ctx.runtime_filter_stats.get(&self.inner.scan_id);
129129
let mut filter_nodes = Vec::new();
130130
for filter in filters.iter() {
131131
if let Some((probe_key, _)) = filter
@@ -155,34 +155,30 @@ impl<'a> PhysicalFormat for TableScanFormatter<'a> {
155155
let mut detail_children =
156156
vec![FormatTreeNode::new(format!("type: [{}]", type_text))];
157157

158-
if let Some(reports) = reports {
159-
if let Some(report) =
160-
reports.iter().find(|report| report.filter_id == filter.id)
161-
{
162-
if report.has_bloom {
163-
detail_children.push(FormatTreeNode::new(format!(
164-
"bloom rows filtered: {}",
165-
report.stats.bloom_rows_filtered
166-
)));
167-
detail_children.push(FormatTreeNode::new(format!(
168-
"bloom time: {:?}",
169-
Duration::from_nanos(report.stats.bloom_time_ns)
170-
)));
171-
}
172-
if report.has_inlist || report.has_min_max {
173-
detail_children.push(FormatTreeNode::new(format!(
174-
"inlist/min-max time: {:?}",
175-
Duration::from_nanos(report.stats.inlist_min_max_time_ns)
176-
)));
177-
detail_children.push(FormatTreeNode::new(format!(
178-
"min-max rows filtered: {}",
179-
report.stats.min_max_rows_filtered
180-
)));
181-
detail_children.push(FormatTreeNode::new(format!(
182-
"min-max partitions pruned: {}",
183-
report.stats.min_max_partitions_pruned
184-
)));
185-
}
158+
if let Some(ref filter_stats) = stats {
159+
// TODO: find the matching stat by filter id.
160+
// For now, we just use the first one, since stats are grouped by scan_id.
161+
if let Some(stat) = filter_stats.first() {
162+
detail_children.push(FormatTreeNode::new(format!(
163+
"bloom rows filtered: {}",
164+
stat.bloom_rows_filtered()
165+
)));
166+
detail_children.push(FormatTreeNode::new(format!(
167+
"bloom time: {:?}",
168+
Duration::from_nanos(stat.bloom_time_ns())
169+
)));
170+
detail_children.push(FormatTreeNode::new(format!(
171+
"inlist/min-max time: {:?}",
172+
Duration::from_nanos(stat.inlist_min_max_time_ns())
173+
)));
174+
detail_children.push(FormatTreeNode::new(format!(
175+
"min-max rows filtered: {}",
176+
stat.min_max_rows_filtered()
177+
)));
178+
detail_children.push(FormatTreeNode::new(format!(
179+
"min-max partitions pruned: {}",
180+
stat.min_max_partitions_pruned()
181+
)));
186182
}
187183
}
188184

0 commit comments

Comments
 (0)