Fix predicate pushdown for custom SchemaAdapters #15263
Conversation
#[tokio::test]
async fn test_pushdown_with_missing_column_in_file() {
Replacing the unit test with a more end-to-end test that shows that things work as expected.
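For context, a rough sketch of what such an end-to-end test could look like; the schemas, the SQL, and the ParquetReadOptions::default().schema(...) call are illustrative assumptions, not the PR's actual test:

use std::fs::File;
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use parquet::arrow::ArrowWriter;

#[tokio::test]
async fn pushdown_with_missing_column_sketch() -> datafusion::error::Result<()> {
    // The file only contains column "a"; the table schema also declares "b"
    let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&file_schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;
    let dir = tempfile::tempdir()?;
    let path = dir.path().join("data.parquet");
    let mut writer = ArrowWriter::try_new(File::create(&path)?, file_schema, None)?;
    writer.write(&batch)?;
    writer.close()?;

    // Register with a wider table schema so "b" must be filled with nulls
    let table_schema = Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
    ]);
    let ctx = SessionContext::new();
    ctx.register_parquet(
        "t",
        path.to_str().unwrap(),
        ParquetReadOptions::default().schema(&table_schema),
    )
    .await?;

    // A pushed-down predicate on the missing column must not error and
    // should treat "b" as all-null for this file
    let batches = ctx.sql("SELECT a FROM t WHERE b IS NULL").await?.collect().await?;
    let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(rows, 3);
    Ok(())
}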
// ArrowPredicate::evaluate is passed columns in the order they appear in the file
// If the predicate has multiple columns, we therefore must project the columns based
// on the order they appear in the file
let projection = match candidate.projection.len() {
    0 | 1 => vec![],
    2.. => remap_projection(&candidate.projection),
};
I think this is no longer necessary and is handled by the SchemaAdapter. It might be nice to have a test to point to that confirms this.
See #15263 (comment)
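For context, a sketch of why the adapter makes the manual remapping redundant. The DefaultSchemaAdapterFactory and map_schema names follow datafusion's schema_adapter module as I understand it; treat the exact signatures as assumptions:

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::schema_adapter::{
    DefaultSchemaAdapterFactory, SchemaAdapterFactory,
};

fn adapter_projection_sketch() -> datafusion::error::Result<()> {
    let table_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
    ]));
    // The file lists the same columns, but in a different order
    let file_schema = Schema::new(vec![
        Field::new("b", DataType::Int32, true),
        Field::new("a", DataType::Int32, true),
    ]);
    let adapter = DefaultSchemaAdapterFactory::default()
        .create(Arc::clone(&table_schema), table_schema);
    // map_schema returns both a mapper and the file-order projection, so
    // callers no longer need to compute the column ranks themselves
    let (_mapper, projection) = adapter.map_schema(&file_schema)?;
    assert_eq!(projection, vec![0, 1]); // read both file columns, in file order
    Ok(())
}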
 fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
-    let batch = self.schema_mapping.map_partial_batch(batch)?;
+    let batch = self.schema_mapping.map_batch(batch)?;
Here is where we ditch map_partial_batch in favor of map_batch.
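For anyone unfamiliar with the two methods, a simplified stand-in for the relevant part of the SchemaMapper trait (the real trait lives in datafusion's schema_adapter module; this sketch only assumes its general shape):

use arrow::record_batch::RecordBatch;
use datafusion::common::Result;

// Simplified stand-in for datafusion's SchemaMapper trait
trait SchemaMapperSketch {
    /// Adapt a batch read in file order/types to the projected table
    /// schema, casting columns and padding missing ones with nulls. With
    /// this doing the heavy lifting, a special-cased map_partial_batch
    /// path for predicate batches is no longer needed.
    fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch>;
}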
/// Computes the projection required to go from the file's schema order to the projected
/// order expected by this filter
///
/// Effectively this computes the rank of each element in `src`
fn remap_projection(src: &[usize]) -> Vec<usize> {
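Since only the signature survives in the quote above, here is a sketch of the rank computation the doc comment describes; it is an illustration, not necessarily the exact removed code:

/// Computes the rank of each element in `src`: the position each projected
/// column ends up in once the columns are read back in file-schema order
fn remap_projection(src: &[usize]) -> Vec<usize> {
    let len = src.len();
    // Positions of `src`, ordered by the file-column index they refer to
    let mut sorted: Vec<usize> = (0..len).collect();
    sorted.sort_unstable_by_key(|&i| src[i]);
    // Invert the permutation: projection[i] is the rank of src[i]
    let mut projection = vec![0; len];
    for (rank, i) in sorted.into_iter().enumerate() {
        projection[i] = rank;
    }
    projection
}

// e.g. remap_projection(&[2, 0]) == vec![1, 0]: the column with file index 0
// arrives first in the batch, so the predicate must swap its two inputs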
I believe this is taken care of by SchemaAdapter now 😄. Again, it would be nice to be able to point at a (maybe existing) test to confirm. Maybe I need to try removing this on main and confirming which tests break.
Okay, I can confirm that async fn single_file() fails if you replace remap_projection with a no-op. So I think this change is 👍🏻
let file_schema = Arc::new(file_schema.clone());
let table_schema = Arc::new(table_schema.clone());
We could change the signature of build_row_filter since the caller might have an Arc'd version already, but since it's pub that would introduce more breaking changes, and the clone seemed cheap enough. Open to doing that though.
I think you can avoid cloning the schema with a pretty simple change. Here is a proposal:
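The concrete suggestion is not captured in this export. One plausible shape, assuming the goal is to bump an Arc refcount instead of deep-cloning the Schema (the signature below is hypothetical):

use std::sync::Arc;

use arrow::datatypes::SchemaRef;

// Hypothetical signature sketch: accept SchemaRef instead of &Schema
fn build_row_filter_sketch(file_schema: &SchemaRef, table_schema: &SchemaRef) {
    // Arc::clone copies a pointer, not the underlying Schema's fields
    let file_schema: SchemaRef = Arc::clone(file_schema);
    let table_schema: SchemaRef = Arc::clone(table_schema);
    // ...build the filter candidates against the shared schemas...
    let _ = (file_schema, table_schema);
}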
// If a column exists in the table schema but not the file schema it should be rewritten to a null expression
#[test]
fn test_filter_candidate_builder_rewrite_missing_column() {
See newly added e2e test
)
.expect("creating filter predicate");

let mut parquet_reader = parquet_reader_builder
    .with_projection(row_filter.projection().clone())
Moved down because it needs access to the row filter's projection
 let table_schema = get_basic_table_schema();

-let file_schema = Schema::new(vec![Field::new("str_col", DataType::Utf8, true)]);
+let file_schema =
+    Schema::new(vec![Field::new("string_col", DataType::Utf8, true)]);
There is no str_col in the data returned by get_basic_table_schema(), but there is string_col.
cc @jeffreyssmith2nd as the original author of this test.
Thank you @adriangb -- in my mind, it is the mark of a great engineer to fix bugs by deleting code.
I think the only thing this PR needs is a few more tests (I specified what they are below). I do think pydantic#9 is worth considering too, though.
FYI @itsjunetime who worked on #12135 and @jeffreyssmith2nd who worked on #10716
/// After visiting all children, rewrite column references to nulls if
/// they are not in the file schema.
/// We do this because they won't be relevant if they're not in the file schema, since that's
/// the only thing we're dealing with here as this is only used for the parquet pushdown during
/// scanning
fn f_up(
    &mut self,
    expr: Arc<dyn PhysicalExpr>,
) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
I agree adding an API for stats on the new column would be 💯
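A sketch of the rewrite that the doc comment above describes: column references missing from the file schema become null literals. The imports and types follow my understanding of the datafusion APIs and should be treated as assumptions:

use std::sync::Arc;

use arrow::datatypes::Schema;
use datafusion::common::tree_node::Transformed;
use datafusion::common::{Result, ScalarValue};
use datafusion::physical_expr::expressions::{Column, Literal};
use datafusion::physical_plan::PhysicalExpr;

fn rewrite_missing_column(
    expr: Arc<dyn PhysicalExpr>,
    file_schema: &Schema,
) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
    if let Some(column) = expr.as_any().downcast_ref::<Column>() {
        if file_schema.field_with_name(column.name()).is_err() {
            // The column exists only in the table schema, so for this file
            // it is all-null: replace the reference with a null literal
            return Ok(Transformed::yes(Arc::new(Literal::new(ScalarValue::Null))));
        }
    }
    Ok(Transformed::no(expr))
}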
I do think in general we need to be correct first, then fast.
As someone once told me: "if you don't constrain it (the compiler) to be correct, I'll make it as fast as you want!"
Thank you @adriangb -- I think this one is ready to go except for the datafusion-testing pin.
Without fixing the pin, I think the extended tests are going to fail on main. For example, running

INCLUDE_SQLITE=true nice cargo test --profile release-nonlto --test sqllogictests

will error, I think. Here is a PR to revert the change.
let file_schema = Arc::new(Schema::new(vec![
    Field::new("c3", DataType::Int32, true),
    Field::new("c3", DataType::Int32, true),
was it intentional to repeat the "c3" column here?
Yes, because that's what you suggested in #15263 (comment) (a file schema like c3, c3). I was confused by what you meant, so maybe I misunderstood, but I just ran with it. Maybe c3, c1 is a better test?
Fix datafusion testing pin
2620c6a 😄
Thanks for bearing with me here
🚀
Amazing, thank you so much for pushing this forward Andrew!
🤦 Somehow I didn't see that the datafusion-testing link got deleted in this PR.
I made a PR to fix it: https://github.com/apache/datafusion/pull/15318/files
Oh, I'm terribly sorry, that's probably my bad... I constantly have issues with those submodules and have not yet spent the time to figure out how to avoid it.
* wip
* wip
* wip
* add tests
* wip
* wip
* fix
* fix
* fix
* better test
* more reverts
* fix
* Reduce Schema clones in predicate
* add more tests
* add another test
* Fix datafusion testing pin
* fix clippy
---------
Co-authored-by: Andrew Lamb <[email protected]>
no worries -- I think the trick for me is if I ever see a change to the submodule do
See #15220 (comment).