Skip to content

Set projection before configuring the source #14685

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Feb 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion-testing
2 changes: 2 additions & 0 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1934,6 +1934,7 @@ mod tests {

// test metadata
assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8));
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));

Ok(())
Expand Down Expand Up @@ -1976,6 +1977,7 @@ mod tests {

// note: even if the limit is set, the executor rounds up to the batch size
assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8));
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));
let batches = collect(exec, task_ctx).await?;
assert_eq!(1, batches.len());
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1239,6 +1239,7 @@ mod tests {

let exec = table.scan(&state, None, &[], None).await?;
assert_eq!(exec.statistics()?.num_rows, Precision::Exact(8));
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
assert_eq!(exec.statistics()?.total_byte_size, Precision::Exact(671));

Ok(())
Expand Down
9 changes: 6 additions & 3 deletions datafusion/core/tests/parquet/file_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ async fn load_table_stats_with_session_level_cache() {
assert_eq!(exec1.statistics().unwrap().num_rows, Precision::Exact(8));
assert_eq!(
exec1.statistics().unwrap().total_byte_size,
Precision::Exact(671)
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
Precision::Exact(671),
);
assert_eq!(get_static_cache_size(&state1), 1);

Expand All @@ -93,7 +94,8 @@ async fn load_table_stats_with_session_level_cache() {
assert_eq!(exec2.statistics().unwrap().num_rows, Precision::Exact(8));
assert_eq!(
exec2.statistics().unwrap().total_byte_size,
Precision::Exact(671)
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
Precision::Exact(671),
);
assert_eq!(get_static_cache_size(&state2), 1);

Expand All @@ -104,7 +106,8 @@ async fn load_table_stats_with_session_level_cache() {
assert_eq!(exec3.statistics().unwrap().num_rows, Precision::Exact(8));
assert_eq!(
exec3.statistics().unwrap().total_byte_size,
Precision::Exact(671)
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
Precision::Exact(671),
);
// List same file no increase
assert_eq!(get_static_cache_size(&state1), 1);
Expand Down
165 changes: 102 additions & 63 deletions datafusion/datasource/src/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ use arrow::{
buffer::Buffer,
datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, UInt16Type},
};
use datafusion_common::{
exec_err, stats::Precision, ColumnStatistics, Constraints, Result, Statistics,
};
use datafusion_common::{exec_err, ColumnStatistics, Constraints, Result, Statistics};
use datafusion_common::{DataFusionError, ScalarValue};
use datafusion_execution::{
object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
Expand Down Expand Up @@ -86,20 +84,22 @@ use crate::{
/// # Field::new("c4", DataType::Int32, false),
/// # ]));
/// # // Note: crate mock ParquetSource, as ParquetSource is not in the datasource crate
/// # struct ParquetSource {};
/// # struct ParquetSource {
/// # projected_statistics: Option<Statistics>
/// # };
/// # impl FileSource for ParquetSource {
/// # fn create_file_opener(&self, _: Arc<dyn ObjectStore>, _: &FileScanConfig, _: usize) -> Arc<dyn FileOpener> { unimplemented!() }
/// # fn as_any(&self) -> &dyn Any { self }
/// # fn with_batch_size(&self, _: usize) -> Arc<dyn FileSource> { unimplemented!() }
/// # fn with_schema(&self, _: SchemaRef) -> Arc<dyn FileSource> { unimplemented!() }
/// # fn with_projection(&self, _: &FileScanConfig) -> Arc<dyn FileSource> { unimplemented!() }
/// # fn with_statistics(&self, _: Statistics) -> Arc<dyn FileSource> { Arc::new(Self::new()) }
/// # fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> { Arc::new(Self {projected_statistics: Some(statistics)} ) }
/// # fn metrics(&self) -> &ExecutionPlanMetricsSet { unimplemented!() }
/// # fn statistics(&self) -> datafusion_common::Result<Statistics> { unimplemented!() }
/// # fn statistics(&self) -> datafusion_common::Result<Statistics> { Ok(self.projected_statistics.clone().expect("projected_statistics should be set")) }
/// # fn file_type(&self) -> &str { "parquet" }
/// # }
/// # impl ParquetSource {
/// # fn new() -> Self { Self{} }
/// # fn new() -> Self { Self {projected_statistics: None} }
/// # }
/// // create FileScan config for reading parquet files from file://
/// let object_store_url = ObjectStoreUrl::local_filesystem();
Expand Down Expand Up @@ -244,7 +244,7 @@ impl DataSource for FileScanConfig {
}

/// Return the table-level statistics, projected onto the configured
/// column projection (see `projected_stats`).
///
/// NOTE(review): the scrape merged the pre-change body
/// (`self.file_source.statistics()`) with the post-change one; the
/// merged PR delegates to `projected_stats`, reconstructed here.
fn statistics(&self) -> Result<Statistics> {
    Ok(self.projected_stats())
}

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
Expand Down Expand Up @@ -324,13 +324,7 @@ impl FileScanConfig {

/// Set the file source
///
/// Seeds the new source with the (unprojected) table statistics held
/// on `self`; projection is applied later when statistics are read
/// via `projected_stats`.
///
/// NOTE(review): the scrape merged the removed pre-change lines (the
/// `self.project()` destructuring) with the added post-change line;
/// the merged PR keeps only the latter, reconstructed here.
pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
    self.file_source = file_source.with_statistics(self.statistics.clone());
    self
}

Expand All @@ -342,10 +336,75 @@ impl FileScanConfig {

/// Set the statistics of the files
///
/// Stores the statistics on `self` and forwards a copy to the file
/// source so both views stay in sync.
///
/// NOTE(review): the scrape merged the removed pre-change line
/// (`self.statistics = statistics;`) with the added post-change
/// lines; the merged PR keeps the clone-and-forward version,
/// reconstructed here.
pub fn with_statistics(mut self, statistics: Statistics) -> Self {
    self.statistics = statistics.clone();
    self.file_source = self.file_source.with_statistics(statistics);
    self
}

/// Column indices selected by the projection.
///
/// When no explicit projection is set, every column is selected:
/// all file-schema fields followed by all table partition columns.
fn projection_indices(&self) -> Vec<usize> {
    if let Some(proj) = &self.projection {
        proj.clone()
    } else {
        let total_cols =
            self.file_schema.fields().len() + self.table_partition_cols.len();
        (0..total_cols).collect()
    }
}

/// Table statistics projected onto the configured column projection.
///
/// Prefers the file source's statistics and falls back to the
/// statistics stored on `self` if the source cannot provide any.
/// Partition columns get unknown per-column statistics for now.
fn projected_stats(&self) -> Statistics {
    // `unwrap_or_else` keeps the fallback clone lazy: the original
    // `unwrap_or(self.statistics.clone())` cloned the full Statistics
    // even when the source statistics were available (clippy
    // `or_fun_call`).
    let statistics = self
        .file_source
        .statistics()
        .unwrap_or_else(|_| self.statistics.clone());

    let table_cols_stats = self
        .projection_indices()
        .into_iter()
        .map(|idx| {
            if idx < self.file_schema.fields().len() {
                statistics.column_statistics[idx].clone()
            } else {
                // TODO provide accurate stat for partition column (#1186)
                ColumnStatistics::new_unknown()
            }
        })
        .collect();

    Statistics {
        num_rows: statistics.num_rows,
        // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
        total_byte_size: statistics.total_byte_size,
        column_statistics: table_cols_stats,
    }
}

/// Schema for the projected columns: file-schema fields first, then
/// table partition columns, preserving the file schema's metadata.
fn projected_schema(&self) -> Arc<Schema> {
    let file_field_count = self.file_schema.fields().len();

    let mut table_fields = Vec::new();
    for idx in self.projection_indices() {
        let field = if idx < file_field_count {
            self.file_schema.field(idx).clone()
        } else {
            // Indices past the file schema refer to partition columns.
            self.table_partition_cols[idx - file_field_count].clone()
        };
        table_fields.push(field);
    }

    Arc::new(Schema::new_with_metadata(
        table_fields,
        self.file_schema.metadata().clone(),
    ))
}

/// Constraints restricted to the projected columns; empty when the
/// projection invalidates them.
fn projected_constraints(&self) -> Constraints {
    match self.constraints.project(&self.projection_indices()) {
        Some(projected) => projected,
        None => Constraints::empty(),
    }
}

/// Set the projection of the files
pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
self.projection = projection;
Expand Down Expand Up @@ -433,54 +492,13 @@ impl FileScanConfig {
);
}

let proj_indices = if let Some(proj) = &self.projection {
proj
} else {
let len = self.file_schema.fields().len() + self.table_partition_cols.len();
&(0..len).collect::<Vec<_>>()
};

let mut table_fields = vec![];
let mut table_cols_stats = vec![];
for idx in proj_indices {
if *idx < self.file_schema.fields().len() {
let field = self.file_schema.field(*idx);
table_fields.push(field.clone());
table_cols_stats.push(self.statistics.column_statistics[*idx].clone())
} else {
let partition_idx = idx - self.file_schema.fields().len();
table_fields.push(self.table_partition_cols[partition_idx].to_owned());
// TODO provide accurate stat for partition column (#1186)
table_cols_stats.push(ColumnStatistics::new_unknown())
}
}

let table_stats = Statistics {
num_rows: self.statistics.num_rows,
// TODO correct byte size?
total_byte_size: Precision::Absent,
column_statistics: table_cols_stats,
};

let projected_schema = Arc::new(Schema::new_with_metadata(
table_fields,
self.file_schema.metadata().clone(),
));
let schema = self.projected_schema();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks really nice now

let constraints = self.projected_constraints();
let stats = self.projected_stats();

let projected_constraints = self
.constraints
.project(proj_indices)
.unwrap_or_else(Constraints::empty);
let output_ordering = get_projected_output_ordering(self, &schema);

let projected_output_ordering =
get_projected_output_ordering(self, &projected_schema);

(
projected_schema,
projected_constraints,
table_stats,
projected_output_ordering,
)
(schema, constraints, stats, output_ordering)
}

#[cfg_attr(not(feature = "avro"), allow(unused))] // Only used by avro
Expand Down Expand Up @@ -1048,6 +1066,7 @@ mod tests {
compute::SortOptions,
};

use datafusion_common::stats::Precision;
use datafusion_common::{assert_batches_eq, DFSchema};
use datafusion_expr::{execution_props::ExecutionProps, SortExpr};
use datafusion_physical_expr::create_physical_expr;
Expand Down Expand Up @@ -1203,6 +1222,12 @@ mod tests {
),
];
// create a projected schema
let statistics = Statistics {
num_rows: Precision::Inexact(3),
total_byte_size: Precision::Absent,
column_statistics: Statistics::unknown_column(&file_batch.schema()),
};

let conf = config_for_projection(
file_batch.schema(),
// keep all cols from file and 2 from partitioning
Expand All @@ -1213,9 +1238,23 @@ mod tests {
file_batch.schema().fields().len(),
file_batch.schema().fields().len() + 2,
]),
Statistics::new_unknown(&file_batch.schema()),
statistics.clone(),
to_partition_cols(partition_cols.clone()),
);

let source_statistics = conf.file_source.statistics().unwrap();
let conf_stats = conf.statistics().unwrap();

// projection should be reflected in the file source statistics
assert_eq!(conf_stats.num_rows, Precision::Inexact(3));

// 3 original statistics + 2 partition statistics
assert_eq!(conf_stats.column_statistics.len(), 5);

// file statistics should not be modified
assert_eq!(source_statistics, statistics);
assert_eq!(source_statistics.column_statistics.len(), 3);

let (proj_schema, ..) = conf.project();
// created a projector for that projected schema
let mut proj = PartitionColumnProjector::new(
Expand Down
Loading