Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 65 additions & 21 deletions vortex-datafusion/src/persistent/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use object_store::{ObjectMeta, ObjectStore};
use vortex::dtype::arrow::FromArrowType;
use vortex::dtype::{DType, Nullability, PType};
use vortex::error::{VortexExpect, VortexResult, vortex_err};
use vortex::file::VORTEX_FILE_EXTENSION;
use vortex::file::{VORTEX_FILE_EXTENSION, VortexWriteOptionsFactory};
use vortex::metrics::VortexMetrics;
use vortex::scalar::Scalar;
use vortex::session::VortexSession;
Expand All @@ -50,6 +50,7 @@ pub struct VortexFormat {
session: Arc<VortexSession>,
file_cache: VortexFileCache,
opts: VortexOptions,
write_options_factory: Arc<VortexWriteOptionsFactory>,
}

impl Debug for VortexFormat {
Expand Down Expand Up @@ -81,6 +82,7 @@ impl Eq for VortexOptions {}
pub struct VortexFormatFactory {
session: Arc<VortexSession>,
options: Option<VortexOptions>,
write_options_factory: Option<VortexWriteOptionsFactory>,
}

impl GetExt for VortexFormatFactory {
Expand All @@ -96,16 +98,7 @@ impl VortexFormatFactory {
Self {
session: Arc::new(VortexSession::default()),
options: None,
}
}

/// Creates a new instance with customized session and default options for all [`VortexFormat`] instances created from this factory.
///
/// The options can be overridden by table-level configuration pass in [`FileFormatFactory::create`].
pub fn new_with_options(session: Arc<VortexSession>, options: VortexOptions) -> Self {
Self {
session,
options: Some(options),
write_options_factory: None,
}
}

Expand All @@ -121,6 +114,23 @@ impl VortexFormatFactory {
self.options = Some(options);
self
}

/// Override the default write options for this factory.
////
/// For example:
/// ```rust
/// use vortex_datafusion::VortexFormatFactory;
/// use vortex::file::VortexWriteOptionsFactory;
///
/// let factory = VortexFormatFactory::new().with_write_options_factory(VortexWriteOptionsFactory::default());
/// ```
pub fn with_write_options_factory(
mut self,
write_options_factory: VortexWriteOptionsFactory,
) -> Self {
self.write_options_factory = Some(write_options_factory);
self
}
}

impl FileFormatFactory for VortexFormatFactory {
Expand All @@ -139,10 +149,13 @@ impl FileFormatFactory for VortexFormatFactory {
}
}

Ok(Arc::new(VortexFormat::new_with_options(
self.session.clone(),
opts,
)))
let write_opts = self.write_options_factory.clone().unwrap_or_default();

Ok(Arc::new(
VortexFormat::new(self.session.clone())
.with_options(opts)
.with_write_options_factory(write_opts),
))
}

fn default(&self) -> Arc<dyn FileFormat> {
Expand All @@ -163,11 +176,7 @@ impl Default for VortexFormat {
impl VortexFormat {
/// Create a new instance with default options.
pub fn new(session: Arc<VortexSession>) -> Self {
Self::new_with_options(session, VortexOptions::default())
}

/// Creates a new instance with configured by a [`VortexOptions`].
pub fn new_with_options(session: Arc<VortexSession>, opts: VortexOptions) -> Self {
let opts = VortexOptions::default();
Self {
session: session.clone(),
file_cache: VortexFileCache::new(
Expand All @@ -176,13 +185,48 @@ impl VortexFormat {
session,
),
opts,
write_options_factory: VortexWriteOptionsFactory::default().into(),
}
}

/// Override the default options for this format.
////
/// For example:
/// ```rust
/// use vortex_datafusion::{VortexFormat, VortexOptions};
///
/// let format = VortexFormat::default().with_options(VortexOptions::default());
/// ```
pub fn with_options(mut self, opts: VortexOptions) -> Self {
self.opts = opts;
self
}

/// Override the default write options for this format.
//// For example:
/// ```rust
/// use vortex_datafusion::VortexFormat;
/// use vortex::file::VortexWriteOptionsFactory;
///
/// let format = VortexFormat::default().with_write_options_factory(VortexWriteOptionsFactory::default());
/// ```
pub fn with_write_options_factory(
mut self,
write_options_factory: impl Into<Arc<VortexWriteOptionsFactory>>,
) -> Self {
self.write_options_factory = write_options_factory.into();
self
}

/// Return the format specific configuration
pub fn options(&self) -> &VortexOptions {
&self.opts
}

/// Return the write options factory
pub fn write_options_factory(&self) -> Arc<VortexWriteOptionsFactory> {
Arc::clone(&self.write_options_factory)
}
}

#[async_trait]
Expand Down Expand Up @@ -395,7 +439,7 @@ impl FileFormat for VortexFormat {
}

let schema = conf.output_schema().clone();
let sink = Arc::new(VortexSink::new(conf, schema));
let sink = Arc::new(VortexSink::new(conf, schema, self.write_options_factory()));

Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
}
Expand Down
19 changes: 15 additions & 4 deletions vortex-datafusion/src/persistent/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,27 @@ use vortex::arrow::FromArrowArray;
use vortex::dtype::DType;
use vortex::dtype::arrow::FromArrowType;
use vortex::error::VortexResult;
use vortex::file::VortexWriteOptions;
use vortex::file::VortexWriteOptionsFactory;
use vortex::io::{ObjectStoreWriter, VortexWrite};
use vortex::stream::ArrayStreamAdapter;

pub struct VortexSink {
config: FileSinkConfig,
schema: SchemaRef,
write_options_factory: Arc<VortexWriteOptionsFactory>,
}

impl VortexSink {
pub fn new(config: FileSinkConfig, schema: SchemaRef) -> Self {
Self { config, schema }
pub fn new(
config: FileSinkConfig,
schema: SchemaRef,
write_options_factory: Arc<VortexWriteOptionsFactory>,
) -> Self {
Self {
config,
schema,
write_options_factory,
}
}
}

Expand Down Expand Up @@ -106,6 +115,7 @@ impl FileSink for VortexSink {
let row_counter = row_counter.clone();
let object_store = object_store.clone();
let writer_schema = get_writer_schema(&self.config);
let write_options_factory = Arc::clone(&self.write_options_factory);
let dtype = DType::from_arrow(writer_schema);

// We need to spawn work because there's a dependency between the different files. If one file has too many batches buffered,
Expand All @@ -126,7 +136,8 @@ impl FileSink for VortexSink {
))
})?;

VortexWriteOptions::default()
let write_options = write_options_factory.build();
write_options
.write(&mut sink, stream_adapter)
.await
.map_err(|e| {
Expand Down
110 changes: 107 additions & 3 deletions vortex-file/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,91 @@ use crate::footer::FileStatistics;
use crate::segments::writer::BufferedSegmentSink;
use crate::{Footer, MAGIC_BYTES, WriteStrategyBuilder};

const DEFAULT_EXCLUDE_DTYPE: bool = false;
const DEFAULT_MAX_VARIABLE_LENGTH_STATISTICS_SIZE: usize = 64;
const DEFAULT_FILE_STATISTICS: &[Stat] = PRUNING_STATS;

/// Factory for creating [`VortexWriteOptions`] with custom defaults.
///
/// This can be used to configure writer options before acquiring a handle, where we later reuse the options but need to source an available handle.
///
/// This factory maintains the default behaviour of [`VortexWriteOptions::default`].
#[derive(Default, Clone)]
pub struct VortexWriteOptionsFactory {
strategy: Option<Arc<dyn LayoutStrategy>>,
exclude_dtype: Option<bool>,
max_variable_length_statistics_size: Option<usize>,
file_statistics: Option<Vec<Stat>>,
}

impl std::fmt::Debug for VortexWriteOptionsFactory {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("VortexWriteOptions")
.field("exclude_dtype", &self.exclude_dtype)
.field(
"max_variable_length_statistics_size",
&self.max_variable_length_statistics_size,
)
.field("file_statistics", &self.file_statistics)
.finish()
}
}

impl VortexWriteOptionsFactory {
/// Create a new builder with default settings.
pub fn new() -> Self {
Self::default()
}

/// Replace the default layout strategy with the provided one.
pub fn with_strategy(mut self, strategy: Arc<dyn LayoutStrategy>) -> Self {
self.strategy = Some(strategy);
self
}

/// Exclude the DType from the Vortex file. You must provide the DType to the reader.
///
/// See [`VortexWriteOptions::exclude_dtype`] for details.
pub fn exclude_dtype(mut self) -> Self {
self.exclude_dtype = Some(true);
self
}

pub fn with_max_variable_length_statistics_size(mut self, size: usize) -> Self {
self.max_variable_length_statistics_size = Some(size);
self
}

/// Configure which statistics to compute at the file-level.
///
/// See [`VortexWriteOptions::with_file_statistics`] for details.
pub fn with_file_statistics(mut self, stats: Vec<Stat>) -> Self {
self.file_statistics = Some(stats);
self
}

/// Build the [`VortexWriteOptions`] with the configured settings.
///
/// Finds an appropriate [`Handle`] automatically.
pub fn build(&self) -> VortexWriteOptions {
VortexWriteOptions {
strategy: self
.strategy
.clone()
.unwrap_or_else(|| WriteStrategyBuilder::new().build()),
exclude_dtype: self.exclude_dtype.unwrap_or(DEFAULT_EXCLUDE_DTYPE),
max_variable_length_statistics_size: self
.max_variable_length_statistics_size
.unwrap_or(DEFAULT_MAX_VARIABLE_LENGTH_STATISTICS_SIZE),
file_statistics: self
.file_statistics
.clone()
.unwrap_or_else(|| DEFAULT_FILE_STATISTICS.to_vec()),
handle: Handle::find(),
}
}
}

/// Configure a new writer, which can eventually be used to write an [`ArrayStream`] into a sink that implements [`VortexWrite`].
///
/// Unless overridden, the default [write strategy][crate::WriteStrategyBuilder] will be used with no
Expand All @@ -45,9 +130,9 @@ impl Default for VortexWriteOptions {
fn default() -> Self {
Self {
strategy: WriteStrategyBuilder::new().build(),
exclude_dtype: false,
file_statistics: PRUNING_STATS.to_vec(),
max_variable_length_statistics_size: 64,
exclude_dtype: DEFAULT_EXCLUDE_DTYPE,
file_statistics: DEFAULT_FILE_STATISTICS.to_vec(),
max_variable_length_statistics_size: DEFAULT_MAX_VARIABLE_LENGTH_STATISTICS_SIZE,
handle: Handle::find(),
}
}
Expand Down Expand Up @@ -447,3 +532,22 @@ impl WriteSummary {
self.footer.row_count()
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_write_options_factory() {
let factory = VortexWriteOptionsFactory::new()
.exclude_dtype()
.with_max_variable_length_statistics_size(128)
.with_file_statistics(vec![Stat::Min, Stat::Max]);
let options = factory.build();

assert!(options.exclude_dtype);
assert_eq!(options.max_variable_length_statistics_size, 128);
assert_eq!(options.file_statistics, vec![Stat::Min, Stat::Max]);
assert_eq!(options.handle.is_some(), false); // test is running synchronously, so no handle
}
}
Loading