Skip to content

Commit 661635e

Browse files
committed
First draft of dynamic filters for TopK
Closes apache#15037
1 parent 8f28cd9 commit 661635e

File tree

16 files changed

+96
-75
lines changed

16 files changed

+96
-75
lines changed

datafusion/core/tests/physical_optimizer/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,4 @@ mod projection_pushdown;
2828
mod replace_with_order_preserving_variants;
2929
mod sanity_checker;
3030
mod test_utils;
31+
mod topk_dynamic_filters;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
#[cfg(test)]
2+
mod tests {
3+
#[tokio::test]
4+
async fn test_topk_dynamic_filters() {
5+
// TODO: add a test
6+
}
7+
}

datafusion/datasource-avro/src/source.rs

-7
Original file line numberDiff line numberDiff line change
@@ -220,13 +220,6 @@ impl FileSource for AvroSource {
220220
Arc::new(conf)
221221
}
222222

223-
fn with_dynamic_filter(
224-
&self,
225-
_dynamic_filter: Arc<dyn datafusion_physical_plan::DynamicFilterSource>,
226-
) -> Arc<dyn FileSource> {
227-
Arc::new(Self { ..self.clone() })
228-
}
229-
230223
fn metrics(&self) -> &ExecutionPlanMetricsSet {
231224
&self.metrics
232225
}

datafusion/datasource-csv/src/source.rs

-8
Original file line numberDiff line numberDiff line change
@@ -605,14 +605,6 @@ impl FileSource for CsvSource {
605605
Arc::new(conf)
606606
}
607607

608-
fn with_dynamic_filter(
609-
&self,
610-
_dynamic_filter: Arc<dyn datafusion_physical_plan::DynamicFilterSource>,
611-
) -> Arc<dyn FileSource> {
612-
// CsvSource does not support dynamic filters, so we just return a clone of self
613-
Arc::new(self.clone())
614-
}
615-
616608
fn metrics(&self) -> &ExecutionPlanMetricsSet {
617609
&self.metrics
618610
}

datafusion/datasource-json/src/source.rs

-7
Original file line numberDiff line numberDiff line change
@@ -303,13 +303,6 @@ impl FileSource for JsonSource {
303303
Arc::new(Self { ..self.clone() })
304304
}
305305

306-
fn with_dynamic_filter(
307-
&self,
308-
_dynamic_filter: Arc<dyn datafusion_physical_plan::DynamicFilterSource>,
309-
) -> Arc<dyn FileSource> {
310-
Arc::new(Self { ..self.clone() })
311-
}
312-
313306
fn metrics(&self) -> &ExecutionPlanMetricsSet {
314307
&self.metrics
315308
}

datafusion/datasource-parquet/src/opener.rs

+3-9
Original file line numberDiff line numberDiff line change
@@ -109,15 +109,9 @@ impl FileOpener for ParquetOpener {
109109
let dynamic_filters = self
110110
.dynamic_filters
111111
.iter()
112-
.filter_map(|f| {
113-
f.current_filters().ok().and_then(|filters| {
114-
if filters.is_empty() {
115-
None
116-
} else {
117-
Some(filters)
118-
}
119-
})
120-
})
112+
.map(|f| f.current_filters())
113+
.collect::<Result<Vec<_>>>()?
114+
.into_iter()
121115
.flatten()
122116
.collect::<Vec<_>>();
123117
// Collect dynamic_filters into a single predicate by reducing with AND

datafusion/datasource-parquet/src/source.rs

-9
Original file line numberDiff line numberDiff line change
@@ -531,15 +531,6 @@ impl FileSource for ParquetSource {
531531
Arc::new(Self { ..self.clone() })
532532
}
533533

534-
fn with_dynamic_filter(
535-
&self,
536-
dynamic_filter: Arc<dyn DynamicFilterSource>,
537-
) -> Arc<dyn FileSource> {
538-
let mut conf = self.clone();
539-
conf.dynamic_filters.push(dynamic_filter);
540-
Arc::new(conf)
541-
}
542-
543534
fn metrics(&self) -> &ExecutionPlanMetricsSet {
544535
&self.metrics
545536
}

datafusion/datasource/src/file.rs

-5
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,6 @@ pub trait FileSource: Send + Sync {
5252
fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource>;
5353
/// Initialize new instance with projection information
5454
fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource>;
55-
/// Add dynamic filters to the file source
56-
fn with_dynamic_filter(
57-
&self,
58-
dynamic_filter: Arc<dyn DynamicFilterSource>,
59-
) -> Arc<dyn FileSource>;
6055
/// Initialize new instance with projected statistics
6156
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource>;
6257
/// Return execution plan metrics

datafusion/datasource/src/file_scan_config.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,11 @@ use datafusion_physical_expr::{
4141
PhysicalSortExpr,
4242
};
4343
use datafusion_physical_plan::{
44-
display::{display_orderings, ProjectSchemaDisplay}, dynamic_filters::DynamicFilterSource, metrics::ExecutionPlanMetricsSet, projection::{all_alias_free_columns, new_projections_for_columns, ProjectionExec}, DisplayAs, DisplayFormatType, ExecutionPlan
44+
display::{display_orderings, ProjectSchemaDisplay},
45+
dynamic_filters::DynamicFilterSource,
46+
metrics::ExecutionPlanMetricsSet,
47+
projection::{all_alias_free_columns, new_projections_for_columns, ProjectionExec},
48+
DisplayAs, DisplayFormatType, ExecutionPlan,
4549
};
4650
use log::{debug, warn};
4751

datafusion/datasource/src/memory.rs

-8
Original file line numberDiff line numberDiff line change
@@ -491,14 +491,6 @@ impl DataSource for MemorySourceConfig {
491491
})
492492
.transpose()
493493
}
494-
495-
fn with_dynamic_filter(
496-
&self,
497-
_dynamic_filter: Arc<dyn datafusion_physical_plan::DynamicFilterSource>,
498-
) -> Arc<dyn DataSource> {
499-
// MemorySourceConfig does not support dynamic filters
500-
Arc::new(self.clone())
501-
}
502494
}
503495

504496
impl MemorySourceConfig {

datafusion/datasource/src/source.rs

-12
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,6 @@ pub trait DataSource: Send + Sync + Debug {
7979
&self,
8080
_projection: &ProjectionExec,
8181
) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>>;
82-
fn with_dynamic_filter(
83-
&self,
84-
_dynamic_filter: Arc<dyn DynamicFilterSource>,
85-
) -> Arc<dyn DataSource>;
8682
}
8783

8884
/// [`ExecutionPlan`] handles different file formats like JSON, CSV, AVRO, ARROW, PARQUET
@@ -227,14 +223,6 @@ impl DataSourceExec {
227223
self
228224
}
229225

230-
pub fn with_dynamic_filters(
231-
mut self,
232-
dynamic_filter: Arc<dyn DynamicFilterSource>,
233-
) -> Self {
234-
self.data_source = self.data_source.with_dynamic_filter(dynamic_filter);
235-
self
236-
}
237-
238226
fn compute_properties(data_source: Arc<dyn DataSource>) -> PlanProperties {
239227
PlanProperties::new(
240228
data_source.eq_properties(),

datafusion/datasource/src/test_util.rs

-7
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,6 @@ impl FileSource for MockSource {
5959
Arc::new(Self { ..self.clone() })
6060
}
6161

62-
fn with_dynamic_filter(
63-
&self,
64-
_dynamic_filter: Arc<dyn datafusion_physical_plan::DynamicFilterSource>,
65-
) -> Arc<dyn FileSource> {
66-
Arc::new(Self { ..self.clone() })
67-
}
68-
6962
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
7063
let mut source = self.clone();
7164
source.projected_statistics = Some(statistics);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! An optimizer rule that detects TopK operations that can generate dynamic filters to be pushed down into file scans
19+
20+
use std::sync::Arc;
21+
22+
use crate::PhysicalOptimizerRule;
23+
use datafusion_common::config::ConfigOptions;
24+
use datafusion_common::tree_node::{TransformedResult, TreeNode};
25+
use datafusion_common::Result;
26+
use datafusion_physical_plan::ExecutionPlan;
27+
28+
/// An optimizer rule that passes a TopK as a `DynamicFilterSource` to `DataSourceExec` nodes.
29+
///
30+
/// This optimizer looks for TopK operators in the plan and connects them to compatible
31+
/// data sources by registering the TopK as a source of dynamic filters.
32+
///
33+
/// When a data source is found that can benefit from a TopK's filter, the source is
34+
/// modified to include the TopK as a dynamic filter source via `with_dynamic_filter_source`.
35+
/// During execution, the data source will then consult the TopK's current filter state
36+
/// to determine which files or partitions can be skipped.
37+
#[derive(Debug)]
38+
pub struct TopKDynamicFilters {}
39+
40+
impl PhysicalOptimizerRule for TopKDynamicFilters {
41+
fn optimize(
42+
&self,
43+
plan: Arc<dyn ExecutionPlan>,
44+
config: &ConfigOptions,
45+
) -> Result<Arc<dyn ExecutionPlan>> {
46+
if config.optimizer.enable_dynamic_filter_pushdown {
47+
plan.transform_down(|_plan| {
48+
// TODO: recurse only traversing approved nodes (e.g., FilterExec, ProjectionExec, SortExec, CoalesceBatchesExec, RepartitionExec; namely not aggregations, joins, etc.)
49+
// Collect any DataSourceExecs that can use dynamic filters and register the TopK as a source
50+
todo!();
51+
})
52+
.data()
53+
} else {
54+
Ok(plan)
55+
}
56+
}
57+
58+
fn name(&self) -> &str {
59+
"TopKDynamicFilters"
60+
}
61+
62+
fn schema_check(&self) -> bool {
63+
true
64+
}
65+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
use std::sync::Arc;
2+
3+
use datafusion_common::Result;
4+
use datafusion_physical_expr::PhysicalExpr;
5+
6+
/// A source of dynamic runtime filters.
7+
///
8+
/// During query execution, operators implementing this trait can provide
9+
/// filter expressions that other operators can use to dynamically prune data.
10+
pub trait DynamicFilterSource: Send + Sync + std::fmt::Debug + 'static {
11+
/// Returns a list of filter expressions that can be used for dynamic pruning.
12+
fn current_filters(&self) -> Result<Vec<Arc<dyn PhysicalExpr>>>;
13+
}

datafusion/physical-plan/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ pub use datafusion_physical_expr::{
4040
};
4141

4242
pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
43-
pub use crate::dynamic_filter::DynamicFilterSource;
43+
pub use crate::dynamic_filters::DynamicFilterSource;
4444
pub use crate::execution_plan::{
4545
collect, collect_partitioned, displayable, execute_input_stream, execute_stream,
4646
execute_stream_partitioned, get_plan_string, with_new_children_if_necessary,

datafusion/physical-plan/src/topk/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc};
2828
use datafusion_expr::Operator;
2929

3030
use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder};
31-
use crate::dynamic_filter::DynamicFilterSource;
31+
use crate::dynamic_filters::DynamicFilterSource;
3232
use crate::spill::get_record_batch_memory_size;
3333
use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};
3434
use arrow::array::{Array, ArrayRef, RecordBatch};

0 commit comments

Comments
 (0)