perf: Remove redundant copying of batches after FilterExec (apache#835)
* Use custom FilterExec that always uses take with a selection vector

* Remove CopyExec around FilterExec

* remove CopyExec on FilterExec inputs to joins

* remove copy before sort in some cases

* add comments

* cargo fmt

* bug fix: check for null when building selection vector

* revert

* use arrow kernel

* remove unused imports

* add criterion benchmark

* address initial feedback

* add ASF header

* fix missing imports

* Update native/core/src/execution/operators/filter.rs

Co-authored-by: Liang-Chi Hsieh <[email protected]>

---------

Co-authored-by: Liang-Chi Hsieh <[email protected]>
andygrove and viirya authored Aug 16, 2024
1 parent 6051232 commit 3f826a3
Showing 7 changed files with 575 additions and 6 deletions.
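The core idea behind the first commit bullet above can be sketched in a few lines. The sketch below is illustrative only and is not the Comet implementation in native/core/src/execution/operators/filter.rs (whose entry point, comet_filter_record_batch, is what the new benchmark exercises); the helper name filter_batch_via_take is invented here. It builds a selection vector of row indices from the boolean predicate, checking for nulls as the bug-fix bullet notes, and materializes the surviving rows with Arrow's take kernel so the filtered columns never share buffers with the input batch.

use arrow::compute::take;
use arrow::error::ArrowError;
use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch, UInt32Array};

// Illustrative only: filter a batch by materializing the selected rows with `take`.
// The output arrays are newly allocated, so they never alias the input batch.
fn filter_batch_via_take(
    batch: &RecordBatch,
    predicate: &BooleanArray,
) -> Result<RecordBatch, ArrowError> {
    // Build the selection vector of row indices; a null predicate value means
    // the row is not selected, so nulls must be checked explicitly.
    let mut indices: Vec<u32> = Vec::with_capacity(predicate.len());
    for i in 0..predicate.len() {
        if predicate.is_valid(i) && predicate.value(i) {
            indices.push(i as u32);
        }
    }
    let indices = UInt32Array::from(indices);

    // Apply `take` to every column with the same selection vector.
    let columns = batch
        .columns()
        .iter()
        .map(|col| take(col.as_ref(), &indices, None))
        .collect::<Result<Vec<ArrayRef>, ArrowError>>()?;

    RecordBatch::try_new(batch.schema(), columns)
}

Because the output of such a filter is always freshly allocated, downstream operators can hold on to the batch safely, which is why the planner changes below drop the CopyExec wrappers that previously guarded FilterExec.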
1 change: 1 addition & 0 deletions native/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions native/Cargo.toml
@@ -44,6 +44,7 @@ datafusion = { default-features = false, git = "https://github.com/apache/datafu
 datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", features = ["crypto_expressions"] }
 datafusion-functions-nested = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", default-features = false }
 datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", default-features = false }
+datafusion-execution = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", default-features = false }
 datafusion-physical-plan = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", default-features = false }
 datafusion-physical-expr-common = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", default-features = false }
 datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git", rev = "41.0.0-rc1", default-features = false }
5 changes: 5 additions & 0 deletions native/core/Cargo.toml
@@ -69,6 +69,7 @@ datafusion-common = { workspace = true }
 datafusion = { workspace = true }
 datafusion-functions-nested = { workspace = true }
 datafusion-expr = { workspace = true }
+datafusion-execution = { workspace = true }
 datafusion-physical-expr-common = { workspace = true }
 datafusion-physical-expr = { workspace = true }
 once_cell = "1.18.0"
@@ -118,3 +119,7 @@ harness = false
 [[bench]]
 name = "parquet_decode"
 harness = false
+
+[[bench]]
+name = "filter"
+harness = false
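Since harness = false disables the default libtest harness, Criterion provides the benchmark's main function; the new benchmark can presumably be run with cargo bench --bench filter from the native/core crate (standard Criterion/Cargo usage, not a command documented in this diff).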
111 changes: 111 additions & 0 deletions native/core/benches/filter.rs
@@ -0,0 +1,111 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, Field, Schema};
use arrow_array::builder::{BooleanBuilder, Int32Builder, StringBuilder};
use arrow_array::{ArrayRef, RecordBatch};
use comet::execution::operators::comet_filter_record_batch;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use std::sync::Arc;
use std::time::Duration;

fn criterion_benchmark(c: &mut Criterion) {
    let mut group = c.benchmark_group("filter");

    let num_rows = 8192;
    let num_int_cols = 4;
    let num_string_cols = 4;

    let batch = create_record_batch(num_rows, num_int_cols, num_string_cols);

    // create some different predicates
    let mut predicate_select_few = BooleanBuilder::with_capacity(num_rows);
    let mut predicate_select_many = BooleanBuilder::with_capacity(num_rows);
    let mut predicate_select_all = BooleanBuilder::with_capacity(num_rows);
    for i in 0..num_rows {
        predicate_select_few.append_value(i % 10 == 0);
        predicate_select_many.append_value(i % 10 > 0);
        predicate_select_all.append_value(true);
    }
    let predicate_select_few = predicate_select_few.finish();
    let predicate_select_many = predicate_select_many.finish();
    let predicate_select_all = predicate_select_all.finish();

    // baseline uses Arrow's filter_record_batch method
    group.bench_function("arrow_filter_record_batch - few rows selected", |b| {
        b.iter(|| filter_record_batch(black_box(&batch), black_box(&predicate_select_few)))
    });
    group.bench_function("arrow_filter_record_batch - many rows selected", |b| {
        b.iter(|| filter_record_batch(black_box(&batch), black_box(&predicate_select_many)))
    });
    group.bench_function("arrow_filter_record_batch - all rows selected", |b| {
        b.iter(|| filter_record_batch(black_box(&batch), black_box(&predicate_select_all)))
    });

    group.bench_function("comet_filter_record_batch - few rows selected", |b| {
        b.iter(|| comet_filter_record_batch(black_box(&batch), black_box(&predicate_select_few)))
    });
    group.bench_function("comet_filter_record_batch - many rows selected", |b| {
        b.iter(|| comet_filter_record_batch(black_box(&batch), black_box(&predicate_select_many)))
    });
    group.bench_function("comet_filter_record_batch - all rows selected", |b| {
        b.iter(|| comet_filter_record_batch(black_box(&batch), black_box(&predicate_select_all)))
    });

    group.finish();
}

fn create_record_batch(num_rows: usize, num_int_cols: i32, num_string_cols: i32) -> RecordBatch {
    let mut int32_builder = Int32Builder::with_capacity(num_rows);
    let mut string_builder = StringBuilder::with_capacity(num_rows, num_rows * 32);
    for i in 0..num_rows {
        int32_builder.append_value(i as i32);
        string_builder.append_value(format!("this is string #{i}"));
    }
    let int32_array = Arc::new(int32_builder.finish());
    let string_array = Arc::new(string_builder.finish());

    let mut fields = vec![];
    let mut columns: Vec<ArrayRef> = vec![];
    let mut i = 0;
    for _ in 0..num_int_cols {
        fields.push(Field::new(format!("c{i}"), DataType::Int32, false));
        columns.push(int32_array.clone()); // note this is just copying a reference to the array
        i += 1;
    }
    for _ in 0..num_string_cols {
        fields.push(Field::new(format!("c{i}"), DataType::Utf8, false));
        columns.push(string_array.clone()); // note this is just copying a reference to the array
        i += 1;
    }
    let schema = Schema::new(fields);
    RecordBatch::try_new(Arc::new(schema), columns).unwrap()
}

fn config() -> Criterion {
    Criterion::default()
        .measurement_time(Duration::from_millis(500))
        .warm_up_time(Duration::from_millis(500))
}

criterion_group! {
    name = benches;
    config = config();
    targets = criterion_benchmark
}
criterion_main!(benches);
8 changes: 2 additions & 6 deletions native/core/src/execution/datafusion/planner.rs
@@ -19,7 +19,7 @@
 use super::expressions::EvalMode;
 use crate::execution::datafusion::expressions::comet_scalar_funcs::create_comet_physical_fun;
-use crate::execution::operators::CopyMode;
+use crate::execution::operators::{CopyMode, FilterExec};
 use crate::{
     errors::ExpressionError,
     execution::{
@@ -73,7 +73,6 @@ use datafusion::{
     physical_optimizer::join_selection::swap_hash_join,
     physical_plan::{
         aggregates::{AggregateMode as DFAggregateMode, PhysicalGroupBy},
-        filter::FilterExec,
         joins::{utils::JoinFilter, HashJoinExec, PartitionMode, SortMergeJoinExec},
         limit::LocalLimitExec,
         projection::ProjectionExec,
@@ -1780,10 +1779,7 @@ impl From<ExpressionError> for DataFusionError {
 /// modification. This is used to determine if we need to copy the input batch to avoid
 /// data corruption from reusing the input batch.
 fn can_reuse_input_batch(op: &Arc<dyn ExecutionPlan>) -> bool {
-    if op.as_any().is::<ProjectionExec>()
-        || op.as_any().is::<LocalLimitExec>()
-        || op.as_any().is::<FilterExec>()
-    {
+    if op.as_any().is::<ProjectionExec>() || op.as_any().is::<LocalLimitExec>() {
         can_reuse_input_batch(op.children()[0])
     } else {
         op.as_any().is::<ScanExec>()
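This removal follows from the new operator: can_reuse_input_batch decides whether a defensive copy is needed, and per the commit message Comet's FilterExec now always materializes its output (via a selection vector and Arrow kernels) rather than returning arrays that alias its input. FilterExec therefore no longer needs to be listed here, and the CopyExec wrappers around filters feeding joins and sorts can be dropped.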