diff --git a/datafusion-examples/examples/advanced_parquet_index.rs b/datafusion-examples/examples/advanced_parquet_index.rs index d6cf61c61d73..b8c303e22161 100644 --- a/datafusion-examples/examples/advanced_parquet_index.rs +++ b/datafusion-examples/examples/advanced_parquet_index.rs @@ -30,7 +30,7 @@ use datafusion::common::{ use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan; use datafusion::datasource::physical_plan::{ - FileMeta, FileScanConfig, ParquetFileReaderFactory, ParquetSource, + FileMeta, FileScanConfigBuilder, ParquetFileReaderFactory, ParquetSource, }; use datafusion::datasource::TableProvider; use datafusion::execution::object_store::ObjectStoreUrl; @@ -55,6 +55,7 @@ use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray}; use arrow::datatypes::SchemaRef; use async_trait::async_trait; use bytes::Bytes; +use datafusion::datasource::memory::DataSourceExec; use futures::future::BoxFuture; use futures::FutureExt; use object_store::ObjectStore; @@ -498,13 +499,15 @@ impl TableProvider for IndexTableProvider { // provide the factory to create parquet reader without re-reading metadata .with_parquet_file_reader_factory(Arc::new(reader_factory)), ); - let file_scan_config = FileScanConfig::new(object_store_url, schema, file_source) - .with_limit(limit) - .with_projection(projection.cloned()) - .with_file(partitioned_file); + let file_scan_config = + FileScanConfigBuilder::new(object_store_url, schema, file_source) + .with_limit(limit) + .with_projection(projection.cloned()) + .with_file(partitioned_file) + .build(); // Finally, put it all together into a DataSourceExec - Ok(file_scan_config.build()) + Ok(DataSourceExec::from_data_source(file_scan_config)) } /// Tell DataFusion to push filters down to the scan method diff --git a/datafusion-examples/examples/csv_json_opener.rs b/datafusion-examples/examples/csv_json_opener.rs index 6dc38a436a0c..1a2c2cbff418 100644 --- a/datafusion-examples/examples/csv_json_opener.rs +++ b/datafusion-examples/examples/csv_json_opener.rs @@ -24,15 +24,14 @@ use datafusion::{ file_format::file_compression_type::FileCompressionType, listing::PartitionedFile, object_store::ObjectStoreUrl, - physical_plan::{ - CsvSource, FileScanConfig, FileSource, FileStream, JsonOpener, JsonSource, - }, + physical_plan::{CsvSource, FileSource, FileStream, JsonOpener, JsonSource}, }, error::Result, physical_plan::metrics::ExecutionPlanMetricsSet, test_util::aggr_test_schema, }; +use datafusion::datasource::physical_plan::FileScanConfigBuilder; use futures::StreamExt; use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore}; @@ -56,14 +55,15 @@ async fn csv_opener() -> Result<()> { let path = std::path::Path::new(&path).canonicalize()?; - let scan_config = FileScanConfig::new( + let scan_config = FileScanConfigBuilder::new( ObjectStoreUrl::local_filesystem(), Arc::clone(&schema), Arc::new(CsvSource::default()), ) .with_projection(Some(vec![12, 0])) .with_limit(Some(5)) - .with_file(PartitionedFile::new(path.display().to_string(), 10)); + .with_file(PartitionedFile::new(path.display().to_string(), 10)) + .build(); let config = CsvSource::new(true, b',', b'"') .with_comment(Some(b'#')) @@ -121,14 +121,15 @@ async fn json_opener() -> Result<()> { Arc::new(object_store), ); - let scan_config = FileScanConfig::new( + let scan_config = FileScanConfigBuilder::new( ObjectStoreUrl::local_filesystem(), schema, Arc::new(JsonSource::default()), ) .with_projection(Some(vec![1, 0])) 
.with_limit(Some(5)) - .with_file(PartitionedFile::new(path.to_string(), 10)); + .with_file(PartitionedFile::new(path.to_string(), 10)) + .build(); let mut stream = FileStream::new( &scan_config, diff --git a/datafusion-examples/examples/parquet_index.rs b/datafusion-examples/examples/parquet_index.rs index 3851dca2a775..0b6bccc27b1d 100644 --- a/datafusion-examples/examples/parquet_index.rs +++ b/datafusion-examples/examples/parquet_index.rs @@ -27,7 +27,8 @@ use datafusion::common::{ internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue, }; use datafusion::datasource::listing::PartitionedFile; -use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource}; +use datafusion::datasource::memory::DataSourceExec; +use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource}; use datafusion::datasource::TableProvider; use datafusion::execution::object_store::ObjectStoreUrl; use datafusion::logical_expr::{ @@ -243,8 +244,8 @@ impl TableProvider for IndexTableProvider { let object_store_url = ObjectStoreUrl::parse("file://")?; let source = Arc::new(ParquetSource::default().with_predicate(self.schema(), predicate)); - let mut file_scan_config = - FileScanConfig::new(object_store_url, self.schema(), source) + let mut file_scan_config_builder = + FileScanConfigBuilder::new(object_store_url, self.schema(), source) .with_projection(projection.cloned()) .with_limit(limit); @@ -253,12 +254,13 @@ impl TableProvider for IndexTableProvider { for (file_name, file_size) in files { let path = self.dir.join(file_name); let canonical_path = fs::canonicalize(path)?; - file_scan_config = file_scan_config.with_file(PartitionedFile::new( - canonical_path.display().to_string(), - file_size, - )); + file_scan_config_builder = file_scan_config_builder.with_file( + PartitionedFile::new(canonical_path.display().to_string(), file_size), + ); } - Ok(file_scan_config.build()) + Ok(DataSourceExec::from_data_source( + file_scan_config_builder.build(), + )) } /// Tell DataFusion to push filters down to the scan method diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs index 7c41855110e0..6c7c9463cf3b 100644 --- a/datafusion/core/src/datasource/file_format/arrow.rs +++ b/datafusion/core/src/datasource/file_format/arrow.rs @@ -49,7 +49,7 @@ use datafusion_common::{ use datafusion_common_runtime::{JoinSet, SpawnedTask}; use datafusion_datasource::display::FileGroupDisplay; use datafusion_datasource::file::FileSource; -use datafusion_datasource::file_scan_config::FileScanConfig; +use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; use datafusion_datasource::sink::{DataSink, DataSinkExec}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_expr::dml::InsertOp; @@ -58,6 +58,7 @@ use datafusion_physical_expr_common::sort_expr::LexRequirement; use async_trait::async_trait; use bytes::Bytes; +use datafusion_datasource::source::DataSourceExec; use futures::stream::BoxStream; use futures::StreamExt; use object_store::{GetResultPayload, ObjectMeta, ObjectStore}; @@ -173,7 +174,12 @@ impl FileFormat for ArrowFormat { conf: FileScanConfig, _filters: Option<&Arc>, ) -> Result> { - Ok(conf.with_source(Arc::new(ArrowSource::default())).build()) + let source = Arc::new(ArrowSource::default()); + let config = FileScanConfigBuilder::from(conf) + .with_source(source) + .build(); + + Ok(DataSourceExec::from_data_source(config)) } async fn 
create_writer_physical_plan( diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index 4914a91722e0..e921f0158e54 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -40,9 +40,8 @@ pub(crate) mod test_util { use datafusion_catalog::Session; use datafusion_common::Result; - use datafusion_datasource::{ - file_format::FileFormat, file_scan_config::FileScanConfig, PartitionedFile, - }; + use datafusion_datasource::file_scan_config::FileScanConfigBuilder; + use datafusion_datasource::{file_format::FileFormat, PartitionedFile}; use datafusion_execution::object_store::ObjectStoreUrl; use crate::test::object_store::local_unpartitioned_file; @@ -79,7 +78,7 @@ pub(crate) mod test_util { let exec = format .create_physical_plan( state, - FileScanConfig::new( + FileScanConfigBuilder::new( ObjectStoreUrl::local_filesystem(), file_schema, format.file_source(), @@ -87,7 +86,8 @@ pub(crate) mod test_util { .with_file_groups(file_groups) .with_statistics(statistics) .with_projection(projection) - .with_limit(limit), + .with_limit(limit) + .build(), None, ) .await?; diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 56d662571fd3..61eeb419a480 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -34,7 +34,7 @@ use crate::datasource::{ use crate::execution::context::SessionState; use datafusion_catalog::TableProvider; use datafusion_common::{config_err, DataFusionError, Result}; -use datafusion_datasource::file_scan_config::FileScanConfig; +use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; use datafusion_expr::dml::InsertOp; use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown}; use datafusion_expr::{SortExpr, TableType}; @@ -942,7 +942,7 @@ impl TableProvider for ListingTable { .format .create_physical_plan( session_state, - FileScanConfig::new( + FileScanConfigBuilder::new( object_store_url, Arc::clone(&self.file_schema), self.options.format.file_source(), @@ -953,7 +953,8 @@ impl TableProvider for ListingTable { .with_projection(projection.cloned()) .with_limit(limit) .with_output_ordering(output_ordering) - .with_table_partition_cols(table_partition_cols), + .with_table_partition_cols(table_partition_cols) + .build(), filters.as_ref(), ) .await diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs index 27352c5146ce..0288cd3e8bc7 100644 --- a/datafusion/core/src/datasource/memory.rs +++ b/datafusion/core/src/datasource/memory.rs @@ -251,7 +251,7 @@ impl TableProvider for MemTable { source = source.try_with_sort_information(file_sort_order)?; } - Ok(Arc::new(DataSourceExec::new(Arc::new(source)))) + Ok(DataSourceExec::from_data_source(source)) } /// Returns an ExecutionPlan that inserts the execution results of a given [`ExecutionPlan`] into this [`MemTable`]. 
diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index 3d86e32816b5..35a451cbc803 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -61,7 +61,7 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::test_util::batches_to_sort_string; - use datafusion_datasource::file_scan_config::FileScanConfig; + use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_datasource::schema_adapter::{ DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory, SchemaMapper, }; @@ -72,6 +72,7 @@ mod tests { use ::object_store::path::Path; use ::object_store::ObjectMeta; + use datafusion_datasource::source::DataSourceExec; use datafusion_physical_plan::collect; use tempfile::TempDir; @@ -128,11 +129,15 @@ mod tests { ParquetSource::default() .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {})), ); - let base_conf = - FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema, source) - .with_file(partitioned_file); + let base_conf = FileScanConfigBuilder::new( + ObjectStoreUrl::local_filesystem(), + schema, + source, + ) + .with_file(partitioned_file) + .build(); - let parquet_exec = base_conf.build(); + let parquet_exec = DataSourceExec::from_data_source(base_conf); let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs index 879fa45f4e6e..8a00af959ccc 100644 --- a/datafusion/core/src/datasource/physical_plan/avro.rs +++ b/datafusion/core/src/datasource/physical_plan/avro.rs @@ -33,13 +33,14 @@ mod tests { use datafusion_common::test_util::batches_to_string; use datafusion_common::{test_util, Result, ScalarValue}; use datafusion_datasource::file_format::FileFormat; - use datafusion_datasource::file_scan_config::FileScanConfig; + use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_datasource::PartitionedFile; use datafusion_datasource_avro::source::AvroSource; use datafusion_datasource_avro::AvroFormat; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_physical_plan::ExecutionPlan; + use datafusion_datasource::source::DataSourceExec; use futures::StreamExt; use insta::assert_snapshot; use object_store::chunked::ChunkedStore; @@ -81,12 +82,16 @@ mod tests { .await?; let source = Arc::new(AvroSource::new()); - let conf = - FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema, source) - .with_file(meta.into()) - .with_projection(Some(vec![0, 1, 2])); - - let source_exec = conf.build(); + let conf = FileScanConfigBuilder::new( + ObjectStoreUrl::local_filesystem(), + file_schema, + source, + ) + .with_file(meta.into()) + .with_projection(Some(vec![0, 1, 2])) + .build(); + + let source_exec = DataSourceExec::from_data_source(conf); assert_eq!( source_exec .properties() @@ -153,11 +158,12 @@ mod tests { let projection = Some(vec![0, 1, 2, actual_schema.fields().len()]); let source = Arc::new(AvroSource::new()); - let conf = FileScanConfig::new(object_store_url, file_schema, source) + let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source) .with_file(meta.into()) - .with_projection(projection); + .with_projection(projection) + .build(); - let source_exec = conf.build(); + let source_exec = DataSourceExec::from_data_source(conf); assert_eq!( source_exec .properties() @@ 
-222,14 +228,15 @@ mod tests { let projection = Some(vec![0, 1, file_schema.fields().len(), 2]); let source = Arc::new(AvroSource::new()); - let conf = FileScanConfig::new(object_store_url, file_schema, source) + let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source) // select specific columns of the files as well as the partitioning // column which is supposed to be the last column in the table schema. .with_projection(projection) .with_file(partitioned_file) - .with_table_partition_cols(vec![Field::new("date", DataType::Utf8, false)]); + .with_table_partition_cols(vec![Field::new("date", DataType::Utf8, false)]) + .build(); - let source_exec = conf.build(); + let source_exec = DataSourceExec::from_data_source(conf); assert_eq!( source_exec diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index 1f5a395c0548..5914924797dc 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -50,6 +50,8 @@ mod tests { use arrow::datatypes::*; use bytes::Bytes; + use datafusion_datasource::file_scan_config::FileScanConfigBuilder; + use datafusion_datasource::source::DataSourceExec; use insta::assert_snapshot; use object_store::chunked::ChunkedStore; use object_store::local::LocalFileSystem; @@ -109,13 +111,18 @@ mod tests { )?; let source = Arc::new(CsvSource::new(true, b',', b'"')); - let config = partitioned_csv_config(file_schema, file_groups, source) - .with_file_compression_type(file_compression_type) - .with_newlines_in_values(false) - .with_projection(Some(vec![0, 2, 4])); + let config = FileScanConfigBuilder::from(partitioned_csv_config( + file_schema, + file_groups, + source, + )) + .with_file_compression_type(file_compression_type) + .with_newlines_in_values(false) + .with_projection(Some(vec![0, 2, 4])) + .build(); assert_eq!(13, config.file_schema.fields().len()); - let csv = config.build(); + let csv = DataSourceExec::from_data_source(config); assert_eq!(3, csv.schema().fields().len()); @@ -169,12 +176,17 @@ mod tests { )?; let source = Arc::new(CsvSource::new(true, b',', b'"')); - let config = partitioned_csv_config(file_schema, file_groups, source) - .with_newlines_in_values(false) - .with_file_compression_type(file_compression_type.to_owned()) - .with_projection(Some(vec![4, 0, 2])); + let config = FileScanConfigBuilder::from(partitioned_csv_config( + file_schema, + file_groups, + source, + )) + .with_newlines_in_values(false) + .with_file_compression_type(file_compression_type.to_owned()) + .with_projection(Some(vec![4, 0, 2])) + .build(); assert_eq!(13, config.file_schema.fields().len()); - let csv = config.build(); + let csv = DataSourceExec::from_data_source(config); assert_eq!(3, csv.schema().fields().len()); let mut stream = csv.execute(0, task_ctx)?; @@ -229,12 +241,17 @@ mod tests { )?; let source = Arc::new(CsvSource::new(true, b',', b'"')); - let config = partitioned_csv_config(file_schema, file_groups, source) - .with_newlines_in_values(false) - .with_file_compression_type(file_compression_type.to_owned()) - .with_limit(Some(5)); + let config = FileScanConfigBuilder::from(partitioned_csv_config( + file_schema, + file_groups, + source, + )) + .with_newlines_in_values(false) + .with_file_compression_type(file_compression_type.to_owned()) + .with_limit(Some(5)) + .build(); assert_eq!(13, config.file_schema.fields().len()); - let csv = config.build(); + let csv = DataSourceExec::from_data_source(config); assert_eq!(13, 
csv.schema().fields().len()); let mut it = csv.execute(0, task_ctx)?; @@ -287,12 +304,17 @@ mod tests { )?; let source = Arc::new(CsvSource::new(true, b',', b'"')); - let config = partitioned_csv_config(file_schema, file_groups, source) - .with_newlines_in_values(false) - .with_file_compression_type(file_compression_type.to_owned()) - .with_limit(Some(5)); + let config = FileScanConfigBuilder::from(partitioned_csv_config( + file_schema, + file_groups, + source, + )) + .with_newlines_in_values(false) + .with_file_compression_type(file_compression_type.to_owned()) + .with_limit(Some(5)) + .build(); assert_eq!(14, config.file_schema.fields().len()); - let csv = config.build(); + let csv = DataSourceExec::from_data_source(config); assert_eq!(14, csv.schema().fields().len()); // errors due to https://github.com/apache/datafusion/issues/4918 @@ -337,9 +359,14 @@ mod tests { )?; let source = Arc::new(CsvSource::new(true, b',', b'"')); - let mut config = partitioned_csv_config(file_schema, file_groups, source) - .with_newlines_in_values(false) - .with_file_compression_type(file_compression_type.to_owned()); + let mut config = FileScanConfigBuilder::from(partitioned_csv_config( + file_schema, + file_groups, + source, + )) + .with_newlines_in_values(false) + .with_file_compression_type(file_compression_type.to_owned()) + .build(); // Add partition columns config.table_partition_cols = vec![Field::new("date", DataType::Utf8, false)]; @@ -353,7 +380,7 @@ mod tests { // partitions are resolved during scan anyway assert_eq!(13, config.file_schema.fields().len()); - let csv = config.build(); + let csv = DataSourceExec::from_data_source(config); assert_eq!(2, csv.schema().fields().len()); let mut it = csv.execute(0, task_ctx)?; @@ -437,10 +464,15 @@ mod tests { .unwrap(); let source = Arc::new(CsvSource::new(true, b',', b'"')); - let config = partitioned_csv_config(file_schema, file_groups, source) - .with_newlines_in_values(false) - .with_file_compression_type(file_compression_type.to_owned()); - let csv = config.build(); + let config = FileScanConfigBuilder::from(partitioned_csv_config( + file_schema, + file_groups, + source, + )) + .with_newlines_in_values(false) + .with_file_compression_type(file_compression_type.to_owned()) + .build(); + let csv = DataSourceExec::from_data_source(config); let it = csv.execute(0, task_ctx).unwrap(); let batches: Vec<_> = it.try_collect().await.unwrap(); diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index 398a842a4f5d..910c4316d973 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -49,6 +49,8 @@ mod tests { use arrow::datatypes::SchemaRef; use arrow::datatypes::{Field, SchemaBuilder}; use datafusion_datasource::file_groups::FileGroup; + use datafusion_datasource::file_scan_config::FileScanConfigBuilder; + use datafusion_datasource::source::DataSourceExec; use insta::assert_snapshot; use object_store::chunked::ChunkedStore; use object_store::local::LocalFileSystem; @@ -168,7 +170,7 @@ mod tests { let state = session_ctx.state(); let task_ctx = session_ctx.task_ctx(); use arrow::datatypes::DataType; - use datafusion_datasource::file_scan_config::FileScanConfig; + use futures::StreamExt; let tmp_dir = TempDir::new()?; @@ -176,11 +178,12 @@ mod tests { prepare_store(&state, file_compression_type.to_owned(), tmp_dir.path()).await; let source = Arc::new(JsonSource::new()); - let conf = FileScanConfig::new(object_store_url, 
file_schema, source) + let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source) .with_file_groups(file_groups) .with_limit(Some(3)) - .with_file_compression_type(file_compression_type.to_owned()); - let exec = conf.build(); + .with_file_compression_type(file_compression_type.to_owned()) + .build(); + let exec = DataSourceExec::from_data_source(conf); // TODO: this is not where schema inference should be tested @@ -232,7 +235,7 @@ mod tests { file_compression_type: FileCompressionType, ) -> Result<()> { use arrow::datatypes::DataType; - use datafusion_datasource::file_scan_config::FileScanConfig; + use futures::StreamExt; let session_ctx = SessionContext::new(); @@ -250,11 +253,12 @@ mod tests { let missing_field_idx = file_schema.fields.len() - 1; let source = Arc::new(JsonSource::new()); - let conf = FileScanConfig::new(object_store_url, file_schema, source) + let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source) .with_file_groups(file_groups) .with_limit(Some(3)) - .with_file_compression_type(file_compression_type.to_owned()); - let exec = conf.build(); + .with_file_compression_type(file_compression_type.to_owned()) + .build(); + let exec = DataSourceExec::from_data_source(conf); let mut it = exec.execute(0, task_ctx)?; let batch = it.next().await.unwrap()?; @@ -282,7 +286,6 @@ mod tests { async fn nd_json_exec_file_projection( file_compression_type: FileCompressionType, ) -> Result<()> { - use datafusion_datasource::file_scan_config::FileScanConfig; use futures::StreamExt; let session_ctx = SessionContext::new(); @@ -293,11 +296,12 @@ mod tests { prepare_store(&state, file_compression_type.to_owned(), tmp_dir.path()).await; let source = Arc::new(JsonSource::new()); - let conf = FileScanConfig::new(object_store_url, file_schema, source) + let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source) .with_file_groups(file_groups) .with_projection(Some(vec![0, 2])) - .with_file_compression_type(file_compression_type.to_owned()); - let exec = conf.build(); + .with_file_compression_type(file_compression_type.to_owned()) + .build(); + let exec = DataSourceExec::from_data_source(conf); let inferred_schema = exec.schema(); assert_eq!(inferred_schema.fields().len(), 2); @@ -330,7 +334,6 @@ mod tests { async fn nd_json_exec_file_mixed_order_projection( file_compression_type: FileCompressionType, ) -> Result<()> { - use datafusion_datasource::file_scan_config::FileScanConfig; use futures::StreamExt; let session_ctx = SessionContext::new(); @@ -341,11 +344,12 @@ mod tests { prepare_store(&state, file_compression_type.to_owned(), tmp_dir.path()).await; let source = Arc::new(JsonSource::new()); - let conf = FileScanConfig::new(object_store_url, file_schema, source) + let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source) .with_file_groups(file_groups) .with_projection(Some(vec![3, 0, 2])) - .with_file_compression_type(file_compression_type.to_owned()); - let exec = conf.build(); + .with_file_compression_type(file_compression_type.to_owned()) + .build(); + let exec = DataSourceExec::from_data_source(conf); let inferred_schema = exec.schema(); assert_eq!(inferred_schema.fields().len(), 3); diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 9d8148ca7560..e3f237803b34 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -58,6 +58,7 @@ pub use 
datafusion_datasource::file_groups::FileGroupPartitioner; pub use datafusion_datasource::file_meta::FileMeta; pub use datafusion_datasource::file_scan_config::{ wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig, + FileScanConfigBuilder, }; pub use datafusion_datasource::file_sink_config::*; diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index affb90b7262e..9e1b2822e854 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -50,7 +50,7 @@ mod tests { use datafusion_common::{assert_contains, Result, ScalarValue}; use datafusion_datasource::file_format::FileFormat; use datafusion_datasource::file_meta::FileMeta; - use datafusion_datasource::file_scan_config::FileScanConfig; + use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_datasource::source::DataSourceExec; use datafusion_datasource::{FileRange, PartitionedFile}; @@ -180,17 +180,19 @@ mod tests { source = source.with_enable_page_index(true); } - let base_config = FileScanConfig::new( + let base_config = FileScanConfigBuilder::new( ObjectStoreUrl::local_filesystem(), file_schema, Arc::new(source.clone()), ) .with_file_group(file_group) - .with_projection(projection); + .with_projection(projection) + .build(); let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); - let parquet_exec = base_config.clone().build(); + + let parquet_exec = DataSourceExec::from_data_source(base_config.clone()); RoundTripResult { batches: collect(parquet_exec.clone(), task_ctx).await, parquet_exec, @@ -1118,13 +1120,15 @@ mod tests { expected_row_num: Option, file_schema: SchemaRef, ) -> Result<()> { - let parquet_exec = FileScanConfig::new( + let config = FileScanConfigBuilder::new( ObjectStoreUrl::local_filesystem(), file_schema, Arc::new(ParquetSource::default()), ) .with_file_groups(file_groups) .build(); + + let parquet_exec = DataSourceExec::from_data_source(config); assert_eq!( parquet_exec .properties() @@ -1222,7 +1226,7 @@ mod tests { ]); let source = Arc::new(ParquetSource::default()); - let parquet_exec = FileScanConfig::new(object_store_url, schema.clone(), source) + let config = FileScanConfigBuilder::new(object_store_url, schema.clone(), source) .with_file(partitioned_file) // file has 10 cols so index 12 should be month and 13 should be day .with_projection(Some(vec![0, 1, 2, 12, 13])) @@ -1239,6 +1243,8 @@ mod tests { ), ]) .build(); + + let parquet_exec = DataSourceExec::from_data_source(config); let partition_count = parquet_exec .data_source() .output_partitioning() @@ -1295,7 +1301,7 @@ mod tests { }; let file_schema = Arc::new(Schema::empty()); - let parquet_exec = FileScanConfig::new( + let config = FileScanConfigBuilder::new( ObjectStoreUrl::local_filesystem(), file_schema, Arc::new(ParquetSource::default()), @@ -1303,6 +1309,8 @@ mod tests { .with_file(partitioned_file) .build(); + let parquet_exec = DataSourceExec::from_data_source(config); + let mut results = parquet_exec.execute(0, state.task_ctx())?; let batch = results.next().await.unwrap(); // invalid file should produce an error to that effect @@ -1925,7 +1933,7 @@ mod tests { .with_parquet_file_reader_factory(reader_factory) .with_metadata_size_hint(456), ); - let exec = FileScanConfig::new(store_url, schema, source) + let config = FileScanConfigBuilder::new(store_url, schema, source) .with_file( PartitionedFile { object_meta: ObjectMeta { @@ 
-1959,6 +1967,8 @@ mod tests { }) .build(); + let exec = DataSourceExec::from_data_source(config); + let res = collect(exec, ctx.task_ctx()).await.unwrap(); assert_eq!(res.len(), 2); diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index 2912eed6c914..8719a16f4919 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -46,6 +46,7 @@ use bzip2::write::BzEncoder; #[cfg(feature = "compression")] use bzip2::Compression as BzCompression; use datafusion_datasource::file_groups::FileGroup; +use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_datasource_csv::partitioned_csv_config; #[cfg(feature = "compression")] use flate2::write::GzEncoder; @@ -91,9 +92,11 @@ pub fn scan_partitioned_csv( work_dir, )?; let source = Arc::new(CsvSource::new(true, b'"', b'"')); - let config = partitioned_csv_config(schema, file_groups, source) - .with_file_compression_type(FileCompressionType::UNCOMPRESSED); - Ok(config.build()) + let config = + FileScanConfigBuilder::from(partitioned_csv_config(schema, file_groups, source)) + .with_file_compression_type(FileCompressionType::UNCOMPRESSED) + .build(); + Ok(DataSourceExec::from_data_source(config)) } /// Returns file groups [`Vec`] for scanning `partitions` of `filename` diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs index 8c046460c2c7..084554eecbdb 100644 --- a/datafusion/core/src/test_util/parquet.rs +++ b/datafusion/core/src/test_util/parquet.rs @@ -37,7 +37,7 @@ use crate::physical_plan::metrics::MetricsSet; use crate::physical_plan::ExecutionPlan; use crate::prelude::{Expr, SessionConfig, SessionContext}; -use datafusion_datasource::file_scan_config::FileScanConfig; +use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_datasource::source::DataSourceExec; use object_store::path::Path; use object_store::ObjectMeta; @@ -157,7 +157,7 @@ impl TestParquetFile { ) -> Result> { let parquet_options = ctx.copied_table_options().parquet; let source = Arc::new(ParquetSource::new(parquet_options.clone())); - let scan_config = FileScanConfig::new( + let scan_config_builder = FileScanConfigBuilder::new( self.object_store_url.clone(), Arc::clone(&self.schema), source, @@ -183,15 +183,17 @@ impl TestParquetFile { create_physical_expr(&filter, &df_schema, &ExecutionProps::default())?; let source = Arc::new(ParquetSource::new(parquet_options).with_predicate( - Arc::clone(&scan_config.file_schema), + Arc::clone(&self.schema), Arc::clone(&physical_filter_expr), )); - let parquet_exec = scan_config.with_source(source).build(); + let config = scan_config_builder.with_source(source).build(); + let parquet_exec = DataSourceExec::from_data_source(config); let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?); Ok(exec) } else { - Ok(scan_config.build()) + let config = scan_config_builder.build(); + Ok(DataSourceExec::from_data_source(config)) } } diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs index 1b98a19581ea..dcf477135a37 100644 --- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs @@ -353,12 +353,12 @@ async fn run_aggregate_test(input1: Vec, group_by_columns: Vec<&str ) .unwrap(); - let running_source = Arc::new(DataSourceExec::new(Arc::new( + let running_source = DataSourceExec::from_data_source( MemorySourceConfig::try_new(&[input1.clone()], schema.clone(), None) 
.unwrap() .try_with_sort_information(vec![sort_keys]) .unwrap(), - ))); + ); let aggregate_expr = vec![ diff --git a/datafusion/core/tests/fuzz_cases/pruning.rs b/datafusion/core/tests/fuzz_cases/pruning.rs index f87572631b28..11dd961a54ee 100644 --- a/datafusion/core/tests/fuzz_cases/pruning.rs +++ b/datafusion/core/tests/fuzz_cases/pruning.rs @@ -21,13 +21,12 @@ use arrow::array::{Array, RecordBatch, StringArray}; use arrow::datatypes::{DataType, Field, Schema}; use bytes::{BufMut, Bytes, BytesMut}; use datafusion::{ - datasource::{ - listing::PartitionedFile, - physical_plan::{FileScanConfig, ParquetSource}, - }, + datasource::{listing::PartitionedFile, physical_plan::ParquetSource}, prelude::*, }; use datafusion_common::DFSchema; +use datafusion_datasource::file_scan_config::FileScanConfigBuilder; +use datafusion_datasource::source::DataSourceExec; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_plan::{collect, filter::FilterExec, ExecutionPlan}; @@ -281,7 +280,7 @@ async fn execute_with_predicate( } else { ParquetSource::default() }; - let scan = FileScanConfig::new( + let config = FileScanConfigBuilder::new( ObjectStoreUrl::parse("memory://").unwrap(), schema.clone(), Arc::new(parquet_source), @@ -293,8 +292,9 @@ async fn execute_with_predicate( PartitionedFile::new(test_file.path.clone(), test_file.size as u64) }) .collect(), - ); - let exec = scan.build(); + ) + .build(); + let exec = DataSourceExec::from_data_source(config); let exec = Arc::new(FilterExec::try_new(predicate, exec).unwrap()) as Arc; diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs b/datafusion/core/tests/fuzz_cases/window_fuzz.rs index a7f9e38c9ae3..6b166dd32782 100644 --- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs @@ -636,10 +636,10 @@ async fn run_window_test( options: Default::default(), }, ]); - let mut exec1 = Arc::new(DataSourceExec::new(Arc::new( + let mut exec1 = DataSourceExec::from_data_source( MemorySourceConfig::try_new(&[vec![concat_input_record]], schema.clone(), None)? .try_with_sort_information(vec![source_sort_keys.clone()])?, - ))) as _; + ) as _; // Table is ordered according to ORDER BY a, b, c In linear test we use PARTITION BY b, ORDER BY a // For WindowAggExec to produce correct result it need table to be ordered by b,a. Hence add a sort. if is_linear { @@ -662,10 +662,10 @@ async fn run_window_test( exec1, false, )?) as _; - let exec2 = Arc::new(DataSourceExec::new(Arc::new( + let exec2 = DataSourceExec::from_data_source( MemorySourceConfig::try_new(&[input1.clone()], schema.clone(), None)? .try_with_sort_information(vec![source_sort_keys.clone()])?, - ))); + ); let running_window_exec = Arc::new(BoundedWindowAggExec::try_new( vec![create_window_expr( &window_fn, diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 6a0a797d4ded..dd5acc8d8908 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -943,6 +943,6 @@ impl TableProvider for SortedTableProvider { )? 
.try_with_sort_information(self.sort_information.clone())?; - Ok(Arc::new(DataSourceExec::new(Arc::new(mem_conf)))) + Ok(DataSourceExec::from_data_source(mem_conf)) } } diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs index 4a4059db2547..ce5c0d720174 100644 --- a/datafusion/core/tests/parquet/custom_reader.rs +++ b/datafusion/core/tests/parquet/custom_reader.rs @@ -27,7 +27,7 @@ use datafusion::datasource::file_format::parquet::fetch_parquet_metadata; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::datasource::physical_plan::{ - FileMeta, FileScanConfig, ParquetFileMetrics, ParquetFileReaderFactory, ParquetSource, + FileMeta, ParquetFileMetrics, ParquetFileReaderFactory, ParquetSource, }; use datafusion::physical_plan::collect; use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; @@ -36,6 +36,8 @@ use datafusion_common::test_util::batches_to_sort_string; use datafusion_common::Result; use bytes::Bytes; +use datafusion_datasource::file_scan_config::FileScanConfigBuilder; +use datafusion_datasource::source::DataSourceExec; use futures::future::BoxFuture; use futures::{FutureExt, TryFutureExt}; use insta::assert_snapshot; @@ -83,15 +85,16 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() { InMemoryParquetFileReaderFactory(Arc::clone(&in_memory_object_store)), )), ); - let base_config = FileScanConfig::new( + let base_config = FileScanConfigBuilder::new( // just any url that doesn't point to in memory object store ObjectStoreUrl::local_filesystem(), file_schema, source, ) - .with_file_group(file_group); + .with_file_group(file_group) + .build(); - let parquet_exec = base_config.build(); + let parquet_exec = DataSourceExec::from_data_source(base_config); let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); diff --git a/datafusion/core/tests/parquet/external_access_plan.rs b/datafusion/core/tests/parquet/external_access_plan.rs index 31c685378a21..bbef073345b7 100644 --- a/datafusion/core/tests/parquet/external_access_plan.rs +++ b/datafusion/core/tests/parquet/external_access_plan.rs @@ -27,7 +27,7 @@ use arrow::datatypes::SchemaRef; use arrow::util::pretty::pretty_format_batches; use datafusion::common::Result; use datafusion::datasource::listing::PartitionedFile; -use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource}; +use datafusion::datasource::physical_plan::ParquetSource; use datafusion::prelude::SessionContext; use datafusion_common::{assert_contains, DFSchema}; use datafusion_datasource_parquet::{ParquetAccessPlan, RowGroupAccess}; @@ -36,6 +36,8 @@ use datafusion_expr::{col, lit, Expr}; use datafusion_physical_plan::metrics::MetricsSet; use datafusion_physical_plan::ExecutionPlan; +use datafusion_datasource::file_scan_config::FileScanConfigBuilder; +use datafusion_datasource::source::DataSourceExec; use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; use parquet::arrow::ArrowWriter; use parquet::file::properties::WriterProperties; @@ -348,10 +350,11 @@ impl TestFull { } else { Arc::new(ParquetSource::default()) }; - let config = FileScanConfig::new(object_store_url, schema.clone(), source) - .with_file(partitioned_file); + let config = FileScanConfigBuilder::new(object_store_url, schema.clone(), source) + .with_file(partitioned_file) + .build(); - let plan: Arc = config.build(); + let plan: Arc = DataSourceExec::from_data_source(config); // run the 
DataSourceExec and collect the results let results = diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index fe96a2eb5e71..7006bf083eee 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -24,7 +24,7 @@ use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::file_format::FileFormat; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::object_store::ObjectStoreUrl; -use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource}; +use datafusion::datasource::physical_plan::ParquetSource; use datafusion::datasource::source::DataSourceExec; use datafusion::execution::context::SessionState; use datafusion::physical_plan::metrics::MetricValue; @@ -35,6 +35,7 @@ use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::{col, lit, Expr}; use datafusion_physical_expr::create_physical_expr; +use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use futures::StreamExt; use object_store::path::Path; use object_store::ObjectMeta; @@ -79,8 +80,9 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> DataSourceExec .with_predicate(Arc::clone(&schema), predicate) .with_enable_page_index(true), ); - let base_config = - FileScanConfig::new(object_store_url, schema, source).with_file(partitioned_file); + let base_config = FileScanConfigBuilder::new(object_store_url, schema, source) + .with_file(partitioned_file) + .build(); DataSourceExec::new(Arc::new(base_config)) } diff --git a/datafusion/core/tests/parquet/schema_coercion.rs b/datafusion/core/tests/parquet/schema_coercion.rs index 85bc1104795f..59cbf4b0872e 100644 --- a/datafusion/core/tests/parquet/schema_coercion.rs +++ b/datafusion/core/tests/parquet/schema_coercion.rs @@ -22,7 +22,7 @@ use arrow::array::{ StringArray, }; use arrow::datatypes::{DataType, Field, Schema}; -use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource}; +use datafusion::datasource::physical_plan::ParquetSource; use datafusion::physical_plan::collect; use datafusion::prelude::SessionContext; use datafusion::test::object_store::local_unpartitioned_file; @@ -30,6 +30,8 @@ use datafusion_common::test_util::batches_to_sort_string; use datafusion_common::Result; use datafusion_execution::object_store::ObjectStoreUrl; +use datafusion_datasource::file_scan_config::FileScanConfigBuilder; +use datafusion_datasource::source::DataSourceExec; use insta::assert_snapshot; use object_store::ObjectMeta; use parquet::arrow::ArrowWriter; @@ -61,11 +63,15 @@ async fn multi_parquet_coercion() { Field::new("c3", DataType::Float64, true), ])); let source = Arc::new(ParquetSource::default()); - let conf = - FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema, source) - .with_file_group(file_group); + let conf = FileScanConfigBuilder::new( + ObjectStoreUrl::local_filesystem(), + file_schema, + source, + ) + .with_file_group(file_group) + .build(); - let parquet_exec = conf.build(); + let parquet_exec = DataSourceExec::from_data_source(conf); let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); @@ -114,7 +120,7 @@ async fn multi_parquet_coercion_projection() { Field::new("c2", DataType::Int32, true), Field::new("c3", DataType::Float64, true), ])); - let parquet_exec = FileScanConfig::new( + let config = FileScanConfigBuilder::new( ObjectStoreUrl::local_filesystem(), file_schema, 
Arc::new(ParquetSource::default()), @@ -123,6 +129,8 @@ async fn multi_parquet_coercion_projection() { .with_projection(Some(vec![1, 0, 2])) .build(); + let parquet_exec = DataSourceExec::from_data_source(config); + let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); let read = collect(parquet_exec, task_ctx).await.unwrap(); diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index 5e3671ea83fc..9898f6204e88 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -30,12 +30,13 @@ use datafusion::config::ConfigOptions; use datafusion::datasource::file_format::file_compression_type::FileCompressionType; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::object_store::ObjectStoreUrl; -use datafusion::datasource::physical_plan::{CsvSource, FileScanConfig, ParquetSource}; +use datafusion::datasource::physical_plan::{CsvSource, ParquetSource}; use datafusion::datasource::source::DataSourceExec; use datafusion_common::error::Result; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::ScalarValue; use datafusion_datasource::file_groups::FileGroup; +use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_expr::{JoinType, Operator}; use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal}; use datafusion_physical_expr::PhysicalExpr; @@ -184,7 +185,7 @@ fn parquet_exec_multiple() -> Arc { fn parquet_exec_multiple_sorted( output_ordering: Vec, ) -> Arc { - FileScanConfig::new( + let config = FileScanConfigBuilder::new( ObjectStoreUrl::parse("test:///").unwrap(), schema(), Arc::new(ParquetSource::default()), @@ -194,7 +195,9 @@ fn parquet_exec_multiple_sorted( FileGroup::new(vec![PartitionedFile::new("y".to_string(), 100)]), ]) .with_output_ordering(output_ordering) - .build() + .build(); + + DataSourceExec::from_data_source(config) } fn csv_exec() -> Arc { @@ -202,14 +205,16 @@ fn csv_exec() -> Arc { } fn csv_exec_with_sort(output_ordering: Vec) -> Arc { - FileScanConfig::new( + let config = FileScanConfigBuilder::new( ObjectStoreUrl::parse("test:///").unwrap(), schema(), Arc::new(CsvSource::new(false, b',', b'"')), ) .with_file(PartitionedFile::new("x".to_string(), 100)) .with_output_ordering(output_ordering) - .build() + .build(); + + DataSourceExec::from_data_source(config) } fn csv_exec_multiple() -> Arc { @@ -218,7 +223,7 @@ fn csv_exec_multiple() -> Arc { // Created a sorted parquet exec with multiple files fn csv_exec_multiple_sorted(output_ordering: Vec) -> Arc { - FileScanConfig::new( + let config = FileScanConfigBuilder::new( ObjectStoreUrl::parse("test:///").unwrap(), schema(), Arc::new(CsvSource::new(false, b',', b'"')), @@ -228,7 +233,9 @@ fn csv_exec_multiple_sorted(output_ordering: Vec) -> Arc Result<()> { }; let plan = aggregate_exec_with_alias( - FileScanConfig::new( - ObjectStoreUrl::parse("test:///").unwrap(), - schema(), - Arc::new(CsvSource::new(false, b',', b'"')), - ) - .with_file(PartitionedFile::new("x".to_string(), 100)) - .with_file_compression_type(compression_type) - .build(), + DataSourceExec::from_data_source( + FileScanConfigBuilder::new( + ObjectStoreUrl::parse("test:///").unwrap(), + schema(), + Arc::new(CsvSource::new(false, b',', b'"')), + ) + .with_file(PartitionedFile::new("x".to_string(), 100)) + 
.with_file_compression_type(compression_type) + .build(), + ), vec![("a".to_string(), "a".to_string())], ); let test_config = TestConfig::default() diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs index bb77192e05b8..4d2c875d3f1d 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs @@ -45,7 +45,7 @@ use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeE use datafusion_physical_plan::sorts::sort::SortExec; use datafusion_physical_plan::windows::{create_window_expr, BoundedWindowAggExec, WindowAggExec}; use datafusion_physical_plan::{displayable, get_plan_string, ExecutionPlan, InputOrderMode}; -use datafusion::datasource::physical_plan::{CsvSource, FileScanConfig, ParquetSource}; +use datafusion::datasource::physical_plan::{CsvSource, ParquetSource}; use datafusion::datasource::listing::PartitionedFile; use datafusion_physical_optimizer::enforce_sorting::{EnforceSorting, PlanWithCorrespondingCoalescePartitions, PlanWithCorrespondingSort, parallelize_sorts, ensure_sorting}; use datafusion_physical_optimizer::enforce_sorting::replace_with_order_preserving_variants::{replace_with_order_preserving_variants, OrderPreservationContext}; @@ -56,6 +56,8 @@ use datafusion_functions_aggregate::average::avg_udaf; use datafusion_functions_aggregate::count::count_udaf; use datafusion_functions_aggregate::min_max::{max_udaf, min_udaf}; +use datafusion_datasource::file_scan_config::FileScanConfigBuilder; +use datafusion_datasource::source::DataSourceExec; use rstest::rstest; /// Create a csv exec for tests @@ -65,14 +67,16 @@ fn csv_exec_ordered( ) -> Arc { let sort_exprs = sort_exprs.into_iter().collect(); - FileScanConfig::new( + let config = FileScanConfigBuilder::new( ObjectStoreUrl::parse("test:///").unwrap(), schema.clone(), Arc::new(CsvSource::new(true, 0, b'"')), ) .with_file(PartitionedFile::new("file_path".to_string(), 100)) .with_output_ordering(vec![sort_exprs]) - .build() + .build(); + + DataSourceExec::from_data_source(config) } /// Created a sorted parquet exec @@ -83,14 +87,16 @@ pub fn parquet_exec_sorted( let sort_exprs = sort_exprs.into_iter().collect(); let source = Arc::new(ParquetSource::default()); - FileScanConfig::new( + let config = FileScanConfigBuilder::new( ObjectStoreUrl::parse("test:///").unwrap(), schema.clone(), source, ) .with_file(PartitionedFile::new("x".to_string(), 100)) .with_output_ordering(vec![sort_exprs]) - .build() + .build(); + + DataSourceExec::from_data_source(config) } /// Create a sorted Csv exec @@ -100,14 +106,16 @@ fn csv_exec_sorted( ) -> Arc { let sort_exprs = sort_exprs.into_iter().collect(); - FileScanConfig::new( + let config = FileScanConfigBuilder::new( ObjectStoreUrl::parse("test:///").unwrap(), schema.clone(), Arc::new(CsvSource::new(false, 0, 0)), ) .with_file(PartitionedFile::new("x".to_string(), 100)) .with_output_ordering(vec![sort_exprs]) - .build() + .build(); + + DataSourceExec::from_data_source(config) } /// Runs the sort enforcement optimizer and asserts the plan diff --git a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs index abe058df99d0..911d2c0cee05 100644 --- a/datafusion/core/tests/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/projection_pushdown.rs @@ -22,7 +22,7 @@ use arrow::compute::SortOptions; use 
arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::memory::MemorySourceConfig; -use datafusion::datasource::physical_plan::{CsvSource, FileScanConfig}; +use datafusion::datasource::physical_plan::CsvSource; use datafusion::datasource::source::DataSourceExec; use datafusion_common::config::ConfigOptions; use datafusion_common::Result; @@ -59,6 +59,7 @@ use datafusion_physical_plan::streaming::StreamingTableExec; use datafusion_physical_plan::union::UnionExec; use datafusion_physical_plan::{get_plan_string, ExecutionPlan}; +use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_expr_common::columnar_value::ColumnarValue; use itertools::Itertools; @@ -372,14 +373,16 @@ fn create_simple_csv_exec() -> Arc { Field::new("d", DataType::Int32, true), Field::new("e", DataType::Int32, true), ])); - FileScanConfig::new( + let config = FileScanConfigBuilder::new( ObjectStoreUrl::parse("test:///").unwrap(), schema, Arc::new(CsvSource::new(false, 0, 0)), ) .with_file(PartitionedFile::new("x".to_string(), 100)) .with_projection(Some(vec![0, 1, 2, 3, 4])) - .build() + .build(); + + DataSourceExec::from_data_source(config) } fn create_projecting_csv_exec() -> Arc { @@ -389,14 +392,16 @@ fn create_projecting_csv_exec() -> Arc { Field::new("c", DataType::Int32, true), Field::new("d", DataType::Int32, true), ])); - FileScanConfig::new( + let config = FileScanConfigBuilder::new( ObjectStoreUrl::parse("test:///").unwrap(), schema, Arc::new(CsvSource::new(false, 0, 0)), ) .with_file(PartitionedFile::new("x".to_string(), 100)) .with_projection(Some(vec![3, 2, 1])) - .build() + .build(); + + DataSourceExec::from_data_source(config) } fn create_projecting_memory_exec() -> Arc { @@ -1398,7 +1403,7 @@ fn partitioned_data_source() -> Arc { Field::new("string_col", DataType::Utf8, true), ])); - FileScanConfig::new( + let config = FileScanConfigBuilder::new( ObjectStoreUrl::parse("test:///").unwrap(), file_schema.clone(), Arc::new(CsvSource::default()), @@ -1406,7 +1411,9 @@ fn partitioned_data_source() -> Arc { .with_file(PartitionedFile::new("x".to_string(), 100)) .with_table_partition_cols(vec![Field::new("partition_col", DataType::Utf8, true)]) .with_projection(Some(vec![0, 1, 2])) - .build() + .build(); + + DataSourceExec::from_data_source(config) } #[test] diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs index 99a75e6e5067..4587f99989d3 100644 --- a/datafusion/core/tests/physical_optimizer/test_utils.rs +++ b/datafusion/core/tests/physical_optimizer/test_utils.rs @@ -33,7 +33,7 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::utils::expr::COUNT_STAR_EXPANSION; use datafusion_common::{JoinType, Result}; -use datafusion_datasource::file_scan_config::FileScanConfig; +use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_expr::{WindowFrame, WindowFunctionDefinition}; @@ -69,27 +69,31 @@ use datafusion_physical_plan::{ /// Create a non sorted parquet exec pub fn parquet_exec(schema: &SchemaRef) -> Arc { - FileScanConfig::new( + let config = FileScanConfigBuilder::new( ObjectStoreUrl::parse("test:///").unwrap(), schema.clone(), Arc::new(ParquetSource::default()), ) 
.with_file(PartitionedFile::new("x".to_string(), 100)) - .build() + .build(); + + DataSourceExec::from_data_source(config) } /// Create a single parquet file that is sorted pub(crate) fn parquet_exec_with_sort( output_ordering: Vec, ) -> Arc { - FileScanConfig::new( + let config = FileScanConfigBuilder::new( ObjectStoreUrl::parse("test:///").unwrap(), schema(), Arc::new(ParquetSource::default()), ) .with_file(PartitionedFile::new("x".to_string(), 100)) .with_output_ordering(output_ordering) - .build() + .build(); + + DataSourceExec::from_data_source(config) } pub fn schema() -> SchemaRef { diff --git a/datafusion/datasource-avro/src/file_format.rs b/datafusion/datasource-avro/src/file_format.rs index 779b610fe44c..4b50fee1d326 100644 --- a/datafusion/datasource-avro/src/file_format.rs +++ b/datafusion/datasource-avro/src/file_format.rs @@ -35,7 +35,8 @@ use datafusion_common::{Result, Statistics}; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_compression_type::FileCompressionType; use datafusion_datasource::file_format::{FileFormat, FileFormatFactory}; -use datafusion_datasource::file_scan_config::FileScanConfig; +use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; +use datafusion_datasource::source::DataSourceExec; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_plan::ExecutionPlan; use datafusion_session::Session; @@ -151,7 +152,10 @@ impl FileFormat for AvroFormat { conf: FileScanConfig, _filters: Option<&Arc>, ) -> Result> { - Ok(conf.with_source(self.file_source()).build()) + let config = FileScanConfigBuilder::from(conf) + .with_source(self.file_source()) + .build(); + Ok(DataSourceExec::from_data_source(config)) } fn file_source(&self) -> Arc { diff --git a/datafusion/datasource-csv/src/file_format.rs b/datafusion/datasource-csv/src/file_format.rs index 9896643106de..76f3c50a70a7 100644 --- a/datafusion/datasource-csv/src/file_format.rs +++ b/datafusion/datasource-csv/src/file_format.rs @@ -42,7 +42,7 @@ use datafusion_datasource::file_compression_type::FileCompressionType; use datafusion_datasource::file_format::{ FileFormat, FileFormatFactory, DEFAULT_SCHEMA_INFER_MAX_RECORD, }; -use datafusion_datasource::file_scan_config::FileScanConfig; +use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig}; use datafusion_datasource::sink::{DataSink, DataSinkExec}; use datafusion_datasource::write::demux::DemuxedStreamReceiver; @@ -57,6 +57,7 @@ use datafusion_session::Session; use async_trait::async_trait; use bytes::{Buf, Bytes}; +use datafusion_datasource::source::DataSourceExec; use futures::stream::BoxStream; use futures::{pin_mut, Stream, StreamExt, TryStreamExt}; use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore}; @@ -406,10 +407,9 @@ impl FileFormat for CsvFormat { async fn create_physical_plan( &self, state: &dyn Session, - mut conf: FileScanConfig, + conf: FileScanConfig, _filters: Option<&Arc>, ) -> Result> { - conf.file_compression_type = self.options.compression.into(); // Consult configuration options for default values let has_header = self .options @@ -419,7 +419,10 @@ impl FileFormat for CsvFormat { .options .newlines_in_values .unwrap_or(state.config_options().catalog.newlines_in_values); - conf.new_lines_in_values = newlines_in_values; + + let conf_builder = FileScanConfigBuilder::from(conf) + .with_file_compression_type(self.options.compression.into()) + 
.with_newlines_in_values(newlines_in_values); let source = Arc::new( CsvSource::new(has_header, self.options.delimiter, self.options.quote) @@ -427,7 +430,10 @@ impl FileFormat for CsvFormat { .with_terminator(self.options.terminator) .with_comment(self.options.comment), ); - Ok(conf.with_source(source).build()) + + let config = conf_builder.with_source(source).build(); + + Ok(DataSourceExec::from_data_source(config)) } async fn create_writer_physical_plan( diff --git a/datafusion/datasource-csv/src/mod.rs b/datafusion/datasource-csv/src/mod.rs index 97f4214cf90f..90538d0808b1 100644 --- a/datafusion/datasource-csv/src/mod.rs +++ b/datafusion/datasource-csv/src/mod.rs @@ -26,6 +26,7 @@ use std::sync::Arc; use arrow::datatypes::SchemaRef; use datafusion_datasource::file_groups::FileGroup; +use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_datasource::{file::FileSource, file_scan_config::FileScanConfig}; use datafusion_execution::object_store::ObjectStoreUrl; pub use file_format::*; @@ -36,6 +37,7 @@ pub fn partitioned_csv_config( file_groups: Vec, file_source: Arc, ) -> FileScanConfig { - FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema, file_source) + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), schema, file_source) .with_file_groups(file_groups) + .build() } diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs index 175ee18197f6..6db4d1870320 100644 --- a/datafusion/datasource-csv/src/source.rs +++ b/datafusion/datasource-csv/src/source.rs @@ -407,7 +407,7 @@ impl ExecutionPlan for CsvExec { /// ``` /// # use std::sync::Arc; /// # use arrow::datatypes::Schema; -/// # use datafusion_datasource::file_scan_config::FileScanConfig; +/// # use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; /// # use datafusion_datasource::PartitionedFile; /// # use datafusion_datasource_csv::source::CsvSource; /// # use datafusion_execution::object_store::ObjectStoreUrl; @@ -424,10 +424,11 @@ impl ExecutionPlan for CsvExec { /// .with_terminator(Some(b'#') /// )); /// // Create a DataSourceExec for reading the first 100MB of `file1.csv` -/// let file_scan_config = FileScanConfig::new(object_store_url, file_schema, source) +/// let config = FileScanConfigBuilder::new(object_store_url, file_schema, source) /// .with_file(PartitionedFile::new("file1.csv", 100*1024*1024)) -/// .with_newlines_in_values(true); // The file contains newlines in values; -/// let exec = file_scan_config.build(); +/// .with_newlines_in_values(true) // The file contains newlines in values; +/// .build(); +/// let exec = (DataSourceExec::from_data_source(config)); /// ``` #[derive(Debug, Clone, Default)] pub struct CsvSource { diff --git a/datafusion/datasource-json/src/file_format.rs b/datafusion/datasource-json/src/file_format.rs index 00019e83c42d..a6c52312e412 100644 --- a/datafusion/datasource-json/src/file_format.rs +++ b/datafusion/datasource-json/src/file_format.rs @@ -44,7 +44,7 @@ use datafusion_datasource::file_compression_type::FileCompressionType; use datafusion_datasource::file_format::{ FileFormat, FileFormatFactory, DEFAULT_SCHEMA_INFER_MAX_RECORD, }; -use datafusion_datasource::file_scan_config::FileScanConfig; +use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig}; use datafusion_datasource::sink::{DataSink, DataSinkExec}; use 
datafusion_datasource::write::demux::DemuxedStreamReceiver; @@ -59,6 +59,7 @@ use datafusion_session::Session; use async_trait::async_trait; use bytes::{Buf, Bytes}; +use datafusion_datasource::source::DataSourceExec; use object_store::{GetResultPayload, ObjectMeta, ObjectStore}; #[derive(Default)] @@ -246,12 +247,17 @@ impl FileFormat for JsonFormat { async fn create_physical_plan( &self, _state: &dyn Session, - mut conf: FileScanConfig, + conf: FileScanConfig, _filters: Option<&Arc>, ) -> Result> { let source = Arc::new(JsonSource::new()); - conf.file_compression_type = FileCompressionType::from(self.options.compression); - Ok(conf.with_source(source).build()) + let conf = FileScanConfigBuilder::from(conf) + .with_file_compression_type(FileCompressionType::from( + self.options.compression, + )) + .with_source(source) + .build(); + Ok(DataSourceExec::from_data_source(conf)) } async fn create_writer_physical_plan( diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 98ba24df97c5..1d9a67fd2eb6 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -23,9 +23,6 @@ use std::fmt::Debug; use std::ops::Range; use std::sync::Arc; -use crate::can_expr_be_pushed_down_with_schemas; -use crate::source::ParquetSource; - use arrow::array::RecordBatch; use arrow::compute::sum; use arrow::datatypes::{DataType, Field, FieldRef}; @@ -45,7 +42,7 @@ use datafusion_datasource::file_compression_type::FileCompressionType; use datafusion_datasource::file_format::{ FileFormat, FileFormatFactory, FilePushdownSupport, }; -use datafusion_datasource::file_scan_config::FileScanConfig; +use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig}; use datafusion_datasource::sink::{DataSink, DataSinkExec}; use datafusion_datasource::write::demux::DemuxedStreamReceiver; @@ -61,8 +58,11 @@ use datafusion_physical_plan::Accumulator; use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; use datafusion_session::Session; +use crate::can_expr_be_pushed_down_with_schemas; +use crate::source::ParquetSource; use async_trait::async_trait; use bytes::Bytes; +use datafusion_datasource::source::DataSourceExec; use futures::future::BoxFuture; use futures::{FutureExt, StreamExt, TryStreamExt}; use log::debug; @@ -417,7 +417,11 @@ impl FileFormat for ParquetFormat { if let Some(metadata_size_hint) = metadata_size_hint { source = source.with_metadata_size_hint(metadata_size_hint) } - Ok(conf.with_source(Arc::new(source)).build()) + + let conf = FileScanConfigBuilder::from(conf) + .with_source(Arc::new(source)) + .build(); + Ok(DataSourceExec::from_data_source(conf)) } async fn create_writer_physical_plan( diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 47e692cb966d..d67d0698a959 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -77,7 +77,7 @@ use object_store::ObjectStore; /// ``` /// # use std::sync::Arc; /// # use arrow::datatypes::Schema; -/// # use datafusion_datasource::file_scan_config::FileScanConfig; +/// # use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; /// # use datafusion_datasource_parquet::source::ParquetSource; /// # use datafusion_datasource::PartitionedFile; /// # use datafusion_execution::object_store::ObjectStoreUrl; 
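The `create_physical_plan` changes above (Avro, CSV, JSON, and now Parquet) all follow the same shape: instead of mutating the incoming `FileScanConfig` in place, the format converts it back into a builder with `FileScanConfigBuilder::from(conf)`, applies its format-specific options, and wraps the rebuilt config in `DataSourceExec::from_data_source`. A minimal sketch of that pattern under stated assumptions — the `rebuild_scan` helper, its parameters, and the `UNCOMPRESSED` setting are illustrative only and do not appear in this patch:

use std::sync::Arc;

use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::source::DataSourceExec;
use datafusion_physical_plan::ExecutionPlan;

// Hypothetical helper (not part of this patch): rebuild an existing scan
// config with new options, then wrap it in a DataSourceExec, mirroring the
// rewritten create_physical_plan implementations.
fn rebuild_scan(
    conf: FileScanConfig,
    file_source: Arc<dyn FileSource>,
) -> Arc<dyn ExecutionPlan> {
    let conf = FileScanConfigBuilder::from(conf)
        // fields that used to be mutated directly on FileScanConfig
        .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
        .with_source(file_source)
        .build();
    DataSourceExec::from_data_source(conf)
}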
@@ -93,9 +93,9 @@ use object_store::ObjectStore; /// .with_predicate(Arc::clone(&file_schema), predicate) /// ); /// // Create a DataSourceExec for reading `file1.parquet` with a file size of 100MB -/// let file_scan_config = FileScanConfig::new(object_store_url, file_schema, source) -/// .with_file(PartitionedFile::new("file1.parquet", 100*1024*1024)); -/// let exec = file_scan_config.build(); +/// let config = FileScanConfigBuilder::new(object_store_url, file_schema, source) +/// .with_file(PartitionedFile::new("file1.parquet", 100*1024*1024)).build(); +/// let exec = DataSourceExec::from_data_source(config); /// ``` /// /// # Features @@ -177,7 +177,7 @@ use object_store::ObjectStore; /// .clone() /// .with_file_groups(vec![file_group.clone()]); /// -/// new_config.build() +/// (DataSourceExec::from_data_source(new_config)) /// }) /// .collect::>(); /// ``` @@ -200,7 +200,7 @@ use object_store::ObjectStore; /// # use arrow::datatypes::{Schema, SchemaRef}; /// # use datafusion_datasource::PartitionedFile; /// # use datafusion_datasource_parquet::ParquetAccessPlan; -/// # use datafusion_datasource::file_scan_config::FileScanConfig; +/// # use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; /// # use datafusion_datasource_parquet::source::ParquetSource; /// # use datafusion_execution::object_store::ObjectStoreUrl; /// # use datafusion_datasource::source::DataSourceExec; @@ -216,11 +216,11 @@ use object_store::ObjectStore; /// let partitioned_file = PartitionedFile::new("my_file.parquet", 1234) /// .with_extensions(Arc::new(access_plan)); /// // create a FileScanConfig to scan this file -/// let file_scan_config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema(), Arc::new(ParquetSource::default())) -/// .with_file(partitioned_file); +/// let config = FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), schema(), Arc::new(ParquetSource::default())) +/// .with_file(partitioned_file).build(); /// // this parquet DataSourceExec will not even try to read row groups 2 and 4. Additional /// // pruning based on predicates may also happen -/// let exec = file_scan_config.build(); +/// let exec = DataSourceExec::from_data_source(config); /// ``` /// /// For a complete example, see the [`advanced_parquet_index` example]). 
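The next file adds `FileScanConfigBuilder` itself and deprecates the fluent setters and `build()` on `FileScanConfig`. For a fresh scan (as opposed to rebuilding an existing config), the intended call sequence is builder, then `build()`, then `DataSourceExec::from_data_source`. A minimal sketch, assuming the same crates this patch touches; the `parquet_scan` helper name, schema, file name, size, and limit are placeholder values, not taken from any hunk:

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::source::DataSourceExec;
use datafusion_datasource::PartitionedFile;
use datafusion_datasource_parquet::source::ParquetSource;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_physical_plan::ExecutionPlan;

// Build a scan of a single (placeholder) parquet file and turn it into an
// execution plan using the new builder API.
fn parquet_scan() -> Arc<dyn ExecutionPlan> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let config = FileScanConfigBuilder::new(
        ObjectStoreUrl::local_filesystem(),
        schema,
        Arc::new(ParquetSource::default()),
    )
    .with_file(PartitionedFile::new("data.parquet", 1234))
    .with_limit(Some(100))
    .build();
    // Replaces the now-deprecated FileScanConfig::build()
    DataSourceExec::from_data_source(config)
}

Keeping construction on the builder and plan creation in `DataSourceExec::from_data_source` is what lets the in-place setters on `FileScanConfig` be deprecated below without changing any scan behavior.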
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 770e26bbf456..5172dafb1f91 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -74,8 +74,9 @@ use crate::{ /// # use datafusion_datasource::file::FileSource; /// # use datafusion_datasource::file_groups::FileGroup; /// # use datafusion_datasource::PartitionedFile; -/// # use datafusion_datasource::file_scan_config::FileScanConfig; +/// # use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; /// # use datafusion_datasource::file_stream::FileOpener; +/// # use datafusion_datasource::source::DataSourceExec; /// # use datafusion_execution::object_store::ObjectStoreUrl; /// # use datafusion_physical_plan::ExecutionPlan; /// # use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; @@ -106,7 +107,7 @@ use crate::{ /// // create FileScan config for reading parquet files from file:// /// let object_store_url = ObjectStoreUrl::local_filesystem(); /// let file_source = Arc::new(ParquetSource::new()); -/// let config = FileScanConfig::new(object_store_url, file_schema, file_source) +/// let config = FileScanConfigBuilder::new(object_store_url, file_schema, file_source) /// .with_limit(Some(1000)) // read only the first 1000 records /// .with_projection(Some(vec![2, 3])) // project columns 2 and 3 /// // Read /tmp/file1.parquet with known size of 1234 bytes in a single group @@ -116,9 +117,9 @@ use crate::{ /// .with_file_group(FileGroup::new(vec![ /// PartitionedFile::new("file2.parquet", 56), /// PartitionedFile::new("file3.parquet", 78), -/// ])); +/// ])).build(); /// // create an execution plan from the config -/// let plan: Arc = config.build(); +/// let plan: Arc = DataSourceExec::from_data_source(config); /// ``` #[derive(Clone)] pub struct FileScanConfig { @@ -174,6 +175,272 @@ pub struct FileScanConfig { pub batch_size: Option, } +/// A builder for [`FileScanConfig`]'s. 
+/// +/// Example: +/// +/// ```rust +/// # use std::sync::Arc; +/// # use arrow::datatypes::{DataType, Field, Schema}; +/// # use datafusion_datasource::file_scan_config::{FileScanConfigBuilder, FileScanConfig}; +/// # use datafusion_datasource::file_compression_type::FileCompressionType; +/// # use datafusion_datasource::file_groups::FileGroup; +/// # use datafusion_datasource::PartitionedFile; +/// # use datafusion_execution::object_store::ObjectStoreUrl; +/// # use datafusion_common::Statistics; +/// # use datafusion_datasource::file::FileSource; +/// +/// # fn main() { +/// # fn with_source(file_source: Arc) { +/// // Create a schema for our Parquet files +/// let schema = Arc::new(Schema::new(vec![ +/// Field::new("id", DataType::Int32, false), +/// Field::new("value", DataType::Utf8, false), +/// ])); +/// +/// // Create a builder for scanning Parquet files from a local filesystem +/// let config = FileScanConfigBuilder::new( +/// ObjectStoreUrl::local_filesystem(), +/// schema, +/// file_source, +/// ) +/// // Set a limit of 1000 rows +/// .with_limit(Some(1000)) +/// // Project only the first column +/// .with_projection(Some(vec![0])) +/// // Add partition columns +/// .with_table_partition_cols(vec![ +/// Field::new("date", DataType::Utf8, false), +/// ]) +/// // Add a file group with two files +/// .with_file_group(FileGroup::new(vec![ +/// PartitionedFile::new("data/date=2024-01-01/file1.parquet", 1024), +/// PartitionedFile::new("data/date=2024-01-01/file2.parquet", 2048), +/// ])) +/// // Set compression type +/// .with_file_compression_type(FileCompressionType::UNCOMPRESSED) +/// // Build the final config +/// .build(); +/// # } +/// # } +/// ``` +#[derive(Clone)] +pub struct FileScanConfigBuilder { + object_store_url: ObjectStoreUrl, + file_schema: SchemaRef, + file_source: Arc, + + limit: Option, + projection: Option>, + table_partition_cols: Vec, + constraints: Option, + file_groups: Vec, + statistics: Option, + output_ordering: Vec, + file_compression_type: Option, + new_lines_in_values: Option, + batch_size: Option, +} + +impl FileScanConfigBuilder { + /// Create a new [`FileScanConfigBuilder`] with default settings for scanning files. + /// + /// # Parameters: + /// * `object_store_url`: See [`FileScanConfig::object_store_url`] + /// * `file_schema`: See [`FileScanConfig::file_schema`] + /// * `file_source`: See [`FileScanConfig::file_source`] + pub fn new( + object_store_url: ObjectStoreUrl, + file_schema: SchemaRef, + file_source: Arc, + ) -> Self { + Self { + object_store_url, + file_schema, + file_source, + file_groups: vec![], + statistics: None, + output_ordering: vec![], + file_compression_type: None, + new_lines_in_values: None, + limit: None, + projection: None, + table_partition_cols: vec![], + constraints: None, + batch_size: None, + } + } + + /// Set the maximum number of records to read from this plan. If `None`, + /// all records after filtering are returned. + pub fn with_limit(mut self, limit: Option) -> Self { + self.limit = limit; + self + } + + /// Set the file source for scanning files. + /// + /// This method allows you to change the file source implementation (e.g. ParquetSource, CsvSource, etc.) + /// after the builder has been created. + pub fn with_source(mut self, file_source: Arc) -> Self { + self.file_source = file_source; + self + } + + /// Set the columns on which to project the data. Indexes that are higher than the + /// number of columns of `file_schema` refer to `table_partition_cols`. 
+ pub fn with_projection(mut self, projection: Option>) -> Self { + self.projection = projection; + self + } + + /// Set the partitioning columns + pub fn with_table_partition_cols(mut self, table_partition_cols: Vec) -> Self { + self.table_partition_cols = table_partition_cols; + self + } + + /// Set the table constraints + pub fn with_constraints(mut self, constraints: Constraints) -> Self { + self.constraints = Some(constraints); + self + } + + /// Set the estimated overall statistics of the files, taking `filters` into account. + /// Defaults to [`Statistics::new_unknown`]. + pub fn with_statistics(mut self, statistics: Statistics) -> Self { + self.statistics = Some(statistics); + self + } + + /// Set the list of files to be processed, grouped into partitions. + /// + /// Each file must have a schema of `file_schema` or a subset. If + /// a particular file has a subset, the missing columns are + /// padded with NULLs. + /// + /// DataFusion may attempt to read each partition of files + /// concurrently, however files *within* a partition will be read + /// sequentially, one after the next. + pub fn with_file_groups(mut self, file_groups: Vec) -> Self { + self.file_groups = file_groups; + self + } + + /// Add a new file group + /// + /// See [`Self::with_file_groups`] for more information + pub fn with_file_group(mut self, file_group: FileGroup) -> Self { + self.file_groups.push(file_group); + self + } + + /// Add a file as a single group + /// + /// See [`Self::with_file_groups`] for more information. + pub fn with_file(self, file: PartitionedFile) -> Self { + self.with_file_group(FileGroup::new(vec![file])) + } + + /// Set the output ordering of the files + pub fn with_output_ordering(mut self, output_ordering: Vec) -> Self { + self.output_ordering = output_ordering; + self + } + + /// Set the file compression type + pub fn with_file_compression_type( + mut self, + file_compression_type: FileCompressionType, + ) -> Self { + self.file_compression_type = Some(file_compression_type); + self + } + + /// Set whether new lines in values are supported for CSVOptions + /// + /// Parsing newlines in quoted values may be affected by execution behaviour such as + /// parallel file scanning. Setting this to `true` ensures that newlines in values are + /// parsed successfully, which may reduce performance. + pub fn with_newlines_in_values(mut self, new_lines_in_values: bool) -> Self { + self.new_lines_in_values = Some(new_lines_in_values); + self + } + + /// Set the batch_size property + pub fn with_batch_size(mut self, batch_size: Option) -> Self { + self.batch_size = batch_size; + self + } + + /// Build the final [`FileScanConfig`] with all the configured settings. + /// + /// This method takes ownership of the builder and returns the constructed `FileScanConfig`. + /// Any unset optional fields will use their default values. 
+ pub fn build(self) -> FileScanConfig { + let Self { + object_store_url, + file_schema, + file_source, + limit, + projection, + table_partition_cols, + constraints, + file_groups, + statistics, + output_ordering, + file_compression_type, + new_lines_in_values, + batch_size, + } = self; + + let constraints = constraints.unwrap_or_default(); + let statistics = + statistics.unwrap_or_else(|| Statistics::new_unknown(&file_schema)); + + let file_source = file_source.with_statistics(statistics.clone()); + let file_compression_type = + file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED); + let new_lines_in_values = new_lines_in_values.unwrap_or(false); + + FileScanConfig { + object_store_url, + file_schema, + file_source, + limit, + projection, + table_partition_cols, + constraints, + file_groups, + statistics, + output_ordering, + file_compression_type, + new_lines_in_values, + batch_size, + } + } +} + +impl From for FileScanConfigBuilder { + fn from(config: FileScanConfig) -> Self { + Self { + object_store_url: config.object_store_url, + file_schema: config.file_schema, + file_source: config.file_source, + file_groups: config.file_groups, + statistics: Some(config.statistics), + output_ordering: config.output_ordering, + file_compression_type: Some(config.file_compression_type), + new_lines_in_values: Some(config.new_lines_in_values), + limit: config.limit, + projection: config.projection, + table_partition_cols: config.table_partition_cols, + constraints: Some(config.constraints), + batch_size: config.batch_size, + } + } +} + impl DataSource for FileScanConfig { fn open( &self, @@ -267,8 +534,10 @@ impl DataSource for FileScanConfig { } fn with_fetch(&self, limit: Option) -> Option> { - let source = self.clone(); - Some(Arc::new(source.with_limit(limit))) + let source = FileScanConfigBuilder::from(self.clone()) + .with_limit(limit) + .build(); + Some(Arc::new(source)) } fn fetch(&self) -> Option { @@ -306,11 +575,13 @@ impl DataSource for FileScanConfig { .clone() .unwrap_or((0..self.file_schema.fields().len()).collect()), ); - file_scan - // Assign projected statistics to source - .with_projection(Some(new_projections)) - .with_source(source) - .build() as _ + DataSourceExec::from_data_source( + FileScanConfigBuilder::from(file_scan) + // Assign projected statistics to source + .with_projection(Some(new_projections)) + .with_source(source) + .build(), + ) as _ })) } } @@ -326,14 +597,15 @@ impl FileScanConfig { /// # Parameters: /// * `object_store_url`: See [`Self::object_store_url`] /// * `file_schema`: See [`Self::file_schema`] + #[allow(deprecated)] // `new` will be removed same time as `with_source` pub fn new( object_store_url: ObjectStoreUrl, file_schema: SchemaRef, file_source: Arc, ) -> Self { let statistics = Statistics::new_unknown(&file_schema); - - let mut config = Self { + let file_source = file_source.with_statistics(statistics.clone()); + Self { object_store_url, file_schema, file_groups: vec![], @@ -347,25 +619,25 @@ impl FileScanConfig { new_lines_in_values: false, file_source: Arc::clone(&file_source), batch_size: None, - }; - - config = config.with_source(Arc::clone(&file_source)); - config + } } /// Set the file source + #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")] pub fn with_source(mut self, file_source: Arc) -> Self { self.file_source = file_source.with_statistics(self.statistics.clone()); self } /// Set the table constraints of the files + #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")] pub fn 
with_constraints(mut self, constraints: Constraints) -> Self { self.constraints = constraints; self } /// Set the statistics of the files + #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")] pub fn with_statistics(mut self, statistics: Statistics) -> Self { self.statistics = statistics.clone(); self.file_source = self.file_source.with_statistics(statistics); @@ -437,12 +709,14 @@ impl FileScanConfig { } /// Set the projection of the files + #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")] pub fn with_projection(mut self, projection: Option>) -> Self { self.projection = projection; self } /// Set the limit of the files + #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")] pub fn with_limit(mut self, limit: Option) -> Self { self.limit = limit; self @@ -451,6 +725,8 @@ impl FileScanConfig { /// Add a file as a single group /// /// See [Self::file_groups] for more information. + #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")] + #[allow(deprecated)] pub fn with_file(self, file: PartitionedFile) -> Self { self.with_file_group(FileGroup::new(vec![file])) } @@ -458,6 +734,7 @@ impl FileScanConfig { /// Add the file groups /// /// See [Self::file_groups] for more information. + #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")] pub fn with_file_groups(mut self, mut file_groups: Vec) -> Self { self.file_groups.append(&mut file_groups); self @@ -466,24 +743,28 @@ impl FileScanConfig { /// Add a new file group /// /// See [Self::file_groups] for more information + #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")] pub fn with_file_group(mut self, file_group: FileGroup) -> Self { self.file_groups.push(file_group); self } /// Set the partitioning columns of the files + #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")] pub fn with_table_partition_cols(mut self, table_partition_cols: Vec) -> Self { self.table_partition_cols = table_partition_cols; self } /// Set the output ordering of the files + #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")] pub fn with_output_ordering(mut self, output_ordering: Vec) -> Self { self.output_ordering = output_ordering; self } /// Set the file compression type + #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")] pub fn with_file_compression_type( mut self, file_compression_type: FileCompressionType, @@ -493,12 +774,14 @@ impl FileScanConfig { } /// Set the new_lines_in_values property + #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")] pub fn with_newlines_in_values(mut self, new_lines_in_values: bool) -> Self { self.new_lines_in_values = new_lines_in_values; self } /// Set the batch_size property + #[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")] pub fn with_batch_size(mut self, batch_size: Option) -> Self { self.batch_size = batch_size; self @@ -643,10 +926,10 @@ impl FileScanConfig { .collect()) } - // TODO: This function should be moved into DataSourceExec once FileScanConfig moved out of datafusion/core /// Returns a new [`DataSourceExec`] to scan the files specified by this config + #[deprecated(since = "47.0.0", note = "use DataSourceExec::new instead")] pub fn build(self) -> Arc { - Arc::new(DataSourceExec::new(Arc::new(self))) + DataSourceExec::from_data_source(self) } /// Write the data_type based on file_source @@ -1752,7 +2035,7 @@ mod tests { statistics: Statistics, 
table_partition_cols: Vec, ) -> FileScanConfig { - FileScanConfig::new( + FileScanConfigBuilder::new( ObjectStoreUrl::parse("test:///").unwrap(), file_schema, Arc::new(MockSource::default()), @@ -1760,6 +2043,7 @@ mod tests { .with_projection(projection) .with_statistics(statistics) .with_table_partition_cols(table_partition_cols) + .build() } /// Convert partition columns from Vec to Vec @@ -1792,4 +2076,150 @@ mod tests { ) .unwrap() } + + #[test] + fn test_file_scan_config_builder() { + let file_schema = aggr_test_schema(); + let object_store_url = ObjectStoreUrl::parse("test:///").unwrap(); + let file_source: Arc = Arc::new(MockSource::default()); + + // Create a builder with required parameters + let builder = FileScanConfigBuilder::new( + object_store_url.clone(), + Arc::clone(&file_schema), + Arc::clone(&file_source), + ); + + // Build with various configurations + let config = builder + .with_limit(Some(1000)) + .with_projection(Some(vec![0, 1])) + .with_table_partition_cols(vec![Field::new( + "date", + wrap_partition_type_in_dict(DataType::Utf8), + false, + )]) + .with_constraints(Constraints::empty()) + .with_statistics(Statistics::new_unknown(&file_schema)) + .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new( + "test.parquet".to_string(), + 1024, + )])]) + .with_output_ordering(vec![LexOrdering::default()]) + .with_file_compression_type(FileCompressionType::UNCOMPRESSED) + .with_newlines_in_values(true) + .build(); + + // Verify the built config has all the expected values + assert_eq!(config.object_store_url, object_store_url); + assert_eq!(config.file_schema, file_schema); + assert_eq!(config.limit, Some(1000)); + assert_eq!(config.projection, Some(vec![0, 1])); + assert_eq!(config.table_partition_cols.len(), 1); + assert_eq!(config.table_partition_cols[0].name(), "date"); + assert_eq!(config.file_groups.len(), 1); + assert_eq!(config.file_groups[0].len(), 1); + assert_eq!( + config.file_groups[0][0].object_meta.location.as_ref(), + "test.parquet" + ); + assert_eq!( + config.file_compression_type, + FileCompressionType::UNCOMPRESSED + ); + assert!(config.new_lines_in_values); + assert_eq!(config.output_ordering.len(), 1); + } + + #[test] + fn test_file_scan_config_builder_defaults() { + let file_schema = aggr_test_schema(); + let object_store_url = ObjectStoreUrl::parse("test:///").unwrap(); + let file_source: Arc = Arc::new(MockSource::default()); + + // Create a builder with only required parameters and build without any additional configurations + let config = FileScanConfigBuilder::new( + object_store_url.clone(), + Arc::clone(&file_schema), + Arc::clone(&file_source), + ) + .build(); + + // Verify default values + assert_eq!(config.object_store_url, object_store_url); + assert_eq!(config.file_schema, file_schema); + assert_eq!(config.limit, None); + assert_eq!(config.projection, None); + assert!(config.table_partition_cols.is_empty()); + assert!(config.file_groups.is_empty()); + assert_eq!( + config.file_compression_type, + FileCompressionType::UNCOMPRESSED + ); + assert!(!config.new_lines_in_values); + assert!(config.output_ordering.is_empty()); + assert!(config.constraints.is_empty()); + + // Verify statistics are set to unknown + assert_eq!(config.statistics.num_rows, Precision::Absent); + assert_eq!(config.statistics.total_byte_size, Precision::Absent); + assert_eq!( + config.statistics.column_statistics.len(), + file_schema.fields().len() + ); + for stat in config.statistics.column_statistics { + assert_eq!(stat.distinct_count, Precision::Absent); + 
assert_eq!(stat.min_value, Precision::Absent); + assert_eq!(stat.max_value, Precision::Absent); + assert_eq!(stat.null_count, Precision::Absent); + } + } + + #[test] + fn test_file_scan_config_builder_new_from() { + let schema = aggr_test_schema(); + let object_store_url = ObjectStoreUrl::parse("test:///").unwrap(); + let file_source: Arc = Arc::new(MockSource::default()); + let partition_cols = vec![Field::new( + "date", + wrap_partition_type_in_dict(DataType::Utf8), + false, + )]; + let file = PartitionedFile::new("test_file.parquet", 100); + + // Create a config with non-default values + let original_config = FileScanConfigBuilder::new( + object_store_url.clone(), + Arc::clone(&schema), + Arc::clone(&file_source), + ) + .with_projection(Some(vec![0, 2])) + .with_limit(Some(10)) + .with_table_partition_cols(partition_cols.clone()) + .with_file(file.clone()) + .with_constraints(Constraints::default()) + .with_newlines_in_values(true) + .build(); + + // Create a new builder from the config + let new_builder = FileScanConfigBuilder::from(original_config); + + // Build a new config from this builder + let new_config = new_builder.build(); + + // Verify properties match + assert_eq!(new_config.object_store_url, object_store_url); + assert_eq!(new_config.file_schema, schema); + assert_eq!(new_config.projection, Some(vec![0, 2])); + assert_eq!(new_config.limit, Some(10)); + assert_eq!(new_config.table_partition_cols, partition_cols); + assert_eq!(new_config.file_groups.len(), 1); + assert_eq!(new_config.file_groups[0].len(), 1); + assert_eq!( + new_config.file_groups[0][0].object_meta.location.as_ref(), + "test_file.parquet" + ); + assert_eq!(new_config.constraints, Constraints::default()); + assert!(new_config.new_lines_in_values); + } } diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs index 904f1522878a..1caefc3277ac 100644 --- a/datafusion/datasource/src/file_stream.rs +++ b/datafusion/datasource/src/file_stream.rs @@ -522,7 +522,7 @@ impl FileStreamMetrics { #[cfg(test)] mod tests { - use crate::file_scan_config::FileScanConfig; + use crate::file_scan_config::FileScanConfigBuilder; use crate::tests::make_partition; use crate::PartitionedFile; use arrow::error::ArrowError; @@ -656,13 +656,14 @@ mod tests { let on_error = self.on_error; - let config = FileScanConfig::new( + let config = FileScanConfigBuilder::new( ObjectStoreUrl::parse("test:///").unwrap(), file_schema, Arc::new(MockSource::default()), ) .with_file_group(file_group) - .with_limit(self.limit); + .with_limit(self.limit) + .build(); let metrics_set = ExecutionPlanMetricsSet::new(); let file_stream = FileStream::new(&config, 0, Arc::new(self.opener), &metrics_set) diff --git a/datafusion/datasource/src/memory.rs b/datafusion/datasource/src/memory.rs index 64fd56971b29..f2e36672cd5c 100644 --- a/datafusion/datasource/src/memory.rs +++ b/datafusion/datasource/src/memory.rs @@ -521,7 +521,7 @@ impl MemorySourceConfig { projection: Option>, ) -> Result> { let source = Self::try_new(partitions, schema, projection)?; - Ok(Arc::new(DataSourceExec::new(Arc::new(source)))) + Ok(DataSourceExec::from_data_source(source)) } /// Create a new execution plan from a list of constant values (`ValuesExec`) @@ -611,7 +611,7 @@ impl MemorySourceConfig { show_sizes: true, fetch: None, }; - Ok(Arc::new(DataSourceExec::new(Arc::new(source)))) + Ok(DataSourceExec::from_data_source(source)) } /// Set the limit of the files @@ -760,10 +760,10 @@ mod memory_source_tests { 
expected_output_order.extend(sort2.clone()); let sort_information = vec![sort1.clone(), sort2.clone()]; - let mem_exec = Arc::new(DataSourceExec::new(Arc::new( + let mem_exec = DataSourceExec::from_data_source( MemorySourceConfig::try_new(&[vec![]], schema, None)? .try_with_sort_information(sort_information)?, - ))); + ); assert_eq!( mem_exec.properties().output_ordering().unwrap(), diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 74fd123e8df9..6c9122ce1ac1 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -195,6 +195,10 @@ impl ExecutionPlan for DataSourceExec { } impl DataSourceExec { + pub fn from_data_source(data_source: impl DataSource + 'static) -> Arc { + Arc::new(Self::new(Arc::new(data_source))) + } + pub fn new(data_source: Arc) -> Self { let cache = Self::compute_properties(Arc::clone(&data_source)); Self { data_source, cache } diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 833c1ad51286..6b94094225e5 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -33,7 +33,7 @@ use datafusion::datasource::file_format::parquet::ParquetSink; use datafusion::datasource::listing::{FileRange, ListingTableUrl, PartitionedFile}; use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::datasource::physical_plan::{ - FileGroup, FileScanConfig, FileSinkConfig, FileSource, + FileGroup, FileScanConfig, FileScanConfigBuilder, FileSinkConfig, FileSource, }; use datafusion::execution::FunctionRegistry; use datafusion::logical_expr::WindowFunctionDefinition; @@ -539,7 +539,7 @@ pub fn parse_protobuf_file_scan_config( output_ordering.push(sort_expr); } - let config = FileScanConfig::new(object_store_url, file_schema, file_source) + let config = FileScanConfigBuilder::new(object_store_url, file_schema, file_source) .with_file_groups(file_groups) .with_constraints(constraints) .with_statistics(statistics) @@ -547,7 +547,8 @@ pub fn parse_protobuf_file_scan_config( .with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize)) .with_table_partition_cols(table_partition_cols) .with_output_ordering(output_ordering) - .with_batch_size(proto.batch_size.map(|s| s as usize)); + .with_batch_size(proto.batch_size.map(|s| s as usize)) + .build(); Ok(config) } diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 7712c018f342..24cc0d5b3b02 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -49,7 +49,9 @@ use datafusion::datasource::file_format::parquet::ParquetSink; use datafusion::datasource::physical_plan::AvroSource; #[cfg(feature = "parquet")] use datafusion::datasource::physical_plan::ParquetSource; -use datafusion::datasource::physical_plan::{CsvSource, FileScanConfig, JsonSource}; +use datafusion::datasource::physical_plan::{ + CsvSource, FileScanConfig, FileScanConfigBuilder, JsonSource, +}; use datafusion::datasource::sink::DataSinkExec; use datafusion::datasource::source::DataSourceExec; use datafusion::execution::runtime_env::RuntimeEnv; @@ -236,15 +238,16 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { .with_comment(comment), ); - let conf = parse_protobuf_file_scan_config( + let conf = FileScanConfigBuilder::from(parse_protobuf_file_scan_config( scan.base_conf.as_ref().unwrap(), registry, extension_codec, source, - )? + )?) 
.with_newlines_in_values(scan.newlines_in_values) - .with_file_compression_type(FileCompressionType::UNCOMPRESSED); - Ok(conf.build()) + .with_file_compression_type(FileCompressionType::UNCOMPRESSED) + .build(); + Ok(DataSourceExec::from_data_source(conf)) } PhysicalPlanType::JsonScan(scan) => { let scan_conf = parse_protobuf_file_scan_config( @@ -253,7 +256,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { extension_codec, Arc::new(JsonSource::new()), )?; - Ok(scan_conf.build()) + Ok(DataSourceExec::from_data_source(scan_conf)) } #[cfg_attr(not(feature = "parquet"), allow(unused_variables))] PhysicalPlanType::ParquetScan(scan) => { @@ -290,7 +293,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { extension_codec, Arc::new(source), )?; - Ok(base_config.build()) + Ok(DataSourceExec::from_data_source(base_config)) } #[cfg(not(feature = "parquet"))] panic!("Unable to process a Parquet PhysicalPlan when `parquet` feature is not enabled") @@ -305,7 +308,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { extension_codec, Arc::new(AvroSource::new()), )?; - Ok(conf.build()) + Ok(DataSourceExec::from_data_source(conf)) } #[cfg(not(feature = "avro"))] panic!("Unable to process a Avro PhysicalPlan when `avro` feature is not enabled") diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 1e5543c05b5e..e3d411e4d40e 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -47,10 +47,11 @@ use datafusion::datasource::file_format::parquet::ParquetSink; use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile}; use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::datasource::physical_plan::{ - wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileGroup, FileScanConfig, - FileSinkConfig, FileSource, ParquetSource, + wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileGroup, + FileScanConfigBuilder, FileSinkConfig, FileSource, ParquetSource, }; use datafusion::datasource::sink::DataSinkExec; +use datafusion::datasource::source::DataSourceExec; use datafusion::execution::FunctionRegistry; use datafusion::functions_aggregate::sum::sum_udaf; use datafusion::functions_window::nth_value::nth_value_udwf; @@ -742,21 +743,25 @@ fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> { ParquetSource::new(options).with_predicate(Arc::clone(&file_schema), predicate), ); - let scan_config = - FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema, file_source) - .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new( - "/path/to/file.parquet".to_string(), - 1024, - )])]) - .with_statistics(Statistics { - num_rows: Precision::Inexact(100), - total_byte_size: Precision::Inexact(1024), - column_statistics: Statistics::unknown_column(&Arc::new(Schema::new( - vec![Field::new("col", DataType::Utf8, false)], - ))), - }); - - roundtrip_test(scan_config.build()) + let scan_config = FileScanConfigBuilder::new( + ObjectStoreUrl::local_filesystem(), + file_schema, + file_source, + ) + .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new( + "/path/to/file.parquet".to_string(), + 1024, + )])]) + .with_statistics(Statistics { + num_rows: Precision::Inexact(100), + total_byte_size: Precision::Inexact(1024), + column_statistics: Statistics::unknown_column(&Arc::new(Schema::new(vec![ + Field::new("col", DataType::Utf8, false), + ]))), + }) + .build(); + + 
roundtrip_test(DataSourceExec::from_data_source(scan_config)) } #[tokio::test] @@ -768,18 +773,22 @@ async fn roundtrip_parquet_exec_with_table_partition_cols() -> Result<()> { let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)])); let file_source = Arc::new(ParquetSource::default()); - let scan_config = - FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema, file_source) - .with_projection(Some(vec![0, 1])) - .with_file_group(FileGroup::new(vec![file_group])) - .with_table_partition_cols(vec![Field::new( - "part".to_string(), - wrap_partition_type_in_dict(DataType::Int16), - false, - )]) - .with_newlines_in_values(false); + let scan_config = FileScanConfigBuilder::new( + ObjectStoreUrl::local_filesystem(), + schema, + file_source, + ) + .with_projection(Some(vec![0, 1])) + .with_file_group(FileGroup::new(vec![file_group])) + .with_table_partition_cols(vec![Field::new( + "part".to_string(), + wrap_partition_type_in_dict(DataType::Int16), + false, + )]) + .with_newlines_in_values(false) + .build(); - roundtrip_test(scan_config.build()) + roundtrip_test(DataSourceExec::from_data_source(scan_config)) } #[test] @@ -796,19 +805,23 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> { .with_predicate(Arc::clone(&file_schema), custom_predicate_expr), ); - let scan_config = - FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema, file_source) - .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new( - "/path/to/file.parquet".to_string(), - 1024, - )])]) - .with_statistics(Statistics { - num_rows: Precision::Inexact(100), - total_byte_size: Precision::Inexact(1024), - column_statistics: Statistics::unknown_column(&Arc::new(Schema::new( - vec![Field::new("col", DataType::Utf8, false)], - ))), - }); + let scan_config = FileScanConfigBuilder::new( + ObjectStoreUrl::local_filesystem(), + file_schema, + file_source, + ) + .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new( + "/path/to/file.parquet".to_string(), + 1024, + )])]) + .with_statistics(Statistics { + num_rows: Precision::Inexact(100), + total_byte_size: Precision::Inexact(1024), + column_statistics: Statistics::unknown_column(&Arc::new(Schema::new(vec![ + Field::new("col", DataType::Utf8, false), + ]))), + }) + .build(); #[derive(Debug, Clone, Eq)] struct CustomPredicateExpr { @@ -919,7 +932,7 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> { } } - let exec_plan = scan_config.build(); + let exec_plan = DataSourceExec::from_data_source(scan_config); let ctx = SessionContext::new(); roundtrip_test_and_return(exec_plan, &ctx, &CustomPhysicalExtensionCodec {})?; @@ -1605,7 +1618,7 @@ async fn roundtrip_projection_source() -> Result<()> { let statistics = Statistics::new_unknown(&schema); let file_source = ParquetSource::default().with_statistics(statistics.clone()); - let scan_config = FileScanConfig::new( + let scan_config = FileScanConfigBuilder::new( ObjectStoreUrl::local_filesystem(), schema.clone(), file_source, @@ -1615,12 +1628,13 @@ async fn roundtrip_projection_source() -> Result<()> { 1024, )])]) .with_statistics(statistics) - .with_projection(Some(vec![0, 1, 2])); + .with_projection(Some(vec![0, 1, 2])) + .build(); let filter = Arc::new( FilterExec::try_new( Arc::new(BinaryExpr::new(col("c", &schema)?, Operator::Eq, lit(1))), - scan_config.build(), + DataSourceExec::from_data_source(scan_config), )? 
.with_projection(Some(vec![0, 1]))?, ); diff --git a/datafusion/substrait/src/physical_plan/consumer.rs b/datafusion/substrait/src/physical_plan/consumer.rs index cf1e5c2db97f..4990054ac7fc 100644 --- a/datafusion/substrait/src/physical_plan/consumer.rs +++ b/datafusion/substrait/src/physical_plan/consumer.rs @@ -22,13 +22,20 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema}; use datafusion::common::{not_impl_err, substrait_err}; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::object_store::ObjectStoreUrl; -use datafusion::datasource::physical_plan::{FileGroup, FileScanConfig, ParquetSource}; +use datafusion::datasource::physical_plan::{ + FileGroup, FileScanConfigBuilder, ParquetSource, +}; use datafusion::error::{DataFusionError, Result}; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; +use crate::variation_const::{ + DEFAULT_CONTAINER_TYPE_VARIATION_REF, LARGE_CONTAINER_TYPE_VARIATION_REF, + VIEW_CONTAINER_TYPE_VARIATION_REF, +}; use async_recursion::async_recursion; use chrono::DateTime; +use datafusion::datasource::memory::DataSourceExec; use object_store::ObjectMeta; use substrait::proto::r#type::{Kind, Nullability}; use substrait::proto::read_rel::local_files::file_or_files::PathType; @@ -37,11 +44,6 @@ use substrait::proto::{ expression::MaskExpression, read_rel::ReadType, rel::RelType, Rel, }; -use crate::variation_const::{ - DEFAULT_CONTAINER_TYPE_VARIATION_REF, LARGE_CONTAINER_TYPE_VARIATION_REF, - VIEW_CONTAINER_TYPE_VARIATION_REF, -}; - /// Convert Substrait Rel to DataFusion ExecutionPlan #[async_recursion] pub async fn from_substrait_rel( @@ -49,7 +51,7 @@ pub async fn from_substrait_rel( rel: &Rel, _extensions: &HashMap, ) -> Result> { - let mut base_config; + let mut base_config_builder; let source = Arc::new(ParquetSource::default()); match &rel.rel_type { @@ -78,7 +80,7 @@ pub async fn from_substrait_rel( .collect::>>() { Ok(fields) => { - base_config = FileScanConfig::new( + base_config_builder = FileScanConfigBuilder::new( ObjectStoreUrl::local_filesystem(), Arc::new(Schema::new(fields)), source, @@ -139,7 +141,8 @@ pub async fn from_substrait_rel( file_groups[part_index].push(partitioned_file) } - base_config = base_config.with_file_groups(file_groups); + base_config_builder = + base_config_builder.with_file_groups(file_groups); if let Some(MaskExpression { select, .. 
}) = &read.projection { if let Some(projection) = &select.as_ref() { @@ -148,11 +151,15 @@ pub async fn from_substrait_rel( .iter() .map(|item| item.field as usize) .collect(); - base_config.projection = Some(column_indices); + base_config_builder = + base_config_builder.with_projection(Some(column_indices)); } } - Ok(base_config.build() as Arc) + Ok( + DataSourceExec::from_data_source(base_config_builder.build()) + as Arc, + ) } _ => not_impl_err!( "Only LocalFile reads are supported when parsing physical" diff --git a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs index d8b386773290..64599465f96f 100644 --- a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs @@ -22,19 +22,22 @@ use datafusion::arrow::datatypes::Schema; use datafusion::dataframe::DataFrame; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::object_store::ObjectStoreUrl; -use datafusion::datasource::physical_plan::{FileGroup, FileScanConfig, ParquetSource}; +use datafusion::datasource::physical_plan::{ + FileGroup, FileScanConfigBuilder, ParquetSource, +}; use datafusion::error::Result; use datafusion::physical_plan::{displayable, ExecutionPlan}; use datafusion::prelude::{ParquetReadOptions, SessionContext}; use datafusion_substrait::physical_plan::{consumer, producer}; +use datafusion::datasource::memory::DataSourceExec; use substrait::proto::extensions; #[tokio::test] async fn parquet_exec() -> Result<()> { let source = Arc::new(ParquetSource::default()); - let scan_config = FileScanConfig::new( + let scan_config = FileScanConfigBuilder::new( ObjectStoreUrl::local_filesystem(), Arc::new(Schema::empty()), source, @@ -48,8 +51,10 @@ async fn parquet_exec() -> Result<()> { "file://foo/part-1.parquet".to_string(), 123, )]), - ]); - let parquet_exec: Arc = scan_config.build(); + ]) + .build(); + let parquet_exec: Arc = + DataSourceExec::from_data_source(scan_config); let mut extension_info: ( Vec,