Skip to content

Commit eda1b53

Browse files
committed
Update to arrow/parquet 57.1.0
1 parent 1cc9bcd commit eda1b53

File tree

27 files changed

+196
-121
lines changed

27 files changed

+196
-121
lines changed

Cargo.lock

Lines changed: 16 additions & 47 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion-cli/src/main.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -592,9 +592,9 @@ mod tests {
592592
+-----------------------------------+-----------------+---------------------+------+------------------+
593593
| filename | file_size_bytes | metadata_size_bytes | hits | extra |
594594
+-----------------------------------+-----------------+---------------------+------+------------------+
595-
| alltypes_plain.parquet | 1851 | 6957 | 2 | page_index=false |
596-
| alltypes_tiny_pages.parquet | 454233 | 267014 | 2 | page_index=true |
597-
| lz4_raw_compressed_larger.parquet | 380836 | 996 | 2 | page_index=false |
595+
| alltypes_plain.parquet | 1851 | 8882 | 2 | page_index=false |
596+
| alltypes_tiny_pages.parquet | 454233 | 269266 | 2 | page_index=true |
597+
| lz4_raw_compressed_larger.parquet | 380836 | 1347 | 2 | page_index=false |
598598
+-----------------------------------+-----------------+---------------------+------+------------------+
599599
");
600600

@@ -623,9 +623,9 @@ mod tests {
623623
+-----------------------------------+-----------------+---------------------+------+------------------+
624624
| filename | file_size_bytes | metadata_size_bytes | hits | extra |
625625
+-----------------------------------+-----------------+---------------------+------+------------------+
626-
| alltypes_plain.parquet | 1851 | 6957 | 5 | page_index=false |
627-
| alltypes_tiny_pages.parquet | 454233 | 267014 | 2 | page_index=true |
628-
| lz4_raw_compressed_larger.parquet | 380836 | 996 | 3 | page_index=false |
626+
| alltypes_plain.parquet | 1851 | 8882 | 5 | page_index=false |
627+
| alltypes_tiny_pages.parquet | 454233 | 269266 | 2 | page_index=true |
628+
| lz4_raw_compressed_larger.parquet | 380836 | 1347 | 3 | page_index=false |
629629
+-----------------------------------+-----------------+---------------------+------+------------------+
630630
");
631631

datafusion/common/src/config.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -699,6 +699,12 @@ config_namespace! {
699699
/// the filters are applied in the same order as written in the query
700700
pub reorder_filters: bool, default = false
701701

702+
/// (reading) Force the use of RowSelections for filter results, when
703+
/// pushdown_filters is enabled. If false, the reader will automatically
704+
/// choose between a RowSelection and a Bitmap based on the number and
705+
/// pattern of selected rows.
706+
pub force_filter_selections: bool, default = false
707+
702708
/// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`,
703709
/// and `Binary/BinaryLarge` with `BinaryView`.
704710
pub schema_force_view_types: bool, default = true

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ impl ParquetOptions {
200200
metadata_size_hint: _,
201201
pushdown_filters: _,
202202
reorder_filters: _,
203+
force_filter_selections: _, // not used for writer props
203204
allow_single_file_parallelism: _,
204205
maximum_parallel_row_group_writers: _,
205206
maximum_buffered_record_batches_per_stream: _,
@@ -464,6 +465,7 @@ mod tests {
464465
metadata_size_hint: defaults.metadata_size_hint,
465466
pushdown_filters: defaults.pushdown_filters,
466467
reorder_filters: defaults.reorder_filters,
468+
force_filter_selections: defaults.force_filter_selections,
467469
allow_single_file_parallelism: defaults.allow_single_file_parallelism,
468470
maximum_parallel_row_group_writers: defaults
469471
.maximum_parallel_row_group_writers,
@@ -577,6 +579,7 @@ mod tests {
577579
metadata_size_hint: global_options_defaults.metadata_size_hint,
578580
pushdown_filters: global_options_defaults.pushdown_filters,
579581
reorder_filters: global_options_defaults.reorder_filters,
582+
force_filter_selections: global_options_defaults.force_filter_selections,
580583
allow_single_file_parallelism: global_options_defaults
581584
.allow_single_file_parallelism,
582585
maximum_parallel_row_group_writers: global_options_defaults

datafusion/core/src/physical_planner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3106,7 +3106,7 @@ mod tests {
31063106

31073107
assert_contains!(
31083108
&e,
3109-
r#"Error during planning: Can not find compatible types to compare Boolean with [Struct("foo": Boolean), Utf8]"#
3109+
r#"Error during planning: Can not find compatible types to compare Boolean with [Struct("foo": non-null Boolean), Utf8]"#
31103110
);
31113111

31123112
Ok(())

datafusion/core/tests/dataframe/dataframe_functions.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -313,10 +313,10 @@ async fn test_fn_arrow_typeof() -> Result<()> {
313313
+----------------------+
314314
| arrow_typeof(test.l) |
315315
+----------------------+
316-
| List(nullable Int32) |
317-
| List(nullable Int32) |
318-
| List(nullable Int32) |
319-
| List(nullable Int32) |
316+
| List(Int32) |
317+
| List(Int32) |
318+
| List(Int32) |
319+
| List(Int32) |
320320
+----------------------+
321321
");
322322

datafusion/core/tests/parquet/filter_pushdown.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -636,6 +636,27 @@ async fn predicate_cache_pushdown_default() -> datafusion_common::Result<()> {
636636
config.options_mut().execution.parquet.pushdown_filters = true;
637637
let ctx = SessionContext::new_with_config(config);
638638
// The cache is on by default, and used when filter pushdown is enabled
639+
PredicateCacheTest {
640+
expected_inner_records: 8,
641+
expected_records: 7, // reads more than necessary from the cache as then another bitmap is applied
642+
}
643+
.run(&ctx)
644+
.await
645+
}
646+
647+
#[tokio::test]
648+
async fn predicate_cache_pushdown_default_selections_only(
649+
) -> datafusion_common::Result<()> {
650+
let mut config = SessionConfig::new();
651+
config.options_mut().execution.parquet.pushdown_filters = true;
652+
// forcing filter selections minimizes the number of rows read from the cache
653+
config
654+
.options_mut()
655+
.execution
656+
.parquet
657+
.force_filter_selections = true;
658+
let ctx = SessionContext::new_with_config(config);
659+
// The cache is on by default, and used when filter pushdown is enabled
639660
PredicateCacheTest {
640661
expected_inner_records: 8,
641662
expected_records: 4,

datafusion/core/tests/sql/select.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -222,10 +222,10 @@ async fn test_parameter_invalid_types() -> Result<()> {
222222
.await;
223223
assert_snapshot!(results.unwrap_err().strip_backtrace(),
224224
@r"
225-
type_coercion
226-
caused by
227-
Error during planning: Cannot infer common argument type for comparison operation List(nullable Int32) = Int32
228-
");
225+
type_coercion
226+
caused by
227+
Error during planning: Cannot infer common argument type for comparison operation List(Int32) = Int32
228+
");
229229
Ok(())
230230
}
231231

datafusion/datasource-parquet/src/opener.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@ use futures::{ready, Stream, StreamExt, TryStreamExt};
5353
use itertools::Itertools;
5454
use log::debug;
5555
use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
56-
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
56+
use parquet::arrow::arrow_reader::{
57+
ArrowReaderMetadata, ArrowReaderOptions, RowSelectionPolicy,
58+
};
5759
use parquet::arrow::async_reader::AsyncFileReader;
5860
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
5961
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};
@@ -87,6 +89,8 @@ pub(super) struct ParquetOpener {
8789
pub pushdown_filters: bool,
8890
/// Should the filters be reordered to optimize the scan?
8991
pub reorder_filters: bool,
92+
/// Should we force the reader to use RowSelections for filtering
93+
pub force_filter_selections: bool,
9094
/// Should the page index be read from parquet files, if present, to skip
9195
/// data pages
9296
pub enable_page_index: bool,
@@ -147,6 +151,7 @@ impl FileOpener for ParquetOpener {
147151
let partition_fields = self.partition_fields.clone();
148152
let reorder_predicates = self.reorder_filters;
149153
let pushdown_filters = self.pushdown_filters;
154+
let force_filter_selections = self.force_filter_selections;
150155
let coerce_int96 = self.coerce_int96;
151156
let enable_bloom_filter = self.enable_bloom_filter;
152157
let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning;
@@ -347,6 +352,10 @@ impl FileOpener for ParquetOpener {
347352
}
348353
};
349354
};
355+
if force_filter_selections {
356+
builder =
357+
builder.with_row_selection_policy(RowSelectionPolicy::Selectors);
358+
}
350359

351360
// Determine which row groups to actually read. The idea is to skip
352361
// as many row groups as possible based on the metadata and query
@@ -887,6 +896,7 @@ mod test {
887896
partition_fields: vec![],
888897
pushdown_filters: false, // note that this is false!
889898
reorder_filters: false,
899+
force_filter_selections: false,
890900
enable_page_index: false,
891901
enable_bloom_filter: false,
892902
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
@@ -960,6 +970,7 @@ mod test {
960970
))],
961971
pushdown_filters: false, // note that this is false!
962972
reorder_filters: false,
973+
force_filter_selections: false,
963974
enable_page_index: false,
964975
enable_bloom_filter: false,
965976
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
@@ -1049,6 +1060,7 @@ mod test {
10491060
))],
10501061
pushdown_filters: false, // note that this is false!
10511062
reorder_filters: false,
1063+
force_filter_selections: false,
10521064
enable_page_index: false,
10531065
enable_bloom_filter: false,
10541066
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
@@ -1141,6 +1153,7 @@ mod test {
11411153
))],
11421154
pushdown_filters: true, // note that this is true!
11431155
reorder_filters: true,
1156+
force_filter_selections: false,
11441157
enable_page_index: false,
11451158
enable_bloom_filter: false,
11461159
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
@@ -1233,6 +1246,7 @@ mod test {
12331246
))],
12341247
pushdown_filters: false, // note that this is false!
12351248
reorder_filters: false,
1249+
force_filter_selections: false,
12361250
enable_page_index: false,
12371251
enable_bloom_filter: false,
12381252
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
@@ -1383,6 +1397,7 @@ mod test {
13831397
partition_fields: vec![],
13841398
pushdown_filters: true,
13851399
reorder_filters: false,
1400+
force_filter_selections: false,
13861401
enable_page_index: false,
13871402
enable_bloom_filter: false,
13881403
schema_adapter_factory: Arc::new(CustomSchemaAdapterFactory),

0 commit comments

Comments
 (0)