
Fix predicate pushdown for custom SchemaAdapters #15263

Merged · Mar 19, 2025 · 19 commits
1 change: 0 additions & 1 deletion datafusion-testing
Submodule datafusion-testing deleted from 3462ea
7 changes: 0 additions & 7 deletions datafusion/core/src/datasource/mod.rs
@@ -347,12 +347,5 @@ mod tests {

Ok(RecordBatch::try_new(schema, new_columns).unwrap())
}

fn map_partial_batch(
&self,
batch: RecordBatch,
) -> datafusion_common::Result<RecordBatch> {
self.map_batch(batch)
}
}
}
321 changes: 321 additions & 0 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -224,6 +224,327 @@ mod tests {
)
}

#[tokio::test]
async fn test_pushdown_with_missing_column_in_file() {
Review comment on lines +227 to +228 (Contributor Author): Replacing the unit test with more e2e tests that show that things work as expected.

let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));

let file_schema =
Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));

let table_schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, true),
Field::new("c2", DataType::Int32, true),
]));

let batch = RecordBatch::try_new(file_schema.clone(), vec![c1]).unwrap();

// Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`,
// the default behavior is to fill in missing columns with nulls.
        // Thus this predicate evaluates to NULL (treated as not matching) for every row.
let filter = col("c2").eq(lit(1_i32));
let rt = RoundTrip::new()
.with_schema(table_schema.clone())
.with_predicate(filter.clone())
.with_pushdown_predicate()
.round_trip(vec![batch.clone()])
.await;
let total_rows = rt
.batches
.unwrap()
.iter()
.map(|b| b.num_rows())
.sum::<usize>();
assert_eq!(total_rows, 0, "Expected no rows to match the predicate");
let metrics = rt.parquet_exec.metrics().unwrap();
let metric = get_value(&metrics, "pushdown_rows_pruned");
assert_eq!(metric, 3, "Expected all rows to be pruned");

        // If we explicitly allow nulls the rest of the predicate should work
let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32)));
let rt = RoundTrip::new()
.with_schema(table_schema.clone())
.with_predicate(filter.clone())
.with_pushdown_predicate()
.round_trip(vec![batch.clone()])
.await;
let batches = rt.batches.unwrap();
#[rustfmt::skip]
let expected = [
"+----+----+",
"| c1 | c2 |",
"+----+----+",
"| 1 | |",
"+----+----+",
];
assert_batches_sorted_eq!(expected, &batches);
let metrics = rt.parquet_exec.metrics().unwrap();
let metric = get_value(&metrics, "pushdown_rows_pruned");
        assert_eq!(metric, 2, "Expected 2 rows to be pruned");
}
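
The null-fill behavior these tests rely on can be seen in isolation with arrow-rs's comparison kernels; a minimal sketch, assuming the Datum-based `cmp::eq` API (illustrative, not code from this PR):

```rust
use arrow::array::{new_null_array, Array, Int32Array};
use arrow::compute::kernels::cmp::eq;
use arrow::datatypes::DataType;

fn main() -> Result<(), arrow::error::ArrowError> {
    // The default SchemaAdapter substitutes an all-null array for a column
    // that is missing from the file (here: three rows of `c2`).
    let c2 = new_null_array(&DataType::Int32, 3);
    // `c2 = 1` evaluates to NULL for every row; the row filter treats NULL
    // as "no match", so all three rows are pruned.
    let mask = eq(&c2, &Int32Array::new_scalar(1))?;
    assert_eq!(mask.null_count(), 3);
    Ok(())
}
```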

#[tokio::test]
async fn test_pushdown_with_missing_column_in_file_multiple_types() {
let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));

let file_schema =
Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));

let table_schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, true),
Field::new("c2", DataType::Utf8, true),
]));

let batch = RecordBatch::try_new(file_schema.clone(), vec![c1]).unwrap();

// Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`,
// the default behavior is to fill in missing columns with nulls.
        // Thus this predicate evaluates to NULL (treated as not matching) for every row.
let filter = col("c2").eq(lit("abc"));
let rt = RoundTrip::new()
.with_schema(table_schema.clone())
.with_predicate(filter.clone())
.with_pushdown_predicate()
.round_trip(vec![batch.clone()])
.await;
let total_rows = rt
.batches
.unwrap()
.iter()
.map(|b| b.num_rows())
.sum::<usize>();
assert_eq!(total_rows, 0, "Expected no rows to match the predicate");
let metrics = rt.parquet_exec.metrics().unwrap();
let metric = get_value(&metrics, "pushdown_rows_pruned");
assert_eq!(metric, 3, "Expected all rows to be pruned");

        // If we explicitly allow nulls the rest of the predicate should work
let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32)));
let rt = RoundTrip::new()
.with_schema(table_schema.clone())
.with_predicate(filter.clone())
.with_pushdown_predicate()
.round_trip(vec![batch.clone()])
.await;
let batches = rt.batches.unwrap();
#[rustfmt::skip]
let expected = [
"+----+----+",
"| c1 | c2 |",
"+----+----+",
"| 1 | |",
"+----+----+",
];
assert_batches_sorted_eq!(expected, &batches);
let metrics = rt.parquet_exec.metrics().unwrap();
let metric = get_value(&metrics, "pushdown_rows_pruned");
        assert_eq!(metric, 2, "Expected 2 rows to be pruned");
}

#[tokio::test]
async fn test_pushdown_with_missing_middle_column() {
let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
let c3 = Arc::new(Int32Array::from(vec![7, 8, 9]));

let file_schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, true),
Field::new("c3", DataType::Int32, true),
]));

let table_schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, true),
Field::new("c2", DataType::Utf8, true),
Field::new("c3", DataType::Int32, true),
]));

let batch = RecordBatch::try_new(file_schema.clone(), vec![c1, c3]).unwrap();

// Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`,
// the default behavior is to fill in missing columns with nulls.
        // Thus this predicate evaluates to NULL (treated as not matching) for every row.
let filter = col("c2").eq(lit("abc"));
let rt = RoundTrip::new()
.with_schema(table_schema.clone())
.with_predicate(filter.clone())
.with_pushdown_predicate()
.round_trip(vec![batch.clone()])
.await;
let total_rows = rt
.batches
.unwrap()
.iter()
.map(|b| b.num_rows())
.sum::<usize>();
assert_eq!(total_rows, 0, "Expected no rows to match the predicate");
let metrics = rt.parquet_exec.metrics().unwrap();
let metric = get_value(&metrics, "pushdown_rows_pruned");
assert_eq!(metric, 3, "Expected all rows to be pruned");

        // If we explicitly allow nulls the rest of the predicate should work
let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32)));
let rt = RoundTrip::new()
.with_schema(table_schema.clone())
.with_predicate(filter.clone())
.with_pushdown_predicate()
.round_trip(vec![batch.clone()])
.await;
let batches = rt.batches.unwrap();
#[rustfmt::skip]
let expected = [
"+----+----+----+",
"| c1 | c2 | c3 |",
"+----+----+----+",
"| 1 | | 7 |",
"+----+----+----+",
];
assert_batches_sorted_eq!(expected, &batches);
let metrics = rt.parquet_exec.metrics().unwrap();
let metric = get_value(&metrics, "pushdown_rows_pruned");
        assert_eq!(metric, 2, "Expected 2 rows to be pruned");
}

#[tokio::test]
async fn test_pushdown_with_file_column_order_mismatch() {
let c3 = Arc::new(Int32Array::from(vec![7, 8, 9]));

let file_schema = Arc::new(Schema::new(vec![
Field::new("c3", DataType::Int32, true),
Field::new("c3", DataType::Int32, true),
Review comment (Contributor): was it intentional to repeat the "c3" column here?

Reply (adriangb, Contributor Author, Mar 18, 2025): Yes, because that's what you suggested in #15263 (comment) (a file schema like c3, c3). I was confused by what you meant, so maybe I misunderstood, but I just ran with it. Maybe c3, c1 is a better test?
]));

let table_schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, true),
Field::new("c2", DataType::Utf8, true),
Field::new("c3", DataType::Int32, true),
]));

let batch =
RecordBatch::try_new(file_schema.clone(), vec![c3.clone(), c3]).unwrap();

// Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`,
// the default behavior is to fill in missing columns with nulls.
        // Thus this predicate evaluates to NULL (treated as not matching) for every row.
let filter = col("c2").eq(lit("abc"));
let rt = RoundTrip::new()
.with_schema(table_schema.clone())
.with_predicate(filter.clone())
.with_pushdown_predicate()
.round_trip(vec![batch.clone()])
.await;
let total_rows = rt
.batches
.unwrap()
.iter()
.map(|b| b.num_rows())
.sum::<usize>();
assert_eq!(total_rows, 0, "Expected no rows to match the predicate");
let metrics = rt.parquet_exec.metrics().unwrap();
let metric = get_value(&metrics, "pushdown_rows_pruned");
assert_eq!(metric, 3, "Expected all rows to be pruned");

        // If we explicitly allow nulls the rest of the predicate should work
let filter = col("c2").is_null().and(col("c3").eq(lit(7_i32)));
let rt = RoundTrip::new()
.with_schema(table_schema.clone())
.with_predicate(filter.clone())
.with_pushdown_predicate()
.round_trip(vec![batch.clone()])
.await;
let batches = rt.batches.unwrap();
#[rustfmt::skip]
let expected = [
"+----+----+----+",
"| c1 | c2 | c3 |",
"+----+----+----+",
"| | | 7 |",
"+----+----+----+",
];
assert_batches_sorted_eq!(expected, &batches);
let metrics = rt.parquet_exec.metrics().unwrap();
let metric = get_value(&metrics, "pushdown_rows_pruned");
        assert_eq!(metric, 2, "Expected 2 rows to be pruned");
}

#[tokio::test]
async fn test_pushdown_with_missing_column_nested_conditions() {
// Create test data with c1 and c3 columns
let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
let c3: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50]));

let file_schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, true),
Field::new("c3", DataType::Int32, true),
]));

let table_schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, true),
Field::new("c2", DataType::Int32, true),
Field::new("c3", DataType::Int32, true),
]));

let batch = RecordBatch::try_new(file_schema.clone(), vec![c1, c3]).unwrap();

// Test with complex nested AND/OR:
// (c1 = 1 OR c2 = 5) AND (c3 = 10 OR c2 IS NULL)
// Should return 1 row where c1=1 AND c3=10 (since c2 IS NULL is always true)
let filter = col("c1")
.eq(lit(1_i32))
.or(col("c2").eq(lit(5_i32)))
.and(col("c3").eq(lit(10_i32)).or(col("c2").is_null()));

let rt = RoundTrip::new()
.with_schema(table_schema.clone())
.with_predicate(filter.clone())
.with_pushdown_predicate()
.round_trip(vec![batch.clone()])
.await;

let batches = rt.batches.unwrap();
#[rustfmt::skip]
let expected = [
"+----+----+----+",
"| c1 | c2 | c3 |",
"+----+----+----+",
"| 1 | | 10 |",
"+----+----+----+",
];
assert_batches_sorted_eq!(expected, &batches);
let metrics = rt.parquet_exec.metrics().unwrap();
let metric = get_value(&metrics, "pushdown_rows_pruned");
assert_eq!(metric, 4, "Expected 4 rows to be pruned");

// Test a more complex nested condition:
// (c1 < 3 AND c2 IS NOT NULL) OR (c3 > 20 AND c2 IS NULL)
// First part should return 0 rows (c2 IS NOT NULL is always false)
// Second part should return rows where c3 > 20 (3 rows: where c3 is 30, 40, 50)
let filter = col("c1")
.lt(lit(3_i32))
.and(col("c2").is_not_null())
.or(col("c3").gt(lit(20_i32)).and(col("c2").is_null()));

let rt = RoundTrip::new()
.with_schema(table_schema)
.with_predicate(filter.clone())
.with_pushdown_predicate()
.round_trip(vec![batch])
.await;

let batches = rt.batches.unwrap();
#[rustfmt::skip]
let expected = [
"+----+----+----+",
"| c1 | c2 | c3 |",
"+----+----+----+",
"| 3 | | 30 |",
"| 4 | | 40 |",
"| 5 | | 50 |",
"+----+----+----+",
];
assert_batches_sorted_eq!(expected, &batches);
let metrics = rt.parquet_exec.metrics().unwrap();
let metric = get_value(&metrics, "pushdown_rows_pruned");
assert_eq!(metric, 2, "Expected 2 rows to be pruned");
}
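
The expected results in this test follow SQL's three-valued (Kleene) logic; the same masks can be computed by hand with arrow-rs's `*_kleene` kernels. A minimal sketch of the first predicate, assuming these kernel signatures (illustrative, not code from this PR):

```rust
use arrow::array::{new_null_array, BooleanArray, Int32Array};
use arrow::compute::kernels::cmp::eq;
use arrow::compute::{and_kleene, is_null, or_kleene};
use arrow::datatypes::DataType;

fn main() -> Result<(), arrow::error::ArrowError> {
    let c1 = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let c2 = new_null_array(&DataType::Int32, 5); // missing column -> all nulls
    let c3 = Int32Array::from(vec![10, 20, 30, 40, 50]);

    // (c1 = 1 OR c2 = 5) AND (c3 = 10 OR c2 IS NULL)
    let lhs = or_kleene(
        &eq(&c1, &Int32Array::new_scalar(1))?,
        &eq(&c2, &Int32Array::new_scalar(5))?,
    )?;
    let rhs = or_kleene(&eq(&c3, &Int32Array::new_scalar(10))?, &is_null(&c2)?)?;
    let mask = and_kleene(&lhs, &rhs)?;

    // Row 0 is `true AND true`; every other row is `NULL AND true`, which
    // stays NULL under Kleene logic, and the filter drops NULL rows.
    let expected = BooleanArray::from(vec![Some(true), None, None, None, None]);
    assert_eq!(mask, expected);
    Ok(())
}
```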

#[tokio::test]
async fn evolved_schema() {
let c1: ArrayRef =
…
3 changes: 2 additions & 1 deletion datafusion/datasource-parquet/src/opener.rs
@@ -107,6 +107,7 @@ impl FileOpener for ParquetOpener {

let projected_schema =
SchemaRef::from(self.table_schema.project(&self.projection)?);
let schema_adapter_factory = Arc::clone(&self.schema_adapter_factory);
let schema_adapter = self
.schema_adapter_factory
.create(projected_schema, Arc::clone(&self.table_schema));
@@ -173,7 +174,7 @@
builder.metadata(),
reorder_predicates,
&file_metrics,
Arc::clone(&schema_mapping),
&schema_adapter_factory,
);

match row_filter {
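
The substantive change here threads the (possibly custom) `SchemaAdapterFactory` into row-filter construction instead of a pre-built `SchemaMapper`, so pushed-down predicates are adapted to each file's schema by the same adapter that maps record batches. A minimal sketch of a custom factory that this code path now honors — the two-argument `create` follows the call visible in this diff, but the module path and exact trait bounds are assumptions:

```rust
use arrow::datatypes::SchemaRef;
use datafusion::datasource::schema_adapter::{
    DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory,
};

// A factory that simply delegates to the default adapter. Whatever is
// plugged in here now also governs how pushed-down predicates are rebuilt,
// because the row filter receives the factory rather than a SchemaMapper.
#[derive(Debug)]
struct MySchemaAdapterFactory;

impl SchemaAdapterFactory for MySchemaAdapterFactory {
    fn create(
        &self,
        projected_table_schema: SchemaRef,
        table_schema: SchemaRef,
    ) -> Box<dyn SchemaAdapter> {
        // A real implementation might coerce types or substitute default
        // values for missing columns instead of filling them with nulls.
        DefaultSchemaAdapterFactory.create(projected_table_schema, table_schema)
    }
}
```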