diff --git a/vortex-datafusion/src/persistent/format.rs b/vortex-datafusion/src/persistent/format.rs index 45f8607d782..81077be798b 100644 --- a/vortex-datafusion/src/persistent/format.rs +++ b/vortex-datafusion/src/persistent/format.rs @@ -32,7 +32,7 @@ use object_store::{ObjectMeta, ObjectStore}; use vortex::dtype::arrow::FromArrowType; use vortex::dtype::{DType, Nullability, PType}; use vortex::error::{VortexExpect, VortexResult, vortex_err}; -use vortex::file::VORTEX_FILE_EXTENSION; +use vortex::file::{VORTEX_FILE_EXTENSION, VortexWriteOptionsFactory}; use vortex::metrics::VortexMetrics; use vortex::scalar::Scalar; use vortex::session::VortexSession; @@ -50,6 +50,7 @@ pub struct VortexFormat { session: Arc, file_cache: VortexFileCache, opts: VortexOptions, + write_options_factory: Arc, } impl Debug for VortexFormat { @@ -81,6 +82,7 @@ impl Eq for VortexOptions {} pub struct VortexFormatFactory { session: Arc, options: Option, + write_options_factory: Option, } impl GetExt for VortexFormatFactory { @@ -96,16 +98,7 @@ impl VortexFormatFactory { Self { session: Arc::new(VortexSession::default()), options: None, - } - } - - /// Creates a new instance with customized session and default options for all [`VortexFormat`] instances created from this factory. - /// - /// The options can be overridden by table-level configuration pass in [`FileFormatFactory::create`]. - pub fn new_with_options(session: Arc, options: VortexOptions) -> Self { - Self { - session, - options: Some(options), + write_options_factory: None, } } @@ -121,6 +114,23 @@ impl VortexFormatFactory { self.options = Some(options); self } + + /// Override the default write options for this factory. + //// + /// For example: + /// ```rust + /// use vortex_datafusion::VortexFormatFactory; + /// use vortex::file::VortexWriteOptionsFactory; + /// + /// let factory = VortexFormatFactory::new().with_write_options_factory(VortexWriteOptionsFactory::default()); + /// ``` + pub fn with_write_options_factory( + mut self, + write_options_factory: VortexWriteOptionsFactory, + ) -> Self { + self.write_options_factory = Some(write_options_factory); + self + } } impl FileFormatFactory for VortexFormatFactory { @@ -139,10 +149,13 @@ impl FileFormatFactory for VortexFormatFactory { } } - Ok(Arc::new(VortexFormat::new_with_options( - self.session.clone(), - opts, - ))) + let write_opts = self.write_options_factory.clone().unwrap_or_default(); + + Ok(Arc::new( + VortexFormat::new(self.session.clone()) + .with_options(opts) + .with_write_options_factory(write_opts), + )) } fn default(&self) -> Arc { @@ -163,11 +176,7 @@ impl Default for VortexFormat { impl VortexFormat { /// Create a new instance with default options. pub fn new(session: Arc) -> Self { - Self::new_with_options(session, VortexOptions::default()) - } - - /// Creates a new instance with configured by a [`VortexOptions`]. - pub fn new_with_options(session: Arc, opts: VortexOptions) -> Self { + let opts = VortexOptions::default(); Self { session: session.clone(), file_cache: VortexFileCache::new( @@ -176,13 +185,48 @@ impl VortexFormat { session, ), opts, + write_options_factory: VortexWriteOptionsFactory::default().into(), } } + /// Override the default options for this format. + //// + /// For example: + /// ```rust + /// use vortex_datafusion::{VortexFormat, VortexOptions}; + /// + /// let format = VortexFormat::default().with_options(VortexOptions::default()); + /// ``` + pub fn with_options(mut self, opts: VortexOptions) -> Self { + self.opts = opts; + self + } + + /// Override the default write options for this format. + //// For example: + /// ```rust + /// use vortex_datafusion::VortexFormat; + /// use vortex::file::VortexWriteOptionsFactory; + /// + /// let format = VortexFormat::default().with_write_options_factory(VortexWriteOptionsFactory::default()); + /// ``` + pub fn with_write_options_factory( + mut self, + write_options_factory: impl Into>, + ) -> Self { + self.write_options_factory = write_options_factory.into(); + self + } + /// Return the format specific configuration pub fn options(&self) -> &VortexOptions { &self.opts } + + /// Return the write options factory + pub fn write_options_factory(&self) -> Arc { + Arc::clone(&self.write_options_factory) + } } #[async_trait] @@ -395,7 +439,7 @@ impl FileFormat for VortexFormat { } let schema = conf.output_schema().clone(); - let sink = Arc::new(VortexSink::new(conf, schema)); + let sink = Arc::new(VortexSink::new(conf, schema, self.write_options_factory())); Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _) } diff --git a/vortex-datafusion/src/persistent/sink.rs b/vortex-datafusion/src/persistent/sink.rs index 8a97b5118f8..dd84012276f 100644 --- a/vortex-datafusion/src/persistent/sink.rs +++ b/vortex-datafusion/src/persistent/sink.rs @@ -25,18 +25,27 @@ use vortex::arrow::FromArrowArray; use vortex::dtype::DType; use vortex::dtype::arrow::FromArrowType; use vortex::error::VortexResult; -use vortex::file::VortexWriteOptions; +use vortex::file::VortexWriteOptionsFactory; use vortex::io::{ObjectStoreWriter, VortexWrite}; use vortex::stream::ArrayStreamAdapter; pub struct VortexSink { config: FileSinkConfig, schema: SchemaRef, + write_options_factory: Arc, } impl VortexSink { - pub fn new(config: FileSinkConfig, schema: SchemaRef) -> Self { - Self { config, schema } + pub fn new( + config: FileSinkConfig, + schema: SchemaRef, + write_options_factory: Arc, + ) -> Self { + Self { + config, + schema, + write_options_factory, + } } } @@ -106,6 +115,7 @@ impl FileSink for VortexSink { let row_counter = row_counter.clone(); let object_store = object_store.clone(); let writer_schema = get_writer_schema(&self.config); + let write_options_factory = Arc::clone(&self.write_options_factory); let dtype = DType::from_arrow(writer_schema); // We need to spawn work because there's a dependency between the different files. If one file has too many batches buffered, @@ -126,7 +136,8 @@ impl FileSink for VortexSink { )) })?; - VortexWriteOptions::default() + let write_options = write_options_factory.build(); + write_options .write(&mut sink, stream_adapter) .await .map_err(|e| { diff --git a/vortex-file/src/writer.rs b/vortex-file/src/writer.rs index a4c1f9b75ec..cdb7671b562 100644 --- a/vortex-file/src/writer.rs +++ b/vortex-file/src/writer.rs @@ -29,6 +29,91 @@ use crate::footer::FileStatistics; use crate::segments::writer::BufferedSegmentSink; use crate::{Footer, MAGIC_BYTES, WriteStrategyBuilder}; +const DEFAULT_EXCLUDE_DTYPE: bool = false; +const DEFAULT_MAX_VARIABLE_LENGTH_STATISTICS_SIZE: usize = 64; +const DEFAULT_FILE_STATISTICS: &[Stat] = PRUNING_STATS; + +/// Factory for creating [`VortexWriteOptions`] with custom defaults. +/// +/// This can be used to configure writer options before acquiring a handle, where we later reuse the options but need to source an available handle. +/// +/// This factory maintains the default behaviour of [`VortexWriteOptions::default`]. +#[derive(Default, Clone)] +pub struct VortexWriteOptionsFactory { + strategy: Option>, + exclude_dtype: Option, + max_variable_length_statistics_size: Option, + file_statistics: Option>, +} + +impl std::fmt::Debug for VortexWriteOptionsFactory { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("VortexWriteOptions") + .field("exclude_dtype", &self.exclude_dtype) + .field( + "max_variable_length_statistics_size", + &self.max_variable_length_statistics_size, + ) + .field("file_statistics", &self.file_statistics) + .finish() + } +} + +impl VortexWriteOptionsFactory { + /// Create a new builder with default settings. + pub fn new() -> Self { + Self::default() + } + + /// Replace the default layout strategy with the provided one. + pub fn with_strategy(mut self, strategy: Arc) -> Self { + self.strategy = Some(strategy); + self + } + + /// Exclude the DType from the Vortex file. You must provide the DType to the reader. + /// + /// See [`VortexWriteOptions::exclude_dtype`] for details. + pub fn exclude_dtype(mut self) -> Self { + self.exclude_dtype = Some(true); + self + } + + pub fn with_max_variable_length_statistics_size(mut self, size: usize) -> Self { + self.max_variable_length_statistics_size = Some(size); + self + } + + /// Configure which statistics to compute at the file-level. + /// + /// See [`VortexWriteOptions::with_file_statistics`] for details. + pub fn with_file_statistics(mut self, stats: Vec) -> Self { + self.file_statistics = Some(stats); + self + } + + /// Build the [`VortexWriteOptions`] with the configured settings. + /// + /// Finds an appropriate [`Handle`] automatically. + pub fn build(&self) -> VortexWriteOptions { + VortexWriteOptions { + strategy: self + .strategy + .clone() + .unwrap_or_else(|| WriteStrategyBuilder::new().build()), + exclude_dtype: self.exclude_dtype.unwrap_or(DEFAULT_EXCLUDE_DTYPE), + max_variable_length_statistics_size: self + .max_variable_length_statistics_size + .unwrap_or(DEFAULT_MAX_VARIABLE_LENGTH_STATISTICS_SIZE), + file_statistics: self + .file_statistics + .clone() + .unwrap_or_else(|| DEFAULT_FILE_STATISTICS.to_vec()), + handle: Handle::find(), + } + } +} + /// Configure a new writer, which can eventually be used to write an [`ArrayStream`] into a sink that implements [`VortexWrite`]. /// /// Unless overridden, the default [write strategy][crate::WriteStrategyBuilder] will be used with no @@ -45,9 +130,9 @@ impl Default for VortexWriteOptions { fn default() -> Self { Self { strategy: WriteStrategyBuilder::new().build(), - exclude_dtype: false, - file_statistics: PRUNING_STATS.to_vec(), - max_variable_length_statistics_size: 64, + exclude_dtype: DEFAULT_EXCLUDE_DTYPE, + file_statistics: DEFAULT_FILE_STATISTICS.to_vec(), + max_variable_length_statistics_size: DEFAULT_MAX_VARIABLE_LENGTH_STATISTICS_SIZE, handle: Handle::find(), } } @@ -447,3 +532,22 @@ impl WriteSummary { self.footer.row_count() } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_write_options_factory() { + let factory = VortexWriteOptionsFactory::new() + .exclude_dtype() + .with_max_variable_length_statistics_size(128) + .with_file_statistics(vec![Stat::Min, Stat::Max]); + let options = factory.build(); + + assert!(options.exclude_dtype); + assert_eq!(options.max_variable_length_statistics_size, 128); + assert_eq!(options.file_statistics, vec![Stat::Min, Stat::Max]); + assert_eq!(options.handle.is_some(), false); // test is running synchronously, so no handle + } +}