From 42d44035622b12c5be9395d24272b48d6176dd22 Mon Sep 17 00:00:00 2001 From: blaginin Date: Sat, 15 Feb 2025 21:18:54 +0000 Subject: [PATCH 01/12] Set projection before configuring the source --- .../physical_plan/file_scan_config.rs | 31 +++++++++++++++++++ .../proto/src/physical_plan/from_proto.rs | 20 +++++++----- 2 files changed, 43 insertions(+), 8 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index 3708fe6abd5e..dbd683d30b6a 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -280,6 +280,9 @@ impl FileScanConfig { /// # Parameters: /// * `object_store_url`: See [`Self::object_store_url`] /// * `file_schema`: See [`Self::file_schema`] + // + // todo: should this function (without projection) exist at all? + // config.with_source depends on projection being set correctly... pub fn new( object_store_url: ObjectStoreUrl, file_schema: SchemaRef, @@ -306,6 +309,34 @@ impl FileScanConfig { config } + /// Create a new [`FileScanConfig`] also setting the projection + pub fn new_with_projection( + object_store_url: ObjectStoreUrl, + file_schema: SchemaRef, + file_source: Arc, + projection: Option>, + ) -> Self { + let statistics = Statistics::new_unknown(&file_schema); + + let mut config = Self { + object_store_url, + file_schema, + file_groups: vec![], + constraints: Constraints::empty(), + statistics, + projection, + limit: None, + table_partition_cols: vec![], + output_ordering: vec![], + file_compression_type: FileCompressionType::UNCOMPRESSED, + new_lines_in_values: false, + source: Arc::clone(&file_source), + }; + + config = config.with_source(Arc::clone(&file_source)); + config + } + /// Set the file source pub fn with_source(mut self, source: Arc) -> Self { let ( diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 34fb5bb6ddc1..ab0d5b9518f6 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -538,14 +538,18 @@ pub fn parse_protobuf_file_scan_config( output_ordering.push(sort_expr); } - Ok(FileScanConfig::new(object_store_url, file_schema, source) - .with_file_groups(file_groups) - .with_constraints(constraints) - .with_statistics(statistics) - .with_projection(projection) - .with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize)) - .with_table_partition_cols(table_partition_cols) - .with_output_ordering(output_ordering)) + Ok(FileScanConfig::new_with_projection( + object_store_url, + file_schema, + source, + projection, + ) + .with_file_groups(file_groups) + .with_constraints(constraints) + .with_statistics(statistics) + .with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize)) + .with_table_partition_cols(table_partition_cols) + .with_output_ordering(output_ordering)) } impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile { From a8709ff42ab723f89568bda1bc8aaa19b02fff5b Mon Sep 17 00:00:00 2001 From: blaginin Date: Mon, 17 Feb 2025 20:19:42 +0000 Subject: [PATCH 02/12] Refresh source manually --- .../physical_plan/file_scan_config.rs | 45 ++++++------------- .../proto/src/physical_plan/from_proto.rs | 21 ++++----- 2 files changed, 23 insertions(+), 43 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index dbd683d30b6a..4401bdb4f5ad 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -280,9 +280,6 @@ impl FileScanConfig { /// # Parameters: /// * `object_store_url`: See [`Self::object_store_url`] /// * `file_schema`: See [`Self::file_schema`] - // - // todo: should this function (without projection) exist at all? - // config.with_source depends on projection being set correctly... pub fn new( object_store_url: ObjectStoreUrl, file_schema: SchemaRef, @@ -309,34 +306,6 @@ impl FileScanConfig { config } - /// Create a new [`FileScanConfig`] also setting the projection - pub fn new_with_projection( - object_store_url: ObjectStoreUrl, - file_schema: SchemaRef, - file_source: Arc, - projection: Option>, - ) -> Self { - let statistics = Statistics::new_unknown(&file_schema); - - let mut config = Self { - object_store_url, - file_schema, - file_groups: vec![], - constraints: Constraints::empty(), - statistics, - projection, - limit: None, - table_partition_cols: vec![], - output_ordering: vec![], - file_compression_type: FileCompressionType::UNCOMPRESSED, - new_lines_in_values: false, - source: Arc::clone(&file_source), - }; - - config = config.with_source(Arc::clone(&file_source)); - config - } - /// Set the file source pub fn with_source(mut self, source: Arc) -> Self { let ( @@ -349,6 +318,19 @@ impl FileScanConfig { self } + /// Refreshes the source with updated statistics. + /// Should be called after projection-related fields are updated. + pub fn refresh_source(mut self) -> Self { + let ( + _projected_schema, + _constraints, + projected_statistics, + _projected_output_ordering, + ) = self.project(); + self.source = self.source.with_statistics(projected_statistics); + self + } + /// Set the table constraints of the files pub fn with_constraints(mut self, constraints: Constraints) -> Self { self.constraints = constraints; @@ -617,6 +599,7 @@ impl FileScanConfig { } /// Returns the file_source + /// If projection has changed since the last call, use [`FileScanConfig::refresh_source`] before pub fn file_source(&self) -> &Arc { &self.source } diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index ab0d5b9518f6..8f392b139161 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -538,18 +538,15 @@ pub fn parse_protobuf_file_scan_config( output_ordering.push(sort_expr); } - Ok(FileScanConfig::new_with_projection( - object_store_url, - file_schema, - source, - projection, - ) - .with_file_groups(file_groups) - .with_constraints(constraints) - .with_statistics(statistics) - .with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize)) - .with_table_partition_cols(table_partition_cols) - .with_output_ordering(output_ordering)) + Ok(FileScanConfig::new(object_store_url, file_schema, source) + .with_projection(projection) + .with_file_groups(file_groups) + .with_constraints(constraints) + .with_statistics(statistics) + .with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize)) + .with_table_partition_cols(table_partition_cols) + .with_output_ordering(output_ordering) + .refresh_source()) } impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile { From 1b8a39c4db40033ad3ee0e33a739195c56c707ed Mon Sep 17 00:00:00 2001 From: blaginin Date: Wed, 19 Feb 2025 21:11:43 +0000 Subject: [PATCH 03/12] Update statistics on proj / partition columns update --- .../physical_plan/file_scan_config.rs | 61 ++++++++++++++----- .../proto/src/physical_plan/from_proto.rs | 5 +- 2 files changed, 47 insertions(+), 19 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index 311c68ca7c9b..097e7370ea09 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -330,19 +330,6 @@ impl FileScanConfig { self } - /// Refreshes the source with updated statistics. - /// Should be called after projection-related fields are updated. - pub fn refresh_source(mut self) -> Self { - let ( - _projected_schema, - _constraints, - projected_statistics, - _projected_output_ordering, - ) = self.project(); - self.source = self.source.with_statistics(projected_statistics); - self - } - /// Set the table constraints of the files pub fn with_constraints(mut self, constraints: Constraints) -> Self { self.constraints = constraints; @@ -358,6 +345,32 @@ impl FileScanConfig { /// Set the projection of the files pub fn with_projection(mut self, projection: Option>) -> Self { self.projection = projection; + self.with_updated_statistics() + } + + // Update source statistics with the current projection data + fn with_updated_statistics(mut self) -> Self { + let max_projection_column = *self + .projection + .as_ref() + .and_then(|proj| proj.iter().max()) + .unwrap_or(&0); + + if max_projection_column + >= self.file_schema.fields().len() + self.table_partition_cols.len() + { + // we don't yet have enough information (file schema info or partition column info) to perform projection + return self; + } + + let ( + _projected_schema, + _constraints, + projected_statistics, + _projected_output_ordering, + ) = self.project(); + + self.source = self.source.with_statistics(projected_statistics); self } @@ -396,7 +409,7 @@ impl FileScanConfig { /// Set the partitioning columns of the files pub fn with_table_partition_cols(mut self, table_partition_cols: Vec) -> Self { self.table_partition_cols = table_partition_cols; - self + self.with_updated_statistics() } /// Set the output ordering of the files @@ -611,7 +624,6 @@ impl FileScanConfig { } /// Returns the file_source - /// If projection has changed since the last call, use [`FileScanConfig::refresh_source`] before pub fn file_source(&self) -> &Arc { &self.source } @@ -751,6 +763,13 @@ mod tests { ), ]; // create a projected schema + + let statistics = Statistics { + num_rows: Precision::Inexact(3), + total_byte_size: Precision::Absent, + column_statistics: Statistics::unknown_column(&file_batch.schema()), + }; + let conf = config_for_projection( file_batch.schema(), // keep all cols from file and 2 from partitioning @@ -761,10 +780,20 @@ mod tests { file_batch.schema().fields().len(), file_batch.schema().fields().len() + 2, ]), - Statistics::new_unknown(&file_batch.schema()), + statistics.clone(), to_partition_cols(partition_cols.clone()), ); + + let source_statistics = conf.source.statistics().unwrap(); + + // statistics should be preserved and passed into the source + assert_eq!(source_statistics.num_rows, Precision::Inexact(3)); + + // 3 original statistics + 2 partition statistics + assert_eq!(source_statistics.column_statistics.len(), 5); + let (proj_schema, ..) = conf.project(); + // created a projector for that projected schema let mut proj = PartitionColumnProjector::new( proj_schema, diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 8f392b139161..34fb5bb6ddc1 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -539,14 +539,13 @@ pub fn parse_protobuf_file_scan_config( } Ok(FileScanConfig::new(object_store_url, file_schema, source) - .with_projection(projection) .with_file_groups(file_groups) .with_constraints(constraints) .with_statistics(statistics) + .with_projection(projection) .with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize)) .with_table_partition_cols(table_partition_cols) - .with_output_ordering(output_ordering) - .refresh_source()) + .with_output_ordering(output_ordering)) } impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile { From 89ed225dcbe97ce9e9d1245d12e05637e3637f35 Mon Sep 17 00:00:00 2001 From: blaginin Date: Fri, 28 Feb 2025 00:55:48 +0000 Subject: [PATCH 04/12] Use `FileScanConfig` own `source` --- datafusion/datasource/src/file_scan_config.rs | 150 ++++++++---------- .../tests/cases/roundtrip_physical_plan.rs | 43 ++++- 2 files changed, 110 insertions(+), 83 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index aef188e50606..689c16ab1ece 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -244,7 +244,7 @@ impl DataSource for FileScanConfig { } fn statistics(&self) -> Result { - self.file_source.statistics() + Ok(self.projected_stats()) } fn with_fetch(&self, limit: Option) -> Option> { @@ -324,13 +324,7 @@ impl FileScanConfig { /// Set the file source pub fn with_source(mut self, file_source: Arc) -> Self { - let ( - _projected_schema, - _constraints, - projected_statistics, - _projected_output_ordering, - ) = self.project(); - self.file_source = file_source.with_statistics(projected_statistics); + self.file_source = file_source; self } @@ -346,10 +340,69 @@ impl FileScanConfig { self } + fn projection_indices(&self) -> Vec { + match &self.projection { + Some(proj) => proj.clone(), + None => (0..self.file_schema.fields().len() + + self.table_partition_cols.len()) + .collect(), + } + } + + fn projected_stats(&self) -> Statistics { + let table_cols_stats = self + .projection_indices() + .into_iter() + .map(|idx| { + if idx < self.file_schema.fields().len() { + self.statistics.column_statistics[idx].clone() + } else { + // TODO provide accurate stat for partition column (#1186) + ColumnStatistics::new_unknown() + } + }) + .collect(); + + Statistics { + num_rows: self.statistics.num_rows, + // TODO correct byte size? + total_byte_size: Precision::Absent, + column_statistics: table_cols_stats, + } + } + + fn projected_schema(&self) -> Arc { + let table_fields: Vec<_> = self + .projection_indices() + .into_iter() + .map(|idx| { + if idx < self.file_schema.fields().len() { + self.file_schema.field(idx).clone() + } else { + let partition_idx = idx - self.file_schema.fields().len(); + self.table_partition_cols[partition_idx].clone() + } + }) + .collect(); + + Arc::new(Schema::new_with_metadata( + table_fields, + self.file_schema.metadata().clone(), + )) + } + + fn projected_constraints(&self) -> Constraints { + let indexes = self.projection_indices(); + + self.constraints + .project(&indexes) + .unwrap_or_else(Constraints::empty) + } + /// Set the projection of the files pub fn with_projection(mut self, projection: Option>) -> Self { self.projection = projection; - self.with_updated_statistics() + self } /// Set the limit of the files @@ -358,32 +411,6 @@ impl FileScanConfig { self } - // Update source statistics with the current projection data - fn with_updated_statistics(mut self) -> Self { - let max_projection_column = *self - .projection - .as_ref() - .and_then(|proj| proj.iter().max()) - .unwrap_or(&0); - - if max_projection_column - >= self.file_schema.fields().len() + self.table_partition_cols.len() - { - // we don't yet have enough information (file schema info or partition column info) to perform projection - return self; - } - - let ( - _projected_schema, - _constraints, - projected_statistics, - _projected_output_ordering, - ) = self.project(); - - self.file_source = self.file_source.with_statistics(projected_statistics); - self - } - /// Add a file as a single group /// /// See [Self::file_groups] for more information. @@ -413,7 +440,7 @@ impl FileScanConfig { /// Set the partitioning columns of the files pub fn with_table_partition_cols(mut self, table_partition_cols: Vec) -> Self { self.table_partition_cols = table_partition_cols; - self.with_updated_statistics() + self } /// Set the output ordering of the files @@ -459,54 +486,13 @@ impl FileScanConfig { ); } - let proj_indices = if let Some(proj) = &self.projection { - proj - } else { - let len = self.file_schema.fields().len() + self.table_partition_cols.len(); - &(0..len).collect::>() - }; - - let mut table_fields = vec![]; - let mut table_cols_stats = vec![]; - for idx in proj_indices { - if *idx < self.file_schema.fields().len() { - let field = self.file_schema.field(*idx); - table_fields.push(field.clone()); - table_cols_stats.push(self.statistics.column_statistics[*idx].clone()) - } else { - let partition_idx = idx - self.file_schema.fields().len(); - table_fields.push(self.table_partition_cols[partition_idx].to_owned()); - // TODO provide accurate stat for partition column (#1186) - table_cols_stats.push(ColumnStatistics::new_unknown()) - } - } - - let table_stats = Statistics { - num_rows: self.statistics.num_rows, - // TODO correct byte size? - total_byte_size: Precision::Absent, - column_statistics: table_cols_stats, - }; - - let projected_schema = Arc::new(Schema::new_with_metadata( - table_fields, - self.file_schema.metadata().clone(), - )); - - let projected_constraints = self - .constraints - .project(proj_indices) - .unwrap_or_else(Constraints::empty); + let schema = self.projected_schema(); + let constraints = self.projected_constraints(); + let stats = self.projected_stats(); - let projected_output_ordering = - get_projected_output_ordering(self, &projected_schema); + let output_ordering = get_projected_output_ordering(self, &schema); - ( - projected_schema, - projected_constraints, - table_stats, - projected_output_ordering, - ) + (schema, constraints, stats, output_ordering) } #[cfg_attr(not(feature = "avro"), allow(unused))] // Only used by avro diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index a2506bb318d2..702fa96fe078 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -48,7 +48,7 @@ use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile}; use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::datasource::physical_plan::{ wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig, - FileSinkConfig, ParquetSource, + FileSinkConfig, FileSource, ParquetSource, }; use datafusion::execution::FunctionRegistry; use datafusion::functions_aggregate::sum::sum_udaf; @@ -1579,3 +1579,44 @@ async fn roundtrip_coalesce() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn roundtrip_projection_source() -> Result<()> { + let schema = Arc::new(Schema::new(Fields::from([ + Arc::new(Field::new("a", DataType::Utf8, false)), + Arc::new(Field::new("b", DataType::Utf8, false)), + Arc::new(Field::new("c", DataType::Int32, false)), + Arc::new(Field::new("d", DataType::Int32, false)), + ]))); + + let statistics = Statistics::new_unknown(&schema); + + let source = ParquetSource::default().with_statistics(statistics.clone()); + let scan_config = FileScanConfig { + object_store_url: ObjectStoreUrl::local_filesystem(), + file_groups: vec![vec![PartitionedFile::new( + "/path/to/file.parquet".to_string(), + 1024, + )]], + constraints: Constraints::empty(), + statistics, + file_schema: schema.clone(), + projection: Some(vec![0, 1, 2]), + limit: None, + table_partition_cols: vec![], + output_ordering: vec![], + file_compression_type: FileCompressionType::UNCOMPRESSED, + new_lines_in_values: false, + file_source: source, + }; + + let filter = Arc::new( + FilterExec::try_new( + Arc::new(BinaryExpr::new(col("c", &schema)?, Operator::Eq, lit(1))), + scan_config.build(), + )? + .with_projection(Some(vec![0, 1]))?, + ); + + roundtrip_test(filter) +} From 9dd4e38bfb9ff6f89ba06c53cc9bc25a93a65fcf Mon Sep 17 00:00:00 2001 From: blaginin Date: Fri, 28 Feb 2025 01:03:56 +0000 Subject: [PATCH 05/12] Extend test to ensure stats are different --- datafusion/datasource/src/file_scan_config.rs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 689c16ab1ece..f2009b21814d 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -1236,12 +1236,17 @@ mod tests { ); let source_statistics = conf.file_source.statistics().unwrap(); + let conf_stats = conf.statistics().unwrap(); - // statistics should be preserved and passed into the source - assert_eq!(source_statistics.num_rows, Precision::Inexact(3)); + // projection should be reflected in the file source statistics + assert_eq!(conf_stats.num_rows, Precision::Inexact(3)); // 3 original statistics + 2 partition statistics - assert_eq!(source_statistics.column_statistics.len(), 5); + assert_eq!(conf_stats.column_statistics.len(), 5); + + // file statics should not be modified + assert_eq!(source_statistics, statistics); + assert_eq!(source_statistics.column_statistics.len(), 3); let (proj_schema, ..) = conf.project(); // created a projector for that projected schema @@ -1706,7 +1711,7 @@ mod tests { FileScanConfig::new( ObjectStoreUrl::parse("test:///").unwrap(), file_schema, - Arc::new(MockSource::default()), + MockSource::default().with_statistics(statistics.clone()), ) .with_projection(projection) .with_statistics(statistics) From 09a2f6e4b2579953a6335fe2285d4b2b617a9853 Mon Sep 17 00:00:00 2001 From: blaginin Date: Fri, 28 Feb 2025 01:08:51 +0000 Subject: [PATCH 06/12] Unify names --- datafusion/datasource/src/file_scan_config.rs | 42 +++++++++---------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index f2009b21814d..4b5175c809ec 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -244,7 +244,7 @@ impl DataSource for FileScanConfig { } fn statistics(&self) -> Result { - Ok(self.projected_stats()) + Ok(self.projection_stats()) } fn with_fetch(&self, limit: Option) -> Option> { @@ -349,7 +349,7 @@ impl FileScanConfig { } } - fn projected_stats(&self) -> Statistics { + fn projection_stats(&self) -> Statistics { let table_cols_stats = self .projection_indices() .into_iter() @@ -371,7 +371,7 @@ impl FileScanConfig { } } - fn projected_schema(&self) -> Arc { + fn projection_schema(&self) -> Arc { let table_fields: Vec<_> = self .projection_indices() .into_iter() @@ -391,7 +391,7 @@ impl FileScanConfig { )) } - fn projected_constraints(&self) -> Constraints { + fn projection_constraints(&self) -> Constraints { let indexes = self.projection_indices(); self.constraints @@ -486,9 +486,9 @@ impl FileScanConfig { ); } - let schema = self.projected_schema(); - let constraints = self.projected_constraints(); - let stats = self.projected_stats(); + let schema = self.projection_schema(); + let constraints = self.projection_constraints(); + let stats = self.projection_stats(); let output_ordering = get_projected_output_ordering(self, &schema); @@ -672,17 +672,17 @@ pub struct PartitionColumnProjector { /// insert the partition columns in the target record batch. projected_partition_indexes: Vec<(usize, usize)>, /// The schema of the table once the projection was applied. - projected_schema: SchemaRef, + projection_schema: SchemaRef, } impl PartitionColumnProjector { // Create a projector to insert the partitioning columns into batches read from files - // - `projected_schema`: the target schema with both file and partitioning columns + // - `projection_schema`: the target schema with both file and partitioning columns // - `table_partition_cols`: all the partitioning column names - pub fn new(projected_schema: SchemaRef, table_partition_cols: &[String]) -> Self { + pub fn new(projection_schema: SchemaRef, table_partition_cols: &[String]) -> Self { let mut idx_map = HashMap::new(); for (partition_idx, partition_name) in table_partition_cols.iter().enumerate() { - if let Ok(schema_idx) = projected_schema.index_of(partition_name) { + if let Ok(schema_idx) = projection_schema.index_of(partition_name) { idx_map.insert(partition_idx, schema_idx); } } @@ -693,12 +693,12 @@ impl PartitionColumnProjector { Self { projected_partition_indexes, key_buffer_cache: Default::default(), - projected_schema, + projection_schema, } } // Transform the batch read from the file by inserting the partitioning columns - // to the right positions as deduced from `projected_schema` + // to the right positions as deduced from `projection_schema` // - `file_batch`: batch read from the file, with internal projection applied // - `partition_values`: the list of partition values, one for each partition column pub fn project( @@ -706,8 +706,8 @@ impl PartitionColumnProjector { file_batch: RecordBatch, partition_values: &[ScalarValue], ) -> Result { - let expected_cols = - self.projected_schema.fields().len() - self.projected_partition_indexes.len(); + let expected_cols = self.projection_schema.fields().len() + - self.projected_partition_indexes.len(); if file_batch.columns().len() != expected_cols { return exec_err!( @@ -729,7 +729,7 @@ impl PartitionColumnProjector { let mut partition_value = Cow::Borrowed(p_value); // check if user forgot to dict-encode the partition value - let field = self.projected_schema.field(sidx); + let field = self.projection_schema.field(sidx); let expected_data_type = field.data_type(); let actual_data_type = partition_value.data_type(); if let DataType::Dictionary(key_type, _) = expected_data_type { @@ -753,7 +753,7 @@ impl PartitionColumnProjector { } RecordBatch::try_new_with_options( - Arc::clone(&self.projected_schema), + Arc::clone(&self.projection_schema), cols, &RecordBatchOptions::new().with_row_count(Some(file_batch.num_rows())), ) @@ -965,7 +965,7 @@ fn create_output_array( ///``` fn get_projected_output_ordering( base_config: &FileScanConfig, - projected_schema: &SchemaRef, + projection_schema: &SchemaRef, ) -> Vec { let mut all_orderings = vec![]; for output_ordering in &base_config.output_ordering { @@ -973,7 +973,7 @@ fn get_projected_output_ordering( for PhysicalSortExpr { expr, options } in output_ordering.iter() { if let Some(col) = expr.as_any().downcast_ref::() { let name = col.name(); - if let Some((idx, _)) = projected_schema.column_with_name(name) { + if let Some((idx, _)) = projection_schema.column_with_name(name) { // Compute the new sort expression (with correct index) after projection: new_ordering.push(PhysicalSortExpr { expr: Arc::new(Column::new(name, idx)), @@ -982,7 +982,7 @@ fn get_projected_output_ordering( continue; } } - // Cannot find expression in the projected_schema, stop iterating + // Cannot find expression in the projection_schema, stop iterating // since rest of the orderings are violated break; } @@ -1002,7 +1002,7 @@ fn get_projected_output_ordering( let statistics = match MinMaxStatistics::new_from_files( &new_ordering, - projected_schema, + projection_schema, base_config.projection.as_deref(), group, ) { From e87ffb9b2f41fa1f3776e626e86cba0aba18aafe Mon Sep 17 00:00:00 2001 From: blaginin Date: Fri, 28 Feb 2025 07:26:49 +0000 Subject: [PATCH 07/12] Comment `total_byte_size` in tests --- datafusion/core/src/datasource/file_format/parquet.rs | 6 ++++-- datafusion/core/src/datasource/listing/table.rs | 7 +++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 98aa24ad00cb..094fec2332fd 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -1934,7 +1934,8 @@ mod tests { // test metadata assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8)); - assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671)); + // assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671)); + // todo: uncomment when FileScanConfig::projection_stats puts byte size Ok(()) } @@ -1976,7 +1977,8 @@ mod tests { // note: even if the limit is set, the executor rounds up to the batch size assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8)); - assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671)); + // assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671)); + // todo: uncomment when FileScanConfig::projection_stats puts byte size let batches = collect(exec, task_ctx).await?; assert_eq!(1, batches.len()); assert_eq!(11, batches[0].num_columns()); diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index adef02c38d73..b445bedfd3e2 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -1213,7 +1213,8 @@ mod tests { // test metadata assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8)); - assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671)); + // assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671)); + // todo: uncomment when FileScanConfig::projection_stats puts byte size Ok(()) } @@ -1239,7 +1240,9 @@ mod tests { let exec = table.scan(&state, None, &[], None).await?; assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8)); - assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671)); + + // assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671)); + // todo: uncomment when FileScanConfig::projection_stats puts byte size Ok(()) } From 616486d24a23f41cbdac00b468de6978cb463a3e Mon Sep 17 00:00:00 2001 From: blaginin Date: Fri, 28 Feb 2025 12:14:33 +0000 Subject: [PATCH 08/12] Use source stats as a base --- .../core/tests/parquet/file_statistics.rs | 6 +- datafusion/datasource/src/file_scan_config.rs | 55 ++++++++++--------- .../tests/cases/roundtrip_physical_plan.rs | 28 ++++------ 3 files changed, 43 insertions(+), 46 deletions(-) diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs index 4c1d17c8426e..8a5caf23c51c 100644 --- a/datafusion/core/tests/parquet/file_statistics.rs +++ b/datafusion/core/tests/parquet/file_statistics.rs @@ -82,7 +82,7 @@ async fn load_table_stats_with_session_level_cache() { assert_eq!(exec1.statistics().unwrap().num_rows, Precision::Exact(8)); assert_eq!( exec1.statistics().unwrap().total_byte_size, - Precision::Exact(671) + Precision::Absent // todo: put Precision::Exact(671) when projected_stats supports total_byte_size calculation ); assert_eq!(get_static_cache_size(&state1), 1); @@ -93,7 +93,7 @@ async fn load_table_stats_with_session_level_cache() { assert_eq!(exec2.statistics().unwrap().num_rows, Precision::Exact(8)); assert_eq!( exec2.statistics().unwrap().total_byte_size, - Precision::Exact(671) + Precision::Absent // todo: put Precision::Exact(671) when projected_stats supports total_byte_size calculation ); assert_eq!(get_static_cache_size(&state2), 1); @@ -104,7 +104,7 @@ async fn load_table_stats_with_session_level_cache() { assert_eq!(exec3.statistics().unwrap().num_rows, Precision::Exact(8)); assert_eq!( exec3.statistics().unwrap().total_byte_size, - Precision::Exact(671) + Precision::Absent // todo: put Precision::Exact(671) when projected_stats supports total_byte_size calculation ); // List same file no increase assert_eq!(get_static_cache_size(&state1), 1); diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 4b5175c809ec..c685e053f572 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -244,7 +244,7 @@ impl DataSource for FileScanConfig { } fn statistics(&self) -> Result { - Ok(self.projection_stats()) + Ok(self.projected_stats()) } fn with_fetch(&self, limit: Option) -> Option> { @@ -324,7 +324,7 @@ impl FileScanConfig { /// Set the file source pub fn with_source(mut self, file_source: Arc) -> Self { - self.file_source = file_source; + self.file_source = file_source.with_statistics(self.statistics.clone()); self } @@ -336,7 +336,8 @@ impl FileScanConfig { /// Set the statistics of the files pub fn with_statistics(mut self, statistics: Statistics) -> Self { - self.statistics = statistics; + self.statistics = statistics.clone(); + self.file_source.with_statistics(statistics); self } @@ -349,13 +350,17 @@ impl FileScanConfig { } } - fn projection_stats(&self) -> Statistics { + fn projected_stats(&self) -> Statistics { + let statistics = self + .file_source + .statistics() + .unwrap_or(self.statistics.clone()); let table_cols_stats = self .projection_indices() .into_iter() .map(|idx| { if idx < self.file_schema.fields().len() { - self.statistics.column_statistics[idx].clone() + statistics.column_statistics[idx].clone() } else { // TODO provide accurate stat for partition column (#1186) ColumnStatistics::new_unknown() @@ -364,14 +369,14 @@ impl FileScanConfig { .collect(); Statistics { - num_rows: self.statistics.num_rows, + num_rows: statistics.num_rows, // TODO correct byte size? total_byte_size: Precision::Absent, column_statistics: table_cols_stats, } } - fn projection_schema(&self) -> Arc { + fn projected_schema(&self) -> Arc { let table_fields: Vec<_> = self .projection_indices() .into_iter() @@ -391,7 +396,7 @@ impl FileScanConfig { )) } - fn projection_constraints(&self) -> Constraints { + fn projected_constraints(&self) -> Constraints { let indexes = self.projection_indices(); self.constraints @@ -486,9 +491,9 @@ impl FileScanConfig { ); } - let schema = self.projection_schema(); - let constraints = self.projection_constraints(); - let stats = self.projection_stats(); + let schema = self.projected_schema(); + let constraints = self.projected_constraints(); + let stats = self.projected_stats(); let output_ordering = get_projected_output_ordering(self, &schema); @@ -672,17 +677,17 @@ pub struct PartitionColumnProjector { /// insert the partition columns in the target record batch. projected_partition_indexes: Vec<(usize, usize)>, /// The schema of the table once the projection was applied. - projection_schema: SchemaRef, + projected_schema: SchemaRef, } impl PartitionColumnProjector { // Create a projector to insert the partitioning columns into batches read from files - // - `projection_schema`: the target schema with both file and partitioning columns + // - `projected_schema`: the target schema with both file and partitioning columns // - `table_partition_cols`: all the partitioning column names - pub fn new(projection_schema: SchemaRef, table_partition_cols: &[String]) -> Self { + pub fn new(projected_schema: SchemaRef, table_partition_cols: &[String]) -> Self { let mut idx_map = HashMap::new(); for (partition_idx, partition_name) in table_partition_cols.iter().enumerate() { - if let Ok(schema_idx) = projection_schema.index_of(partition_name) { + if let Ok(schema_idx) = projected_schema.index_of(partition_name) { idx_map.insert(partition_idx, schema_idx); } } @@ -693,12 +698,12 @@ impl PartitionColumnProjector { Self { projected_partition_indexes, key_buffer_cache: Default::default(), - projection_schema, + projected_schema, } } // Transform the batch read from the file by inserting the partitioning columns - // to the right positions as deduced from `projection_schema` + // to the right positions as deduced from `projected_schema` // - `file_batch`: batch read from the file, with internal projection applied // - `partition_values`: the list of partition values, one for each partition column pub fn project( @@ -706,8 +711,8 @@ impl PartitionColumnProjector { file_batch: RecordBatch, partition_values: &[ScalarValue], ) -> Result { - let expected_cols = self.projection_schema.fields().len() - - self.projected_partition_indexes.len(); + let expected_cols = + self.projected_schema.fields().len() - self.projected_partition_indexes.len(); if file_batch.columns().len() != expected_cols { return exec_err!( @@ -729,7 +734,7 @@ impl PartitionColumnProjector { let mut partition_value = Cow::Borrowed(p_value); // check if user forgot to dict-encode the partition value - let field = self.projection_schema.field(sidx); + let field = self.projected_schema.field(sidx); let expected_data_type = field.data_type(); let actual_data_type = partition_value.data_type(); if let DataType::Dictionary(key_type, _) = expected_data_type { @@ -753,7 +758,7 @@ impl PartitionColumnProjector { } RecordBatch::try_new_with_options( - Arc::clone(&self.projection_schema), + Arc::clone(&self.projected_schema), cols, &RecordBatchOptions::new().with_row_count(Some(file_batch.num_rows())), ) @@ -965,7 +970,7 @@ fn create_output_array( ///``` fn get_projected_output_ordering( base_config: &FileScanConfig, - projection_schema: &SchemaRef, + projected_schema: &SchemaRef, ) -> Vec { let mut all_orderings = vec![]; for output_ordering in &base_config.output_ordering { @@ -973,7 +978,7 @@ fn get_projected_output_ordering( for PhysicalSortExpr { expr, options } in output_ordering.iter() { if let Some(col) = expr.as_any().downcast_ref::() { let name = col.name(); - if let Some((idx, _)) = projection_schema.column_with_name(name) { + if let Some((idx, _)) = projected_schema.column_with_name(name) { // Compute the new sort expression (with correct index) after projection: new_ordering.push(PhysicalSortExpr { expr: Arc::new(Column::new(name, idx)), @@ -982,7 +987,7 @@ fn get_projected_output_ordering( continue; } } - // Cannot find expression in the projection_schema, stop iterating + // Cannot find expression in the projected_schema, stop iterating // since rest of the orderings are violated break; } @@ -1002,7 +1007,7 @@ fn get_projected_output_ordering( let statistics = match MinMaxStatistics::new_from_files( &new_ordering, - projection_schema, + projected_schema, base_config.projection.as_deref(), group, ) { diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 702fa96fe078..e1cbdb7f8bc4 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -753,24 +753,16 @@ async fn roundtrip_parquet_exec_with_table_partition_cols() -> Result<()> { let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)])); let source = Arc::new(ParquetSource::default()); - let scan_config = FileScanConfig { - object_store_url: ObjectStoreUrl::local_filesystem(), - file_groups: vec![vec![file_group]], - constraints: Constraints::empty(), - statistics: Statistics::new_unknown(&schema), - file_schema: schema, - projection: Some(vec![0, 1]), - limit: None, - table_partition_cols: vec![Field::new( - "part".to_string(), - wrap_partition_type_in_dict(DataType::Int16), - false, - )], - output_ordering: vec![], - file_compression_type: FileCompressionType::UNCOMPRESSED, - new_lines_in_values: false, - file_source: source, - }; + let scan_config = + FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema, source) + .with_projection(Some(vec![0, 1])) + .with_file_group(vec![file_group]) + .with_table_partition_cols(vec![Field::new( + "part".to_string(), + wrap_partition_type_in_dict(DataType::Int16), + false, + )]) + .with_newlines_in_values(false); roundtrip_test(scan_config.build()) } From a74637c6cb88b8a4592c3585c36f602d8d616ac3 Mon Sep 17 00:00:00 2001 From: blaginin Date: Fri, 28 Feb 2025 13:01:14 +0000 Subject: [PATCH 09/12] Return correct stats in the `ParquetSource` mock --- datafusion/datasource/src/file_scan_config.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index c685e053f572..17d272c77ba9 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -86,20 +86,22 @@ use crate::{ /// # Field::new("c4", DataType::Int32, false), /// # ])); /// # // Note: crate mock ParquetSource, as ParquetSource is not in the datasource crate -/// # struct ParquetSource {}; +/// # struct ParquetSource { +/// # projected_statistics: Option +/// # }; /// # impl FileSource for ParquetSource { /// # fn create_file_opener(&self, _: Arc, _: &FileScanConfig, _: usize) -> Arc { unimplemented!() } /// # fn as_any(&self) -> &dyn Any { self } /// # fn with_batch_size(&self, _: usize) -> Arc { unimplemented!() } /// # fn with_schema(&self, _: SchemaRef) -> Arc { unimplemented!() } /// # fn with_projection(&self, _: &FileScanConfig) -> Arc { unimplemented!() } -/// # fn with_statistics(&self, _: Statistics) -> Arc { Arc::new(Self::new()) } +/// # fn with_statistics(&self, statistics: Statistics) -> Arc { Arc::new(Self {projected_statistics: Some(statistics)} ) } /// # fn metrics(&self) -> &ExecutionPlanMetricsSet { unimplemented!() } -/// # fn statistics(&self) -> datafusion_common::Result { unimplemented!() } +/// # fn statistics(&self) -> datafusion_common::Result { Ok(self.projected_statistics.clone().expect("projected_statistics should be set")) } /// # fn file_type(&self) -> &str { "parquet" } /// # } /// # impl ParquetSource { -/// # fn new() -> Self { Self{} } +/// # fn new() -> Self { Self {projected_statistics: None} } /// # } /// // create FileScan config for reading parquet files from file:// /// let object_store_url = ObjectStoreUrl::local_filesystem(); @@ -337,7 +339,7 @@ impl FileScanConfig { /// Set the statistics of the files pub fn with_statistics(mut self, statistics: Statistics) -> Self { self.statistics = statistics.clone(); - self.file_source.with_statistics(statistics); + self.file_source = self.file_source.with_statistics(statistics); self } @@ -355,6 +357,7 @@ impl FileScanConfig { .file_source .statistics() .unwrap_or(self.statistics.clone()); + let table_cols_stats = self .projection_indices() .into_iter() @@ -1716,7 +1719,7 @@ mod tests { FileScanConfig::new( ObjectStoreUrl::parse("test:///").unwrap(), file_schema, - MockSource::default().with_statistics(statistics.clone()), + Arc::new(MockSource::default()), ) .with_projection(projection) .with_statistics(statistics) From dbd4f675665c0a6ec7dc342e837f3b173cb7d911 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 28 Feb 2025 09:53:40 -0500 Subject: [PATCH 10/12] Revert test changes, add follow on ticket --- datafusion/core/src/datasource/file_format/parquet.rs | 8 ++++---- datafusion/core/src/datasource/listing/table.rs | 8 +++----- datafusion/datasource/src/file_scan_config.rs | 9 ++++----- 3 files changed, 11 insertions(+), 14 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 094fec2332fd..4a24871aeef7 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -1934,8 +1934,8 @@ mod tests { // test metadata assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8)); - // assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671)); - // todo: uncomment when FileScanConfig::projection_stats puts byte size + // TODO correct byte size: https://github.com/apache/datafusion/issues/14936 + assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671)); Ok(()) } @@ -1977,8 +1977,8 @@ mod tests { // note: even if the limit is set, the executor rounds up to the batch size assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8)); - // assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671)); - // todo: uncomment when FileScanConfig::projection_stats puts byte size + // TODO correct byte size: https://github.com/apache/datafusion/issues/14936 + assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671)); let batches = collect(exec, task_ctx).await?; assert_eq!(1, batches.len()); assert_eq!(11, batches[0].num_columns()); diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index b445bedfd3e2..41e939d60b08 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -1213,8 +1213,7 @@ mod tests { // test metadata assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8)); - // assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671)); - // todo: uncomment when FileScanConfig::projection_stats puts byte size + assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671)); Ok(()) } @@ -1240,9 +1239,8 @@ mod tests { let exec = table.scan(&state, None, &[], None).await?; assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8)); - - // assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671)); - // todo: uncomment when FileScanConfig::projection_stats puts byte size + // TODO correct byte size: https://github.com/apache/datafusion/issues/14936 + assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671)); Ok(()) } diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 17d272c77ba9..79279b5c8231 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -31,9 +31,7 @@ use arrow::{ buffer::Buffer, datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, UInt16Type}, }; -use datafusion_common::{ - exec_err, stats::Precision, ColumnStatistics, Constraints, Result, Statistics, -}; +use datafusion_common::{exec_err, ColumnStatistics, Constraints, Result, Statistics}; use datafusion_common::{DataFusionError, ScalarValue}; use datafusion_execution::{ object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext, @@ -373,8 +371,8 @@ impl FileScanConfig { Statistics { num_rows: statistics.num_rows, - // TODO correct byte size? - total_byte_size: Precision::Absent, + // TODO correct byte size: https://github.com/apache/datafusion/issues/14936 + total_byte_size: statistics.total_byte_size, column_statistics: table_cols_stats, } } @@ -1068,6 +1066,7 @@ mod tests { compute::SortOptions, }; + use datafusion_common::stats::Precision; use datafusion_common::{assert_batches_eq, DFSchema}; use datafusion_expr::{execution_props::ExecutionProps, SortExpr}; use datafusion_physical_expr::create_physical_expr; From 9ca4e9df610d79b7f11d05ba03882ea4d5ed0be5 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 28 Feb 2025 09:57:22 -0500 Subject: [PATCH 11/12] Revert statistics total_byte_count change --- .../sqllogictest/test_files/explain.slt | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index 455b9a439776..16c61a1db6ee 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -294,22 +294,22 @@ CREATE EXTERNAL TABLE alltypes_plain STORED AS PARQUET LOCATION '../../parquet-t query TT EXPLAIN SELECT * FROM alltypes_plain limit 10; ---- -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] # explain verbose with both collect & show statistics on query TT EXPLAIN VERBOSE SELECT * FROM alltypes_plain limit 10; ---- initial_physical_plan -01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] +01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] initial_physical_plan_with_schema 01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] physical_plan after OutputRequirements -01)OutputRequirementExec, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] -02)--GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] -03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] +01)OutputRequirementExec, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] +02)--GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] physical_plan after aggregate_statistics SAME TEXT AS ABOVE physical_plan after join_selection SAME TEXT AS ABOVE physical_plan after LimitedDistinctAggregation SAME TEXT AS ABOVE @@ -320,13 +320,13 @@ physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after coalesce_batches SAME TEXT AS ABOVE physical_plan after OutputRequirements -01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] +01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE -physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] +physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] physical_plan after SanityCheckPlan SAME TEXT AS ABOVE -physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] physical_plan_with_schema DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] @@ -341,8 +341,8 @@ initial_physical_plan 01)GlobalLimitExec: skip=0, fetch=10 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet initial_physical_plan_with_stats -01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] +01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] initial_physical_plan_with_schema 01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] @@ -367,7 +367,7 @@ physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet physical_plan after SanityCheckPlan SAME TEXT AS ABOVE physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet -physical_plan_with_stats DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] +physical_plan_with_stats DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] physical_plan_with_schema DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] From e9ef934d1c09a4b65f665a288866c731f268af35 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 28 Feb 2025 10:00:58 -0500 Subject: [PATCH 12/12] Update test --- datafusion/core/tests/parquet/file_statistics.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs index 8a5caf23c51c..7e98ebed6c9a 100644 --- a/datafusion/core/tests/parquet/file_statistics.rs +++ b/datafusion/core/tests/parquet/file_statistics.rs @@ -82,7 +82,8 @@ async fn load_table_stats_with_session_level_cache() { assert_eq!(exec1.statistics().unwrap().num_rows, Precision::Exact(8)); assert_eq!( exec1.statistics().unwrap().total_byte_size, - Precision::Absent // todo: put Precision::Exact(671) when projected_stats supports total_byte_size calculation + // TODO correct byte size: https://github.com/apache/datafusion/issues/14936 + Precision::Exact(671), ); assert_eq!(get_static_cache_size(&state1), 1); @@ -93,7 +94,8 @@ async fn load_table_stats_with_session_level_cache() { assert_eq!(exec2.statistics().unwrap().num_rows, Precision::Exact(8)); assert_eq!( exec2.statistics().unwrap().total_byte_size, - Precision::Absent // todo: put Precision::Exact(671) when projected_stats supports total_byte_size calculation + // TODO correct byte size: https://github.com/apache/datafusion/issues/14936 + Precision::Exact(671), ); assert_eq!(get_static_cache_size(&state2), 1); @@ -104,7 +106,8 @@ async fn load_table_stats_with_session_level_cache() { assert_eq!(exec3.statistics().unwrap().num_rows, Precision::Exact(8)); assert_eq!( exec3.statistics().unwrap().total_byte_size, - Precision::Absent // todo: put Precision::Exact(671) when projected_stats supports total_byte_size calculation + // TODO correct byte size: https://github.com/apache/datafusion/issues/14936 + Precision::Exact(671), ); // List same file no increase assert_eq!(get_static_cache_size(&state1), 1);