
Commit 3d15755

fix
1 parent 2afdc59 commit 3d15755

6 files changed: +26 −22 lines changed

src/query/catalog/src/runtime_filter_info.rs

Lines changed: 10 additions & 3 deletions
@@ -74,7 +74,7 @@ pub struct RuntimeFilterBloom {
 
 #[derive(Default)]
 pub struct RuntimeFilterStats {
-    _filter_id: usize,
+    pub filter_id: usize,
     bloom_time_ns: AtomicU64,
     bloom_rows_filtered: AtomicU64,
     inlist_min_max_time_ns: AtomicU64,
@@ -83,8 +83,15 @@ pub struct RuntimeFilterStats {
 }
 
 impl RuntimeFilterStats {
-    pub fn new() -> Self {
-        Self::default()
+    pub fn new(filter_id: usize) -> Self {
+        Self {
+            filter_id,
+            bloom_time_ns: AtomicU64::new(0),
+            bloom_rows_filtered: AtomicU64::new(0),
+            inlist_min_max_time_ns: AtomicU64::new(0),
+            min_max_rows_filtered: AtomicU64::new(0),
+            min_max_partitions_pruned: AtomicU64::new(0),
+        }
     }
 
     pub fn record_bloom(&self, time_ns: u64, rows_filtered: u64) {

src/query/service/src/physical_plans/format/format_table_scan.rs

Lines changed: 3 additions & 3 deletions
@@ -156,9 +156,9 @@ impl<'a> PhysicalFormat for TableScanFormatter<'a> {
             vec![FormatTreeNode::new(format!("type: [{}]", type_text))];
 
         if let Some(ref filter_stats) = stats {
-            // Find the matching stat by filter id
-            // For now, we just use the first one as stats are grouped by scan_id
-            if let Some(stat) = filter_stats.first() {
+            if let Some(stat) =
+                filter_stats.iter().find(|stat| stat.filter_id == filter.id)
+            {
                 detail_children.push(FormatTreeNode::new(format!(
                     "bloom rows filtered: {}",
                     stat.bloom_rows_filtered()
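The two hunks above work together: the stats struct now exposes the id of the filter it belongs to, and the scan formatter matches on that id instead of unconditionally taking the first entry. A minimal, self-contained sketch of that pattern, using simplified stand-in types rather than the real RuntimeFilterStats (only one counter is kept here, and the ids are made up):

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

// Simplified stand-in for RuntimeFilterStats: it carries its filter id
// plus a single atomic counter.
struct FilterStats {
    filter_id: usize,
    bloom_rows_filtered: AtomicU64,
}

impl FilterStats {
    fn new(filter_id: usize) -> Self {
        Self {
            filter_id,
            bloom_rows_filtered: AtomicU64::new(0),
        }
    }

    fn record_bloom(&self, rows_filtered: u64) {
        self.bloom_rows_filtered
            .fetch_add(rows_filtered, Ordering::Relaxed);
    }
}

fn main() {
    // Two filters feeding the same scan; stats are grouped per scan.
    let stats = vec![Arc::new(FilterStats::new(7)), Arc::new(FilterStats::new(9))];
    stats[1].record_bloom(1234);

    // Old behaviour: `stats.first()` would report filter 7's (empty) counters
    // for every filter. New behaviour: match on the id carried by the entry.
    let wanted_filter_id = 9;
    if let Some(stat) = stats.iter().find(|s| s.filter_id == wanted_filter_id) {
        println!(
            "bloom rows filtered by filter {}: {}",
            stat.filter_id,
            stat.bloom_rows_filtered.load(Ordering::Relaxed)
        );
    }
}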

src/query/service/src/physical_plans/physical_hash_join.rs

Lines changed: 8 additions & 13 deletions
@@ -59,7 +59,6 @@ use crate::pipelines::processors::transforms::memory::outer_left_join::OuterLeft
 use crate::pipelines::processors::transforms::BasicHashJoinState;
 use crate::pipelines::processors::transforms::HashJoinProbeState;
 use crate::pipelines::processors::transforms::InnerHashJoin;
-use crate::pipelines::processors::transforms::RuntimeFilterContext;
 use crate::pipelines::processors::transforms::TransformHashJoin;
 use crate::pipelines::processors::transforms::TransformHashJoinBuild;
 use crate::pipelines::processors::transforms::TransformHashJoinProbe;
@@ -284,7 +283,7 @@ impl IPhysicalPlan for HashJoin {
         }
 
         // Create the join state with optimization flags
-        let state = self.build_state(builder)?;
+        let state = self.build_state(builder, desc)?;
 
         if let Some((build_cache_index, _)) = self.build_side_cache_info {
             builder.hash_join_states.insert(
@@ -306,13 +305,17 @@ impl IPhysicalPlan for HashJoin {
 }
 
 impl HashJoin {
-    fn build_state(&self, builder: &mut PipelineBuilder) -> Result<Arc<HashJoinState>> {
+    fn build_state(
+        &self,
+        builder: &mut PipelineBuilder,
+        desc: Arc<HashJoinDesc>,
+    ) -> Result<Arc<HashJoinState>> {
         let (enable_optimization, is_distributed) = builder.merge_into_get_optimization_flag(self);
         HashJoinState::try_create(
             builder.ctx.clone(),
             self.build.output_schema()?,
             &self.build_projections,
-            HashJoinDesc::create(&builder.ctx, builder.func_ctx.clone(), self)?,
+            desc.clone(),
             &self.probe_to_build,
             is_distributed,
             enable_optimization,
@@ -408,14 +411,6 @@ impl HashJoin {
         desc: Arc<HashJoinDesc>,
     ) -> Result<()> {
         let state = Arc::new(BasicHashJoinState::create());
-        // We must build the runtime filter before constructing the child nodes,
-        // as we will inject some runtime filter information into the context for the child nodes to use.
-        let rf_desc = RuntimeFilterContext::create(
-            &builder.ctx,
-            builder.func_ctx.clone(),
-            &self.runtime_filter,
-            self.broadcast_id,
-        )?;
 
         if let Some((build_cache_index, _)) = self.build_side_cache_info {
             builder.hash_join_states.insert(
@@ -461,7 +456,7 @@ impl HashJoin {
             self.create_join(&self.join_type, builder, desc.clone(), state.clone())?,
             stage_sync_barrier.clone(),
             self.projections.clone(),
-            Arc::new(rf_desc.clone()),
+            Arc::new(desc.runtime_filter.clone()),
         );
 
         join_pipe_items.push(PipeItem::create(
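The hash-join change has a single shape: the HashJoinDesc is built once by the caller, and the same Arc is handed to both the join state and the probe-side runtime-filter consumer, instead of re-creating a descriptor (and a separate RuntimeFilterContext) inside build_state. A rough sketch of that sharing pattern with illustrative stand-in types, not the actual Databend structs:

use std::sync::Arc;

// Illustrative stand-ins for HashJoinDesc and its runtime-filter description.
#[derive(Clone, Debug)]
struct RuntimeFilterDesc {
    filter_ids: Vec<usize>,
}

#[derive(Debug)]
struct JoinDesc {
    runtime_filter: RuntimeFilterDesc,
}

// The state keeps a shared handle instead of constructing its own copy.
struct JoinState {
    desc: Arc<JoinDesc>,
}

fn build_state(desc: Arc<JoinDesc>) -> JoinState {
    // Cheap pointer clone; no second descriptor is created here.
    JoinState { desc }
}

fn build_probe_transform(runtime_filter: Arc<RuntimeFilterDesc>) {
    println!("probe side sees filters {:?}", runtime_filter.filter_ids);
}

fn main() {
    // Build the descriptor once, up front.
    let desc = Arc::new(JoinDesc {
        runtime_filter: RuntimeFilterDesc { filter_ids: vec![7, 9] },
    });

    // Both consumers work off the same descriptor.
    let state = build_state(desc.clone());
    build_probe_transform(Arc::new(state.desc.runtime_filter.clone()));
}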

src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs

Lines changed: 2 additions & 2 deletions
@@ -111,7 +111,7 @@ pub struct HashJoinState {
     /// A dummy receiver to make build done watcher channel open
     pub(crate) _build_done_dummy_receiver: Receiver<HashTableType>,
     /// Some description of hash join. Such as join type, join keys, etc.
-    pub(crate) hash_join_desc: HashJoinDesc,
+    pub(crate) hash_join_desc: Arc<HashJoinDesc>,
     /// Interrupt the build phase or probe phase.
     pub(crate) interrupt: AtomicBool,
     /// If there is no data in build side, maybe we can fast return.
@@ -156,7 +156,7 @@ impl HashJoinState {
         ctx: Arc<QueryContext>,
         mut build_schema: DataSchemaRef,
         build_projections: &ColumnSet,
-        hash_join_desc: HashJoinDesc,
+        hash_join_desc: Arc<HashJoinDesc>,
         probe_to_build: &[(usize, (bool, bool))],
         merge_into_is_distributed: bool,
         enable_merge_into_optimization: bool,

src/query/service/src/pipelines/processors/transforms/runtime_filter/context.rs

Lines changed: 2 additions & 0 deletions
@@ -141,6 +141,8 @@ impl RuntimeFilterContext {
         let runtime_filter_infos =
             super::convert::build_runtime_filter_infos(packet, runtime_filter_descs)?;
 
+        self.ctx
+            .register_runtime_filter_stats(&runtime_filter_infos);
         self.channels.publish_all(&runtime_filter_infos)?;
 
         let merge_time = merge_start.elapsed();
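Here the freshly built filter infos are registered with the query context before they are published to the probe side, so a later phase (such as the scan formatter above) can read the same shared stats handles. A hedged sketch of that register-then-publish ordering, using a hypothetical registry type in place of the real QueryContext and an assumed behaviour for register_runtime_filter_stats:

use std::sync::{Arc, Mutex};

// Hypothetical stand-ins: a per-filter stats handle and a context-owned registry.
struct FilterStats {
    filter_id: usize,
}

#[derive(Default)]
struct ContextLike {
    registered_stats: Mutex<Vec<Arc<FilterStats>>>,
}

impl ContextLike {
    // Assumed behaviour: remember the shared handles so that other
    // operators can look them up later.
    fn register_runtime_filter_stats(&self, stats: &[Arc<FilterStats>]) {
        self.registered_stats
            .lock()
            .unwrap()
            .extend(stats.iter().cloned());
    }
}

fn publish_all(stats: &[Arc<FilterStats>]) {
    // Stand-in for channels.publish_all: hand the filters to the probe side.
    println!("published {} runtime filters", stats.len());
}

fn main() {
    let ctx = ContextLike::default();
    let infos = vec![Arc::new(FilterStats { filter_id: 7 })];

    // Register first, then publish, mirroring the order in the diff.
    ctx.register_runtime_filter_stats(&infos);
    publish_all(&infos);

    let ids: Vec<usize> = ctx
        .registered_stats
        .lock()
        .unwrap()
        .iter()
        .map(|s| s.filter_id)
        .collect();
    println!("context now tracks stats for filters {:?}", ids);
}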

src/query/service/src/pipelines/processors/transforms/runtime_filter/convert.rs

Lines changed: 1 addition & 1 deletion
@@ -87,7 +87,7 @@ pub fn build_runtime_filter_infos(
             } else {
                 None
             },
-            stats: Arc::new(RuntimeFilterStats::new()),
+            stats: Arc::new(RuntimeFilterStats::new(desc.id)),
         };
 
         if let Some(existing) = entry
