Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add FileScanConfigBuilder #15352

Merged
Merged 17 commits on Mar 28, 2025
15 changes: 9 additions & 6 deletions datafusion-examples/examples/advanced_parquet_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use datafusion::common::{
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan;
use datafusion::datasource::physical_plan::{
FileMeta, FileScanConfig, ParquetFileReaderFactory, ParquetSource,
FileMeta, FileScanConfigBuilder, ParquetFileReaderFactory, ParquetSource,
};
use datafusion::datasource::TableProvider;
use datafusion::execution::object_store::ObjectStoreUrl;
Expand All @@ -55,6 +55,7 @@ use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use bytes::Bytes;
use datafusion::datasource::memory::DataSourceExec;
use futures::future::BoxFuture;
use futures::FutureExt;
use object_store::ObjectStore;
Expand Down Expand Up @@ -498,13 +499,15 @@ impl TableProvider for IndexTableProvider {
// provide the factory to create parquet reader without re-reading metadata
.with_parquet_file_reader_factory(Arc::new(reader_factory)),
);
let file_scan_config = FileScanConfig::new(object_store_url, schema, file_source)
.with_limit(limit)
.with_projection(projection.cloned())
.with_file(partitioned_file);
let file_scan_config =
FileScanConfigBuilder::new(object_store_url, schema, file_source)
.with_limit(limit)
.with_projection(projection.cloned())
.with_file(partitioned_file)
.build();

// Finally, put it all together into a DataSourceExec
Ok(file_scan_config.build())
Ok(DataSourceExec::from_data_source(file_scan_config))
}

/// Tell DataFusion to push filters down to the scan method
Expand Down
15 changes: 8 additions & 7 deletions datafusion-examples/examples/csv_json_opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,14 @@ use datafusion::{
file_format::file_compression_type::FileCompressionType,
listing::PartitionedFile,
object_store::ObjectStoreUrl,
physical_plan::{
CsvSource, FileScanConfig, FileSource, FileStream, JsonOpener, JsonSource,
},
physical_plan::{CsvSource, FileSource, FileStream, JsonOpener, JsonSource},
},
error::Result,
physical_plan::metrics::ExecutionPlanMetricsSet,
test_util::aggr_test_schema,
};

use datafusion::datasource::physical_plan::FileScanConfigBuilder;
use futures::StreamExt;
use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore};

Expand All @@ -56,14 +55,15 @@ async fn csv_opener() -> Result<()> {

let path = std::path::Path::new(&path).canonicalize()?;

let scan_config = FileScanConfig::new(
let scan_config = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
Arc::clone(&schema),
Arc::new(CsvSource::default()),
)
.with_projection(Some(vec![12, 0]))
.with_limit(Some(5))
.with_file(PartitionedFile::new(path.display().to_string(), 10));
.with_file(PartitionedFile::new(path.display().to_string(), 10))
.build();

let config = CsvSource::new(true, b',', b'"')
.with_comment(Some(b'#'))
Expand Down Expand Up @@ -121,14 +121,15 @@ async fn json_opener() -> Result<()> {
Arc::new(object_store),
);

let scan_config = FileScanConfig::new(
let scan_config = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
schema,
Arc::new(JsonSource::default()),
)
.with_projection(Some(vec![1, 0]))
.with_limit(Some(5))
.with_file(PartitionedFile::new(path.to_string(), 10));
.with_file(PartitionedFile::new(path.to_string(), 10))
.build();

let mut stream = FileStream::new(
&scan_config,
Expand Down
18 changes: 10 additions & 8 deletions datafusion-examples/examples/parquet_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ use datafusion::common::{
internal_datafusion_err, DFSchema, DataFusionError, Result, ScalarValue,
};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
use datafusion::datasource::memory::DataSourceExec;
use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
use datafusion::datasource::TableProvider;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::logical_expr::{
Expand Down Expand Up @@ -243,8 +244,8 @@ impl TableProvider for IndexTableProvider {
let object_store_url = ObjectStoreUrl::parse("file://")?;
let source =
Arc::new(ParquetSource::default().with_predicate(self.schema(), predicate));
let mut file_scan_config =
FileScanConfig::new(object_store_url, self.schema(), source)
let mut file_scan_config_builder =
FileScanConfigBuilder::new(object_store_url, self.schema(), source)
.with_projection(projection.cloned())
.with_limit(limit);

Expand All @@ -253,12 +254,13 @@ impl TableProvider for IndexTableProvider {
for (file_name, file_size) in files {
let path = self.dir.join(file_name);
let canonical_path = fs::canonicalize(path)?;
file_scan_config = file_scan_config.with_file(PartitionedFile::new(
canonical_path.display().to_string(),
file_size,
));
file_scan_config_builder = file_scan_config_builder.with_file(
PartitionedFile::new(canonical_path.display().to_string(), file_size),
);
}
Ok(file_scan_config.build())
Ok(DataSourceExec::from_data_source(
file_scan_config_builder.build(),
))
}

/// Tell DataFusion to push filters down to the scan method
Expand Down
10 changes: 8 additions & 2 deletions datafusion/core/src/datasource/file_format/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use datafusion_common::{
use datafusion_common_runtime::{JoinSet, SpawnedTask};
use datafusion_datasource::display::FileGroupDisplay;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::sink::{DataSink, DataSinkExec};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
Expand All @@ -58,6 +58,7 @@ use datafusion_physical_expr_common::sort_expr::LexRequirement;

use async_trait::async_trait;
use bytes::Bytes;
use datafusion_datasource::source::DataSourceExec;
use futures::stream::BoxStream;
use futures::StreamExt;
use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
Expand Down Expand Up @@ -173,7 +174,12 @@ impl FileFormat for ArrowFormat {
conf: FileScanConfig,
_filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(conf.with_source(Arc::new(ArrowSource::default())).build())
let source = Arc::new(ArrowSource::default());
let config = FileScanConfigBuilder::from(conf)
.with_source(source)
.build();

Ok(DataSourceExec::from_data_source(config))
}

async fn create_writer_physical_plan(
Expand Down
10 changes: 5 additions & 5 deletions datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ pub(crate) mod test_util {

use datafusion_catalog::Session;
use datafusion_common::Result;
use datafusion_datasource::{
file_format::FileFormat, file_scan_config::FileScanConfig, PartitionedFile,
};
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::{file_format::FileFormat, PartitionedFile};
use datafusion_execution::object_store::ObjectStoreUrl;

use crate::test::object_store::local_unpartitioned_file;
Expand Down Expand Up @@ -79,15 +78,16 @@ pub(crate) mod test_util {
let exec = format
.create_physical_plan(
state,
FileScanConfig::new(
FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
file_schema,
format.file_source(),
)
.with_file_groups(file_groups)
.with_statistics(statistics)
.with_projection(projection)
.with_limit(limit),
.with_limit(limit)
.build(),
None,
)
.await?;
Expand Down
7 changes: 4 additions & 3 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::datasource::{
use crate::execution::context::SessionState;
use datafusion_catalog::TableProvider;
use datafusion_common::{config_err, DataFusionError, Result};
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
use datafusion_expr::{SortExpr, TableType};
Expand Down Expand Up @@ -942,7 +942,7 @@ impl TableProvider for ListingTable {
.format
.create_physical_plan(
session_state,
FileScanConfig::new(
FileScanConfigBuilder::new(
object_store_url,
Arc::clone(&self.file_schema),
self.options.format.file_source(),
Expand All @@ -953,7 +953,8 @@ impl TableProvider for ListingTable {
.with_projection(projection.cloned())
.with_limit(limit)
.with_output_ordering(output_ordering)
.with_table_partition_cols(table_partition_cols),
.with_table_partition_cols(table_partition_cols)
.build(),
filters.as_ref(),
)
.await
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ impl TableProvider for MemTable {
source = source.try_with_sort_information(file_sort_order)?;
}

Ok(Arc::new(DataSourceExec::new(Arc::new(source))))
Ok(DataSourceExec::from_data_source(source))
}

/// Returns an ExecutionPlan that inserts the execution results of a given [`ExecutionPlan`] into this [`MemTable`].
Expand Down
15 changes: 10 additions & 5 deletions datafusion/core/src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::test_util::batches_to_sort_string;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::schema_adapter::{
DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
};
Expand All @@ -72,6 +72,7 @@ mod tests {

use ::object_store::path::Path;
use ::object_store::ObjectMeta;
use datafusion_datasource::source::DataSourceExec;
use datafusion_physical_plan::collect;
use tempfile::TempDir;

Expand Down Expand Up @@ -128,11 +129,15 @@ mod tests {
ParquetSource::default()
.with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {})),
);
let base_conf =
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema, source)
.with_file(partitioned_file);
let base_conf = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
schema,
source,
)
.with_file(partitioned_file)
.build();

let parquet_exec = base_conf.build();
let parquet_exec = DataSourceExec::from_data_source(base_conf);

let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
Expand Down
33 changes: 20 additions & 13 deletions datafusion/core/src/datasource/physical_plan/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ mod tests {
use datafusion_common::test_util::batches_to_string;
use datafusion_common::{test_util, Result, ScalarValue};
use datafusion_datasource::file_format::FileFormat;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::PartitionedFile;
use datafusion_datasource_avro::source::AvroSource;
use datafusion_datasource_avro::AvroFormat;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_physical_plan::ExecutionPlan;

use datafusion_datasource::source::DataSourceExec;
use futures::StreamExt;
use insta::assert_snapshot;
use object_store::chunked::ChunkedStore;
Expand Down Expand Up @@ -81,12 +82,16 @@ mod tests {
.await?;

let source = Arc::new(AvroSource::new());
let conf =
FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema, source)
.with_file(meta.into())
.with_projection(Some(vec![0, 1, 2]));

let source_exec = conf.build();
let conf = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
file_schema,
source,
)
.with_file(meta.into())
.with_projection(Some(vec![0, 1, 2]))
.build();

let source_exec = DataSourceExec::from_data_source(conf);
assert_eq!(
source_exec
.properties()
Expand Down Expand Up @@ -153,11 +158,12 @@ mod tests {
let projection = Some(vec![0, 1, 2, actual_schema.fields().len()]);

let source = Arc::new(AvroSource::new());
let conf = FileScanConfig::new(object_store_url, file_schema, source)
let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source)
.with_file(meta.into())
.with_projection(projection);
.with_projection(projection)
.build();

let source_exec = conf.build();
let source_exec = DataSourceExec::from_data_source(conf);
assert_eq!(
source_exec
.properties()
Expand Down Expand Up @@ -222,14 +228,15 @@ mod tests {

let projection = Some(vec![0, 1, file_schema.fields().len(), 2]);
let source = Arc::new(AvroSource::new());
let conf = FileScanConfig::new(object_store_url, file_schema, source)
let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source)
// select specific columns of the files as well as the partitioning
// column which is supposed to be the last column in the table schema.
.with_projection(projection)
.with_file(partitioned_file)
.with_table_partition_cols(vec![Field::new("date", DataType::Utf8, false)]);
.with_table_partition_cols(vec![Field::new("date", DataType::Utf8, false)])
.build();

let source_exec = conf.build();
let source_exec = DataSourceExec::from_data_source(conf);

assert_eq!(
source_exec
Expand Down
Loading