Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ members = [
"vortex-python",
"vortex-scan",
"vortex-scalar",
"vortex-session",
"vortex-tui",
"vortex-utils",
"vortex-vector",
Expand Down Expand Up @@ -243,6 +244,7 @@ vortex-runend = { version = "0.1.0", path = "./encodings/runend", default-featur
vortex-scalar = { version = "0.1.0", path = "./vortex-scalar", default-features = false }
vortex-scan = { version = "0.1.0", path = "./vortex-scan", default-features = false }
vortex-sequence = { version = "0.1.0", path = "encodings/sequence", default-features = false }
vortex-session = { version = "0.1.0", path = "./vortex-session", default-features = false }
vortex-sparse = { version = "0.1.0", path = "./encodings/sparse", default-features = false }
vortex-tui = { version = "0.1.0", path = "./vortex-tui", default-features = false }
vortex-utils = { version = "0.1.0", path = "./vortex-utils", default-features = false }
Expand Down
10 changes: 5 additions & 5 deletions bench-vortex/src/clickbench/clickbench_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ use tokio::fs::{OpenOptions, create_dir_all};
use tracing::{Instrument, info, warn};
use url::Url;
use vortex::error::VortexExpect;
use vortex::file::VortexWriteOptions;
use vortex::file::WriteOptionsSessionExt;
use vortex_datafusion::VortexFormat;

use crate::conversions::parquet_to_vortex;
#[cfg(feature = "lance")]
use crate::utils;
use crate::utils::file_utils::{idempotent, idempotent_async};
use crate::{CompactionStrategy, Format};
use crate::{CompactionStrategy, Format, SESSION};

pub static HITS_SCHEMA: LazyLock<Schema> = LazyLock::new(|| {
use DataType::*;
Expand Down Expand Up @@ -204,7 +204,7 @@ pub async fn convert_parquet_to_vortex(
.open(&vtx_file)
.await?;

let write_options = compaction.apply_options(VortexWriteOptions::default());
let write_options = compaction.apply_options(SESSION.write_options());

write_options.write(&mut f, array_stream).await?;

Expand Down Expand Up @@ -250,7 +250,7 @@ pub async fn register_vortex_files(
glob_pattern: Option<Pattern>,
) -> anyhow::Result<()> {
let vortex_path = input_path.join(&format!("{}/", Format::OnDiskVortex.name()))?;
let format = Arc::new(VortexFormat::default());
let format = Arc::new(VortexFormat::new(SESSION.clone()));

info!(
"Registering table from {vortex_path} with glob {:?}",
Expand Down Expand Up @@ -283,7 +283,7 @@ pub async fn register_vortex_compact_files(
glob_pattern: Option<Pattern>,
) -> anyhow::Result<()> {
let vortex_compact_path = input_path.join(&format!("{}/", Format::VortexCompact.name()))?;
let format = Arc::new(VortexFormat::default());
let format = Arc::new(VortexFormat::new(SESSION.clone()));

info!(
"Registering vortex-compact table from {vortex_compact_path} with glob {:?}",
Expand Down
9 changes: 6 additions & 3 deletions bench-vortex/src/compress/vortex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,23 @@ use std::sync::Arc;
use bytes::Bytes;
use futures::{StreamExt, pin_mut};
use vortex::Array;
use vortex::file::{VortexOpenOptions, VortexWriteOptions};
use vortex::file::{OpenOptionsSessionExt, WriteOptionsSessionExt};

use crate::SESSION;

#[inline(never)]
pub async fn vortex_compress_write(array: &dyn Array, buf: &mut Vec<u8>) -> anyhow::Result<u64> {
let mut cursor = Cursor::new(buf);
VortexWriteOptions::default()
SESSION
.write_options()
.write(&mut cursor, array.to_array_stream())
.await?;
Ok(cursor.position())
}

#[inline(never)]
pub async fn vortex_decompress_read(buf: Bytes) -> anyhow::Result<usize> {
let scan = VortexOpenOptions::new().open_buffer(buf)?.scan()?;
let scan = SESSION.open_options().open_buffer(buf)?.scan()?;
let schema = Arc::new(scan.dtype()?.to_arrow_schema()?);

let stream = scan.into_record_batch_stream(schema)?;
Expand Down
5 changes: 3 additions & 2 deletions bench-vortex/src/datasets/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use vortex_datafusion::VortexFormat;
#[cfg(feature = "lance")]
use {crate::Format, lance::datafusion::LanceTableProvider, lance::dataset::Dataset};

use crate::SESSION;
use crate::datasets::BenchmarkDataset;

pub async fn register_parquet_files(
Expand Down Expand Up @@ -85,7 +86,7 @@ pub async fn register_vortex_files(
&file_url,
glob.as_ref().map(|g| g.as_str()).unwrap_or("")
);
let format = Arc::new(VortexFormat::default());
let format = Arc::new(VortexFormat::new(SESSION.clone()));
let table_url = ListingTableUrl::try_new(file_url.clone(), glob)?;
let config = ListingTableConfig::new(table_url).with_listing_options(
ListingOptions::new(format).with_session_config_options(session.state().config()),
Expand Down Expand Up @@ -133,7 +134,7 @@ pub async fn register_vortex_compact_files(
&file_url,
glob.as_ref().map(|g| g.as_str()).unwrap_or("")
);
let format = Arc::new(VortexFormat::default());
let format = Arc::new(VortexFormat::new(SESSION.clone()));
let table_url = ListingTableUrl::try_new(file_url.clone(), glob)?;
let config = ListingTableConfig::new(table_url).with_listing_options(
ListingOptions::new(format).with_session_config_options(session.state().config()),
Expand Down
13 changes: 7 additions & 6 deletions bench-vortex/src/datasets/taxi_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use async_trait::async_trait;
use tokio::fs::File as TokioFile;
use tokio::io::AsyncWriteExt;
use vortex::ArrayRef;
use vortex::file::{VortexOpenOptions, VortexWriteOptions};
use vortex::file::{OpenOptionsSessionExt, WriteOptionsSessionExt};
use vortex::stream::ArrayStreamExt;
#[cfg(feature = "lance")]
use {
Expand All @@ -21,7 +21,7 @@ use {
use crate::conversions::parquet_to_vortex;
use crate::datasets::Dataset;
use crate::datasets::data_downloads::download_data;
use crate::{CompactionStrategy, IdempotentPath, idempotent_async};
use crate::{CompactionStrategy, IdempotentPath, SESSION, idempotent_async};

pub struct TaxiData;

Expand All @@ -45,7 +45,8 @@ pub async fn taxi_data_parquet() -> Result<PathBuf> {

pub async fn fetch_taxi_data() -> Result<ArrayRef> {
let vortex_data = taxi_data_vortex().await?;
Ok(VortexOpenOptions::new()
Ok(SESSION
.open_options()
.open(vortex_data)
.await?
.scan()?
Expand All @@ -58,7 +59,8 @@ pub async fn taxi_data_vortex() -> Result<PathBuf> {
idempotent_async("taxi/taxi.vortex", |output_fname| async move {
let buf = output_fname.to_path_buf();
let mut output_file = TokioFile::create(output_fname).await?;
VortexWriteOptions::default()
SESSION
.write_options()
.write(
&mut output_file,
parquet_to_vortex(taxi_data_parquet().await?)?,
Expand All @@ -76,8 +78,7 @@ pub async fn taxi_data_vortex_compact() -> Result<PathBuf> {
let mut output_file = TokioFile::create(output_fname).await?;

// This is the only difference to `taxi_data_vortex`.
let write_options =
CompactionStrategy::Compact.apply_options(VortexWriteOptions::default());
let write_options = CompactionStrategy::Compact.apply_options(SESSION.write_options());

write_options
.write(
Expand Down
6 changes: 3 additions & 3 deletions bench-vortex/src/datasets/tpch_l_comment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ use glob::glob;
use vortex::arrays::ChunkedArray;
use vortex::dtype::Nullability::NonNullable;
use vortex::expr::{col, pack};
use vortex::file::VortexOpenOptions;
use vortex::file::OpenOptionsSessionExt;
use vortex::{Array, ArrayRef, IntoArray, ToCanonical};

use crate::datasets::Dataset;
use crate::tpch::tpchgen::{TpchGenOptions, generate_tpch_tables};
use crate::{Format, IdempotentPath};
use crate::{Format, IdempotentPath, SESSION};

pub struct TPCHLCommentChunked;

Expand Down Expand Up @@ -44,7 +44,7 @@ impl Dataset for TPCHLCommentChunked {
.to_string_lossy()
.as_ref(),
)? {
let file = VortexOpenOptions::new().open(path?).await?;
let file = SESSION.open_options().open(path?).await?;
let file_chunks: Vec<_> = file
.scan()?
.with_projection(pack(vec![("l_comment", col("l_comment"))], NonNullable))
Expand Down
10 changes: 6 additions & 4 deletions bench-vortex/src/downloadable_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
use async_trait::async_trait;
use tokio::fs::File;
use vortex::ArrayRef;
use vortex::file::{VortexOpenOptions, VortexWriteOptions};
use vortex::file::{OpenOptionsSessionExt, WriteOptionsSessionExt};
use vortex::stream::ArrayStreamExt;

use crate::conversions::parquet_to_vortex;
use crate::datasets::Dataset;
use crate::datasets::data_downloads::download_data;
use crate::{IdempotentPath, idempotent_async};
use crate::{IdempotentPath, SESSION, idempotent_async};

/// Datasets which can be downloaded over HTTP in Parquet format.
///
Expand Down Expand Up @@ -57,7 +57,8 @@ impl Dataset for DownloadableDataset {
let vortex = dir.join(format!("{}.vortex", self.name()));
download_data(parquet.clone(), self.parquet_url()).await?;
idempotent_async(&vortex, async |path| -> anyhow::Result<()> {
VortexWriteOptions::default()
SESSION
.write_options()
.write(
&mut File::create(path)
.await
Expand All @@ -70,7 +71,8 @@ impl Dataset for DownloadableDataset {
})
.await?;

Ok(VortexOpenOptions::new()
Ok(SESSION
.open_options()
.open(vortex.as_path())
.await?
.scan()?
Expand Down
14 changes: 9 additions & 5 deletions bench-vortex/src/fineweb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ use log::info;
use parquet::arrow::async_writer::AsyncFileWriter;
use url::Url;
use vortex::compressor::CompactCompressor;
use vortex::file::{VortexWriteOptions, WriteStrategyBuilder};
use vortex::file::{WriteOptionsSessionExt, WriteStrategyBuilder};
use vortex_datafusion::VortexFormat;

use crate::benchmark_trait::Benchmark;
use crate::conversions::parquet_to_vortex;
use crate::engines::EngineCtx;
use crate::{BenchmarkDataset, Format, Target, idempotent_async};
use crate::{BenchmarkDataset, Format, SESSION, Target, idempotent_async};

/// URL to the sample file
const SAMPLE_URL: &str = "https://huggingface.co/datasets/HuggingFaceFW/fineweb/resolve/v1.4.0/sample/10BT/001_00000.parquet";
Expand Down Expand Up @@ -146,7 +146,8 @@ impl Benchmark for Fineweb {
info!("Converting FineWeb to Vortex with default compressor");
let array_stream = parquet_to_vortex(parquet)?;
let w = tokio::fs::File::create(vortex_path).await?;
VortexWriteOptions::default()
SESSION
.write_options()
.write(w, array_stream)
.await
.map_err(|e| anyhow::anyhow!("Failed to write to VortexWriter: {e}"))
Expand All @@ -158,7 +159,8 @@ impl Benchmark for Fineweb {
info!("Converting FineWeb to Vortex with Compact compressor");
let array_stream = parquet_to_vortex(parquet)?;
let w = tokio::fs::File::create(vortex_path).await?;
VortexWriteOptions::default()
SESSION
.write_options()
.with_strategy(
WriteStrategyBuilder::new()
.with_compressor(CompactCompressor::default())
Expand Down Expand Up @@ -221,7 +223,9 @@ pub async fn register_table(
.with_listing_options(
ListingOptions::new(match format {
Format::Parquet => Arc::from(ParquetFormat::new()),
Format::OnDiskVortex | Format::VortexCompact => Arc::from(VortexFormat::default()),
Format::OnDiskVortex | Format::VortexCompact => {
Arc::from(VortexFormat::new(SESSION.clone()))
}
_ => anyhow::bail!("unsupported format for `fineweb` bench: {}", format),
})
.with_session_config_options(session.state().config()),
Expand Down
Loading
Loading