diff --git a/Cargo.lock b/Cargo.lock index 6910acc7165..92765777dbf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8505,6 +8505,7 @@ dependencies = [ "vortex-scalar", "vortex-scan", "vortex-sequence", + "vortex-session", "vortex-sparse", "vortex-utils", "vortex-zigzag", @@ -8587,6 +8588,7 @@ dependencies = [ "vortex-mask", "vortex-metrics", "vortex-scalar", + "vortex-session", "vortex-utils", "vortex-vector", ] @@ -8677,12 +8679,12 @@ dependencies = [ "anyhow", "arrow-array", "arrow-schema", + "async-fs", "cxx", "cxx-build", "futures", "paste", "take_mut", - "tokio", "vortex", ] @@ -8881,6 +8883,7 @@ dependencies = [ "vortex-mask", "vortex-proto", "vortex-scalar", + "vortex-session", "vortex-utils", ] @@ -8916,7 +8919,9 @@ dependencies = [ name = "vortex-ffi" version = "0.1.0" dependencies = [ + "async-fs", "cbindgen", + "futures", "itertools 0.14.0", "log", "mimalloc", @@ -8927,8 +8932,6 @@ dependencies = [ "prost 0.14.1", "simplelog", "tempfile", - "tokio", - "tokio-stream", "url", "vortex", ] @@ -8972,6 +8975,7 @@ dependencies = [ "vortex-scalar", "vortex-scan", "vortex-sequence", + "vortex-session", "vortex-sparse", "vortex-utils", "vortex-zigzag", @@ -9012,6 +9016,7 @@ dependencies = [ "itertools 0.14.0", "libfuzzer-sys", "strum 0.27.2", + "vortex", "vortex-array", "vortex-btrblocks", "vortex-buffer", @@ -9023,6 +9028,7 @@ dependencies = [ "vortex-layout", "vortex-mask", "vortex-scalar", + "vortex-session", "vortex-utils", ] @@ -9089,6 +9095,7 @@ dependencies = [ "vortex-buffer", "vortex-error", "vortex-metrics", + "vortex-session", "wasm-bindgen-futures", ] @@ -9172,6 +9179,7 @@ dependencies = [ "vortex-pco", "vortex-scalar", "vortex-sequence", + "vortex-session", "vortex-utils", "vortex-zstd", ] @@ -9192,6 +9200,7 @@ version = "0.1.0" dependencies = [ "getrandom 0.3.4", "parking_lot", + "vortex-session", "witchcraft-metrics", ] @@ -9301,6 +9310,7 @@ dependencies = [ "vortex-layout", "vortex-mask", "vortex-metrics", + "vortex-session", ] [[package]] @@ -9326,6 +9336,15 @@ dependencies = [ "vortex-vector", ] +[[package]] +name = "vortex-session" +version = "0.1.0" +dependencies = [ + "dashmap", + "vortex-error", + "vortex-utils", +] + [[package]] name = "vortex-sparse" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 10e55b861de..19d70963248 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ members = [ "vortex-python", "vortex-scan", "vortex-scalar", + "vortex-session", "vortex-tui", "vortex-utils", "vortex-vector", @@ -243,6 +244,7 @@ vortex-runend = { version = "0.1.0", path = "./encodings/runend", default-featur vortex-scalar = { version = "0.1.0", path = "./vortex-scalar", default-features = false } vortex-scan = { version = "0.1.0", path = "./vortex-scan", default-features = false } vortex-sequence = { version = "0.1.0", path = "encodings/sequence", default-features = false } +vortex-session = { version = "0.1.0", path = "./vortex-session", default-features = false } vortex-sparse = { version = "0.1.0", path = "./encodings/sparse", default-features = false } vortex-tui = { version = "0.1.0", path = "./vortex-tui", default-features = false } vortex-utils = { version = "0.1.0", path = "./vortex-utils", default-features = false } diff --git a/bench-vortex/src/clickbench/clickbench_data.rs b/bench-vortex/src/clickbench/clickbench_data.rs index 1c32ad18a6c..c0c53b6af88 100644 --- a/bench-vortex/src/clickbench/clickbench_data.rs +++ b/bench-vortex/src/clickbench/clickbench_data.rs @@ -26,14 +26,14 @@ use tokio::fs::{OpenOptions, create_dir_all}; use tracing::{Instrument, info, warn}; use url::Url; use vortex::error::VortexExpect; -use vortex::file::VortexWriteOptions; +use vortex::file::WriteOptionsSessionExt; use vortex_datafusion::VortexFormat; use crate::conversions::parquet_to_vortex; #[cfg(feature = "lance")] use crate::utils; use crate::utils::file_utils::{idempotent, idempotent_async}; -use crate::{CompactionStrategy, Format}; +use crate::{CompactionStrategy, Format, SESSION}; pub static HITS_SCHEMA: LazyLock = LazyLock::new(|| { use DataType::*; @@ -204,7 +204,7 @@ pub async fn convert_parquet_to_vortex( .open(&vtx_file) .await?; - let write_options = compaction.apply_options(VortexWriteOptions::default()); + let write_options = compaction.apply_options(SESSION.write_options()); write_options.write(&mut f, array_stream).await?; @@ -250,7 +250,7 @@ pub async fn register_vortex_files( glob_pattern: Option, ) -> anyhow::Result<()> { let vortex_path = input_path.join(&format!("{}/", Format::OnDiskVortex.name()))?; - let format = Arc::new(VortexFormat::default()); + let format = Arc::new(VortexFormat::new(SESSION.clone())); info!( "Registering table from {vortex_path} with glob {:?}", @@ -283,7 +283,7 @@ pub async fn register_vortex_compact_files( glob_pattern: Option, ) -> anyhow::Result<()> { let vortex_compact_path = input_path.join(&format!("{}/", Format::VortexCompact.name()))?; - let format = Arc::new(VortexFormat::default()); + let format = Arc::new(VortexFormat::new(SESSION.clone())); info!( "Registering vortex-compact table from {vortex_compact_path} with glob {:?}", diff --git a/bench-vortex/src/compress/vortex.rs b/bench-vortex/src/compress/vortex.rs index 9bdf3e0314a..f7ae3f19013 100644 --- a/bench-vortex/src/compress/vortex.rs +++ b/bench-vortex/src/compress/vortex.rs @@ -7,12 +7,15 @@ use std::sync::Arc; use bytes::Bytes; use futures::{StreamExt, pin_mut}; use vortex::Array; -use vortex::file::{VortexOpenOptions, VortexWriteOptions}; +use vortex::file::{OpenOptionsSessionExt, WriteOptionsSessionExt}; + +use crate::SESSION; #[inline(never)] pub async fn vortex_compress_write(array: &dyn Array, buf: &mut Vec) -> anyhow::Result { let mut cursor = Cursor::new(buf); - VortexWriteOptions::default() + SESSION + .write_options() .write(&mut cursor, array.to_array_stream()) .await?; Ok(cursor.position()) @@ -20,7 +23,7 @@ pub async fn vortex_compress_write(array: &dyn Array, buf: &mut Vec) -> anyh #[inline(never)] pub async fn vortex_decompress_read(buf: Bytes) -> anyhow::Result { - let scan = VortexOpenOptions::new().open_buffer(buf)?.scan()?; + let scan = SESSION.open_options().open_buffer(buf)?.scan()?; let schema = Arc::new(scan.dtype()?.to_arrow_schema()?); let stream = scan.into_record_batch_stream(schema)?; diff --git a/bench-vortex/src/datasets/file.rs b/bench-vortex/src/datasets/file.rs index 12b61b7d4a4..d2d3dd5cdbe 100644 --- a/bench-vortex/src/datasets/file.rs +++ b/bench-vortex/src/datasets/file.rs @@ -17,6 +17,7 @@ use vortex_datafusion::VortexFormat; #[cfg(feature = "lance")] use {crate::Format, lance::datafusion::LanceTableProvider, lance::dataset::Dataset}; +use crate::SESSION; use crate::datasets::BenchmarkDataset; pub async fn register_parquet_files( @@ -85,7 +86,7 @@ pub async fn register_vortex_files( &file_url, glob.as_ref().map(|g| g.as_str()).unwrap_or("") ); - let format = Arc::new(VortexFormat::default()); + let format = Arc::new(VortexFormat::new(SESSION.clone())); let table_url = ListingTableUrl::try_new(file_url.clone(), glob)?; let config = ListingTableConfig::new(table_url).with_listing_options( ListingOptions::new(format).with_session_config_options(session.state().config()), @@ -133,7 +134,7 @@ pub async fn register_vortex_compact_files( &file_url, glob.as_ref().map(|g| g.as_str()).unwrap_or("") ); - let format = Arc::new(VortexFormat::default()); + let format = Arc::new(VortexFormat::new(SESSION.clone())); let table_url = ListingTableUrl::try_new(file_url.clone(), glob)?; let config = ListingTableConfig::new(table_url).with_listing_options( ListingOptions::new(format).with_session_config_options(session.state().config()), diff --git a/bench-vortex/src/datasets/taxi_data.rs b/bench-vortex/src/datasets/taxi_data.rs index bcf41dc231c..2adc957a77d 100644 --- a/bench-vortex/src/datasets/taxi_data.rs +++ b/bench-vortex/src/datasets/taxi_data.rs @@ -8,7 +8,7 @@ use async_trait::async_trait; use tokio::fs::File as TokioFile; use tokio::io::AsyncWriteExt; use vortex::ArrayRef; -use vortex::file::{VortexOpenOptions, VortexWriteOptions}; +use vortex::file::{OpenOptionsSessionExt, WriteOptionsSessionExt}; use vortex::stream::ArrayStreamExt; #[cfg(feature = "lance")] use { @@ -21,7 +21,7 @@ use { use crate::conversions::parquet_to_vortex; use crate::datasets::Dataset; use crate::datasets::data_downloads::download_data; -use crate::{CompactionStrategy, IdempotentPath, idempotent_async}; +use crate::{CompactionStrategy, IdempotentPath, SESSION, idempotent_async}; pub struct TaxiData; @@ -45,7 +45,8 @@ pub async fn taxi_data_parquet() -> Result { pub async fn fetch_taxi_data() -> Result { let vortex_data = taxi_data_vortex().await?; - Ok(VortexOpenOptions::new() + Ok(SESSION + .open_options() .open(vortex_data) .await? .scan()? @@ -58,7 +59,8 @@ pub async fn taxi_data_vortex() -> Result { idempotent_async("taxi/taxi.vortex", |output_fname| async move { let buf = output_fname.to_path_buf(); let mut output_file = TokioFile::create(output_fname).await?; - VortexWriteOptions::default() + SESSION + .write_options() .write( &mut output_file, parquet_to_vortex(taxi_data_parquet().await?)?, @@ -76,8 +78,7 @@ pub async fn taxi_data_vortex_compact() -> Result { let mut output_file = TokioFile::create(output_fname).await?; // This is the only difference to `taxi_data_vortex`. - let write_options = - CompactionStrategy::Compact.apply_options(VortexWriteOptions::default()); + let write_options = CompactionStrategy::Compact.apply_options(SESSION.write_options()); write_options .write( diff --git a/bench-vortex/src/datasets/tpch_l_comment.rs b/bench-vortex/src/datasets/tpch_l_comment.rs index 5cb7c2472d8..5ec783d78c9 100644 --- a/bench-vortex/src/datasets/tpch_l_comment.rs +++ b/bench-vortex/src/datasets/tpch_l_comment.rs @@ -8,12 +8,12 @@ use glob::glob; use vortex::arrays::ChunkedArray; use vortex::dtype::Nullability::NonNullable; use vortex::expr::{col, pack}; -use vortex::file::VortexOpenOptions; +use vortex::file::OpenOptionsSessionExt; use vortex::{Array, ArrayRef, IntoArray, ToCanonical}; use crate::datasets::Dataset; use crate::tpch::tpchgen::{TpchGenOptions, generate_tpch_tables}; -use crate::{Format, IdempotentPath}; +use crate::{Format, IdempotentPath, SESSION}; pub struct TPCHLCommentChunked; @@ -44,7 +44,7 @@ impl Dataset for TPCHLCommentChunked { .to_string_lossy() .as_ref(), )? { - let file = VortexOpenOptions::new().open(path?).await?; + let file = SESSION.open_options().open(path?).await?; let file_chunks: Vec<_> = file .scan()? .with_projection(pack(vec![("l_comment", col("l_comment"))], NonNullable)) diff --git a/bench-vortex/src/downloadable_dataset.rs b/bench-vortex/src/downloadable_dataset.rs index 1c58062ce3f..bea396ea926 100644 --- a/bench-vortex/src/downloadable_dataset.rs +++ b/bench-vortex/src/downloadable_dataset.rs @@ -4,13 +4,13 @@ use async_trait::async_trait; use tokio::fs::File; use vortex::ArrayRef; -use vortex::file::{VortexOpenOptions, VortexWriteOptions}; +use vortex::file::{OpenOptionsSessionExt, WriteOptionsSessionExt}; use vortex::stream::ArrayStreamExt; use crate::conversions::parquet_to_vortex; use crate::datasets::Dataset; use crate::datasets::data_downloads::download_data; -use crate::{IdempotentPath, idempotent_async}; +use crate::{IdempotentPath, SESSION, idempotent_async}; /// Datasets which can be downloaded over HTTP in Parquet format. /// @@ -57,7 +57,8 @@ impl Dataset for DownloadableDataset { let vortex = dir.join(format!("{}.vortex", self.name())); download_data(parquet.clone(), self.parquet_url()).await?; idempotent_async(&vortex, async |path| -> anyhow::Result<()> { - VortexWriteOptions::default() + SESSION + .write_options() .write( &mut File::create(path) .await @@ -70,7 +71,8 @@ impl Dataset for DownloadableDataset { }) .await?; - Ok(VortexOpenOptions::new() + Ok(SESSION + .open_options() .open(vortex.as_path()) .await? .scan()? diff --git a/bench-vortex/src/fineweb/mod.rs b/bench-vortex/src/fineweb/mod.rs index 520b7a2584f..ec8b58975dd 100644 --- a/bench-vortex/src/fineweb/mod.rs +++ b/bench-vortex/src/fineweb/mod.rs @@ -14,13 +14,13 @@ use log::info; use parquet::arrow::async_writer::AsyncFileWriter; use url::Url; use vortex::compressor::CompactCompressor; -use vortex::file::{VortexWriteOptions, WriteStrategyBuilder}; +use vortex::file::{WriteOptionsSessionExt, WriteStrategyBuilder}; use vortex_datafusion::VortexFormat; use crate::benchmark_trait::Benchmark; use crate::conversions::parquet_to_vortex; use crate::engines::EngineCtx; -use crate::{BenchmarkDataset, Format, Target, idempotent_async}; +use crate::{BenchmarkDataset, Format, SESSION, Target, idempotent_async}; /// URL to the sample file const SAMPLE_URL: &str = "https://huggingface.co/datasets/HuggingFaceFW/fineweb/resolve/v1.4.0/sample/10BT/001_00000.parquet"; @@ -146,7 +146,8 @@ impl Benchmark for Fineweb { info!("Converting FineWeb to Vortex with default compressor"); let array_stream = parquet_to_vortex(parquet)?; let w = tokio::fs::File::create(vortex_path).await?; - VortexWriteOptions::default() + SESSION + .write_options() .write(w, array_stream) .await .map_err(|e| anyhow::anyhow!("Failed to write to VortexWriter: {e}")) @@ -158,7 +159,8 @@ impl Benchmark for Fineweb { info!("Converting FineWeb to Vortex with Compact compressor"); let array_stream = parquet_to_vortex(parquet)?; let w = tokio::fs::File::create(vortex_path).await?; - VortexWriteOptions::default() + SESSION + .write_options() .with_strategy( WriteStrategyBuilder::new() .with_compressor(CompactCompressor::default()) @@ -221,7 +223,9 @@ pub async fn register_table( .with_listing_options( ListingOptions::new(match format { Format::Parquet => Arc::from(ParquetFormat::new()), - Format::OnDiskVortex | Format::VortexCompact => Arc::from(VortexFormat::default()), + Format::OnDiskVortex | Format::VortexCompact => { + Arc::from(VortexFormat::new(SESSION.clone())) + } _ => anyhow::bail!("unsupported format for `fineweb` bench: {}", format), }) .with_session_config_options(session.state().config()), diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs index 1071714c508..8cee6ae9ba9 100644 --- a/bench-vortex/src/lib.rs +++ b/bench-vortex/src/lib.rs @@ -7,6 +7,7 @@ use std::clone::Clone; use std::fmt::Display; use std::str::FromStr; +use std::sync::LazyLock; use clap::ValueEnum; use itertools::Itertools; @@ -42,12 +43,18 @@ pub mod utils; pub use datasets::{BenchmarkDataset, file}; pub use engines::df; +use vortex::VortexSessionDefault; pub use vortex::error::vortex_panic; +use vortex::io::session::RuntimeSessionExt; +use vortex::session::VortexSession; // All benchmarks run with mimalloc for consistency. #[global_allocator] static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; +pub static SESSION: LazyLock = + LazyLock::new(|| VortexSession::default().with_tokio()); + #[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, Serialize)] pub struct Target { engine: Engine, diff --git a/bench-vortex/src/public_bi.rs b/bench-vortex/src/public_bi.rs index 2112bcb2c10..41759fe9f51 100644 --- a/bench-vortex/src/public_bi.rs +++ b/bench-vortex/src/public_bi.rs @@ -31,7 +31,7 @@ use tracing::info; use url::Url; use vortex::ArrayRef; use vortex::error::{VortexResult, vortex_err}; -use vortex::file::{VortexOpenOptions, VortexWriteOptions}; +use vortex::file::{OpenOptionsSessionExt, WriteOptionsSessionExt}; use vortex::stream::ArrayStreamExt; use vortex::utils::aliases::hash_map::HashMap; use vortex_datafusion::VortexFormat; @@ -39,7 +39,7 @@ use vortex_datafusion::VortexFormat; use crate::conversions::parquet_to_vortex; use crate::datasets::Dataset; use crate::datasets::data_downloads::{decompress_bz2, download_data}; -use crate::{IdempotentPath, idempotent_async, vortex_panic}; +use crate::{IdempotentPath, SESSION, idempotent_async, vortex_panic}; pub static PBI_DATASETS: LazyLock = LazyLock::new(|| { PBIDatasets::try_new(fetch_schemas_and_queries().expect("failed to fetch public bi queries")) @@ -346,7 +346,8 @@ impl PBIData { async move { let vortex_file = idempotent_async(&vortex, async |output_path| -> anyhow::Result<()> { - VortexWriteOptions::default() + SESSION + .write_options() .write( &mut File::create(output_path) .await @@ -398,7 +399,7 @@ impl PBIData { .with_delimiter(b'|'), ), FileType::Parquet => Arc::new(ParquetFormat::default()), - FileType::Vortex => Arc::new(VortexFormat::default()), + FileType::Vortex => Arc::new(VortexFormat::new(SESSION.clone())), _ => vortex_panic!("unsupported file type: {file_type}"), }; @@ -439,7 +440,8 @@ impl Dataset for PBIBenchmark { .ok_or_else(|| anyhow!("must have at least one table"))? .clone(); - Ok(VortexOpenOptions::new() + Ok(SESSION + .open_options() .open(path.as_path()) .await? .scan()? diff --git a/bench-vortex/src/random_access/take.rs b/bench-vortex/src/random_access/take.rs index d4745e0a07d..c3f016afba5 100644 --- a/bench-vortex/src/random_access/take.rs +++ b/bench-vortex/src/random_access/take.rs @@ -18,11 +18,13 @@ use parquet::arrow::async_reader::AsyncFileReader; use parquet::file::metadata::RowGroupMetaData; use stream::StreamExt; use vortex::buffer::Buffer; -use vortex::file::VortexOpenOptions; +use vortex::file::OpenOptionsSessionExt; use vortex::stream::ArrayStreamExt; use vortex::utils::aliases::hash_map::HashMap; use vortex::{Array, ArrayRef, IntoArray}; +use crate::SESSION; + pub async fn take_vortex_tokio( path: &Path, indices: Buffer, @@ -34,7 +36,8 @@ pub async fn take_vortex_tokio( } async fn take_vortex(reader: impl AsRef, indices: Buffer) -> anyhow::Result { - Ok(VortexOpenOptions::new() + Ok(SESSION + .open_options() .open(reader.as_ref()) .await? .scan()? diff --git a/bench-vortex/src/realnest/gharchive.rs b/bench-vortex/src/realnest/gharchive.rs index 4dc11966bb1..ad6f9233f63 100644 --- a/bench-vortex/src/realnest/gharchive.rs +++ b/bench-vortex/src/realnest/gharchive.rs @@ -19,13 +19,13 @@ use log::info; use parquet::arrow::async_writer::AsyncFileWriter; use url::Url; use vortex::compressor::CompactCompressor; -use vortex::file::{VortexWriteOptions, WriteStrategyBuilder}; +use vortex::file::{WriteOptionsSessionExt, WriteStrategyBuilder}; use vortex_datafusion::VortexFormat; use crate::benchmark_trait::Benchmark; use crate::conversions::parquet_to_vortex; use crate::engines::EngineCtx; -use crate::{BenchmarkDataset, Format, Target, idempotent, idempotent_async}; +use crate::{BenchmarkDataset, Format, SESSION, Target, idempotent, idempotent_async}; /// Template URL for raw JSON dataset fn raw_json_url(hour: usize) -> String { @@ -171,7 +171,8 @@ impl Benchmark for GithubArchive { info!("Converting Parquet to Vortex with default compressor"); let array_stream = parquet_to_vortex(parquet)?; let w = tokio::fs::File::create(vortex_path).await?; - VortexWriteOptions::default() + SESSION + .write_options() .write(w, array_stream) .await .map_err(|e| anyhow::anyhow!("Failed to write to VortexWriter: {e}")) @@ -183,7 +184,8 @@ impl Benchmark for GithubArchive { info!("Converting FineWeb to Vortex with Compact compressor"); let array_stream = parquet_to_vortex(parquet)?; let w = tokio::fs::File::create(vortex_path).await?; - VortexWriteOptions::default() + SESSION + .write_options() .with_strategy( WriteStrategyBuilder::new() .with_compressor(CompactCompressor::default()) @@ -248,7 +250,9 @@ pub async fn register_table( .with_listing_options( ListingOptions::new(match format { Format::Parquet => Arc::from(ParquetFormat::new()), - Format::OnDiskVortex | Format::VortexCompact => Arc::from(VortexFormat::default()), + Format::OnDiskVortex | Format::VortexCompact => { + Arc::from(VortexFormat::new(SESSION.clone())) + } _ => anyhow::bail!("unsupported format for `gharchive` bench: {}", format), }) .with_session_config_options(session.state().config()), diff --git a/bench-vortex/src/statpopgen/download_vcf.rs b/bench-vortex/src/statpopgen/download_vcf.rs index 84aba2b5909..32ddf14fe90 100644 --- a/bench-vortex/src/statpopgen/download_vcf.rs +++ b/bench-vortex/src/statpopgen/download_vcf.rs @@ -19,13 +19,13 @@ use vortex::compressor::CompactCompressor; use vortex::dtype::DType; use vortex::dtype::arrow::FromArrowType; use vortex::error::VortexError; -use vortex::file::{VortexWriteOptions, WriteStrategyBuilder}; +use vortex::file::{WriteOptionsSessionExt, WriteStrategyBuilder}; use vortex::stream::ArrayStreamAdapter; use super::StatPopGenBenchmark; use crate::statpopgen::builder::GnomADBuilder; use crate::statpopgen::schema::schema_from_vcf_header; -use crate::{Format, idempotent_async}; +use crate::{Format, SESSION, idempotent_async}; // DuckDB parallelizes parquet at row-group granularity. Each of our rows are quite big (~4000 // genotypes each with tens of bytes of data). @@ -175,7 +175,8 @@ impl StatPopGenBenchmark { .wrap_stream(vortex_stream) .boxed(); - VortexWriteOptions::default() + SESSION + .write_options() .with_strategy(strategy.build()) .write( &mut File::create(output_path).await?, diff --git a/bench-vortex/src/statpopgen/statpopgen_benchmark.rs b/bench-vortex/src/statpopgen/statpopgen_benchmark.rs index c4aca22e02f..a9ba8c9c452 100644 --- a/bench-vortex/src/statpopgen/statpopgen_benchmark.rs +++ b/bench-vortex/src/statpopgen/statpopgen_benchmark.rs @@ -17,7 +17,7 @@ use vortex_datafusion::VortexFormat; use crate::benchmark_trait::Benchmark; use crate::engines::EngineCtx; -use crate::{BenchmarkDataset, Format, Target}; +use crate::{BenchmarkDataset, Format, SESSION, Target}; /// Statistical population genetics benchmark implementation. /// @@ -157,8 +157,8 @@ pub async fn register_table( )); } Format::Parquet => Arc::from(ParquetFormat::new()), - Format::OnDiskVortex => Arc::from(VortexFormat::default()), - Format::VortexCompact => Arc::from(VortexFormat::default()), + Format::OnDiskVortex => Arc::from(VortexFormat::new(SESSION.clone())), + Format::VortexCompact => Arc::from(VortexFormat::new(SESSION.clone())), Format::OnDiskDuckDB => { return Err(anyhow::anyhow!( "DuckDB format should not be registered through DataFusion" diff --git a/bench-vortex/src/tpch/tpchgen.rs b/bench-vortex/src/tpch/tpchgen.rs index e3d1dd600ef..7ae95824be8 100644 --- a/bench-vortex/src/tpch/tpchgen.rs +++ b/bench-vortex/src/tpch/tpchgen.rs @@ -27,11 +27,11 @@ use vortex::arrow::FromArrowArray; use vortex::dtype::DType; use vortex::dtype::arrow::FromArrowType; use vortex::error::VortexExpect; -use vortex::file::VortexWriteOptions; +use vortex::file::WriteOptionsSessionExt; use vortex::stream::ArrayStreamAdapter; use crate::utils::file_utils::idempotent_async; -use crate::{CompactionStrategy, Format, IdempotentPath}; +use crate::{CompactionStrategy, Format, IdempotentPath, SESSION}; type TableFuture<'a> = Pin> + Send + 'a>>; @@ -388,7 +388,7 @@ impl VortexWriter { let mut file = TokioFile::create(&file_path).await?; compaction_strategy - .apply_options(VortexWriteOptions::default()) + .apply_options(SESSION.write_options()) .write(&mut file, stream) .await .map_err(|e| anyhow!("Vortex write failed: {}", e))?; diff --git a/docs/api/python/arrays.rst b/docs/api/python/arrays.rst index a09da9b7e83..da6c6a7a449 100644 --- a/docs/api/python/arrays.rst +++ b/docs/api/python/arrays.rst @@ -139,8 +139,7 @@ Registry and Serde .. autodata:: vortex.registry -.. autoclass:: vortex.Registry - :members: +.. autofunction:: vortex.registry.register .. autoclass:: vortex.ArrayContext :members: diff --git a/encodings/pco/src/test.rs b/encodings/pco/src/test.rs index 6d778d589be..7e7a93c1712 100644 --- a/encodings/pco/src/test.rs +++ b/encodings/pco/src/test.rs @@ -8,7 +8,7 @@ use vortex_array::serde::{ArrayParts, SerializeOptions}; use vortex_array::validity::Validity; use vortex_array::vtable::ValidityHelper; use vortex_array::{ - ArrayContext, ArrayRegistry, EncodingRef, IntoArray, ToCanonical, assert_arrays_eq, + ArrayContext, ArraySession, EncodingRef, IntoArray, ToCanonical, assert_arrays_eq, }; use vortex_buffer::{Buffer, BufferMut}; use vortex_dtype::{DType, Nullability, PType}; @@ -135,12 +135,16 @@ fn test_serde() { let pco = PcoArray::from_primitive(&PrimitiveArray::new(data, Validity::NonNullable), 3, 100) .unwrap() .to_array(); - let context = ArrayContext::empty().with_many( - ArrayRegistry::canonical_only() - .vtables() - .cloned() - .chain([EncodingRef::new_ref(PcoEncoding.as_ref())]), + + let session = ArraySession::default(); + let context = ArrayContext::new( + session + .registry() + .items() + .chain([EncodingRef::new_ref(PcoEncoding.as_ref())]) + .collect(), ); + let bytes = pco .serialize( &context, diff --git a/encodings/sequence/src/serde.rs b/encodings/sequence/src/serde.rs index eea70345188..6ac222bf773 100644 --- a/encodings/sequence/src/serde.rs +++ b/encodings/sequence/src/serde.rs @@ -86,49 +86,3 @@ impl SerdeVTable for SequenceVTable { )) } } - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use vortex_array::ToCanonical; - use vortex_array::arrays::{PrimitiveArray, StructArray}; - use vortex_array::stream::ArrayStreamExt; - use vortex_dtype::Nullability; - use vortex_expr::{get_item, root}; - use vortex_file::{VortexOpenOptions, VortexWriteOptions}; - use vortex_layout::layouts::flat::writer::FlatLayoutStrategy; - - use crate::SequenceArray; - - #[tokio::test] - async fn round_trip_seq() { - let seq = SequenceArray::typed_new(2i8, 3, Nullability::NonNullable, 4).unwrap(); - let st = StructArray::from_fields(&[("a", seq.to_array())]).unwrap(); - - let mut file = tokio::fs::File::create("/tmp/abc.vx").await.unwrap(); - VortexWriteOptions::default() - .with_strategy(Arc::new(FlatLayoutStrategy::default())) - .write(&mut file, st.to_array_stream()) - .await - .unwrap(); - - let file = VortexOpenOptions::new().open("/tmp/abc.vx").await.unwrap(); - let array = file - .scan() - .unwrap() - .with_projection(get_item("a", root())) - .into_array_stream() - .unwrap() - .read_all() - .await - .unwrap(); - - let canon = PrimitiveArray::from_iter((0..4).map(|i| 2i8 + i * 3)); - - assert_eq!( - array.to_primitive().as_slice::(), - canon.as_slice::() - ) - } -} diff --git a/fuzz/Cargo.toml b/fuzz/Cargo.toml index ea3e43e9b2d..ee687a21c11 100644 --- a/fuzz/Cargo.toml +++ b/fuzz/Cargo.toml @@ -21,6 +21,7 @@ cargo-fuzz = true itertools = { workspace = true } libfuzzer-sys = { workspace = true } strum = { workspace = true, features = ["derive"] } +vortex = { workspace = true } vortex-array = { workspace = true, features = ["arbitrary", "test-harness"] } vortex-btrblocks = { workspace = true } vortex-buffer = { workspace = true } @@ -32,6 +33,7 @@ vortex-io = { workspace = true } vortex-layout = { workspace = true, features = ["zstd"] } vortex-mask = { workspace = true } vortex-scalar = { workspace = true, features = ["arbitrary"] } +vortex-session = { workspace = true } vortex-utils = { workspace = true } [lints] diff --git a/fuzz/fuzz_targets/file_io.rs b/fuzz/fuzz_targets/file_io.rs index 1b57f6a7f21..6301b821e69 100644 --- a/fuzz/fuzz_targets/file_io.rs +++ b/fuzz/fuzz_targets/file_io.rs @@ -13,9 +13,8 @@ use vortex_buffer::ByteBufferMut; use vortex_dtype::{DType, StructFields}; use vortex_error::{VortexExpect, VortexUnwrap, vortex_panic}; use vortex_expr::{Scope, lit, root}; -use vortex_file::{VortexOpenOptions, VortexWriteOptions, WriteStrategyBuilder}; -use vortex_fuzz::{CompressorStrategy, FuzzFileAction}; -use vortex_io::runtime::single::SingleThreadRuntime; +use vortex_file::{OpenOptionsSessionExt, WriteOptionsSessionExt, WriteStrategyBuilder}; +use vortex_fuzz::{CompressorStrategy, FuzzFileAction, RUNTIME, SESSION}; use vortex_layout::layouts::compact::CompactCompressor; use vortex_utils::aliases::DefaultHashBuilder; use vortex_utils::aliases::hash_set::HashSet; @@ -49,29 +48,30 @@ fuzz_target!(|fuzz: FuzzFileAction| -> Corpus { }; let write_options = match compressor_strategy { - CompressorStrategy::Default => VortexWriteOptions::default(), + CompressorStrategy::Default => SESSION.write_options(), CompressorStrategy::Compact => { let strategy = WriteStrategyBuilder::new() .with_compressor(CompactCompressor::default()) .build(); - VortexWriteOptions::default().with_strategy(strategy) + SESSION.write_options().with_strategy(strategy) } }; let mut full_buff = ByteBufferMut::empty(); let _footer = write_options - .blocking::() + .blocking(&*RUNTIME) .write(&mut full_buff, array_data.to_array_iterator()) .vortex_unwrap(); - let mut output = VortexOpenOptions::new() + let mut output = SESSION + .open_options() .open_buffer(full_buff) .vortex_unwrap() .scan() .vortex_unwrap() .with_projection(projection_expr.unwrap_or_else(|| root())) .with_some_filter(filter_expr) - .into_array_iter(&SingleThreadRuntime::default()) + .into_array_iter(&*RUNTIME) .vortex_unwrap() .try_collect::<_, Vec<_>, _>() .vortex_unwrap(); diff --git a/fuzz/src/lib.rs b/fuzz/src/lib.rs index 4d6b69a66c2..c73662af6b0 100644 --- a/fuzz/src/lib.rs +++ b/fuzz/src/lib.rs @@ -8,5 +8,16 @@ mod array; pub mod error; mod file; +use std::sync::LazyLock; + pub use array::{Action, CompressorStrategy, ExpectedValue, FuzzArrayAction, sort_canonical_array}; pub use file::FuzzFileAction; +use vortex::VortexSessionDefault; +use vortex_io::runtime::BlockingRuntime; +use vortex_io::runtime::current::CurrentThreadRuntime; +use vortex_io::session::RuntimeSessionExt; +use vortex_session::VortexSession; + +pub static RUNTIME: LazyLock = LazyLock::new(CurrentThreadRuntime::new); +pub static SESSION: LazyLock = + LazyLock::new(|| VortexSession::default().with_handle(RUNTIME.handle())); diff --git a/java/testfiles/Cargo.lock b/java/testfiles/Cargo.lock index f2f69697e94..75b74acd3e4 100644 --- a/java/testfiles/Cargo.lock +++ b/java/testfiles/Cargo.lock @@ -589,6 +589,26 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "enum-map" +version = "2.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6866f3bfdf8207509a033af1a75a7b08abda06bbaaeae6669323fd5a097df2e9" +dependencies = [ + "enum-map-derive", +] + +[[package]] +name = "enum-map-derive" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f282cfdfe92516eb26c2af8589c274c7c17681f5ecc03c18255fe741c6aa64eb" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "equivalent" version = "1.0.2" @@ -2100,6 +2120,7 @@ dependencies = [ "vortex-scalar", "vortex-scan", "vortex-sequence", + "vortex-session", "vortex-sparse", "vortex-utils", "vortex-zigzag", @@ -2141,6 +2162,7 @@ dependencies = [ "bitvec", "cfg-if", "enum-iterator", + "enum-map", "flatbuffers", "futures", "getrandom 0.3.3", @@ -2161,13 +2183,17 @@ dependencies = [ "static_assertions", "termtree", "vortex-buffer", + "vortex-compute", "vortex-dtype", "vortex-error", "vortex-flatbuffers", + "vortex-io", "vortex-mask", "vortex-metrics", "vortex-scalar", + "vortex-session", "vortex-utils", + "vortex-vector", ] [[package]] @@ -2225,6 +2251,21 @@ dependencies = [ "vortex-scalar", ] +[[package]] +name = "vortex-compute" +version = "0.1.0" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-schema", + "num-traits", + "vortex-buffer", + "vortex-dtype", + "vortex-error", + "vortex-mask", + "vortex-vector", +] + [[package]] name = "vortex-datetime-parts" version = "0.1.0" @@ -2275,6 +2316,7 @@ dependencies = [ name = "vortex-dtype" version = "0.1.0" dependencies = [ + "arrow-buffer", "arrow-schema", "flatbuffers", "half", @@ -2324,6 +2366,7 @@ dependencies = [ "vortex-mask", "vortex-proto", "vortex-scalar", + "vortex-session", "vortex-utils", ] @@ -2383,6 +2426,7 @@ dependencies = [ "vortex-scalar", "vortex-scan", "vortex-sequence", + "vortex-session", "vortex-sparse", "vortex-utils", "vortex-zigzag", @@ -2417,6 +2461,7 @@ name = "vortex-io" version = "0.1.0" dependencies = [ "async-compat", + "async-fs", "async-stream", "async-trait", "bytes", @@ -2435,6 +2480,7 @@ dependencies = [ "vortex-buffer", "vortex-error", "vortex-metrics", + "vortex-session", "wasm-bindgen-futures", ] @@ -2493,6 +2539,7 @@ dependencies = [ "vortex-pco", "vortex-scalar", "vortex-sequence", + "vortex-session", "vortex-utils", "vortex-zstd", ] @@ -2512,6 +2559,7 @@ version = "0.1.0" dependencies = [ "getrandom 0.3.3", "parking_lot", + "vortex-session", "witchcraft-metrics", ] @@ -2519,6 +2567,7 @@ dependencies = [ name = "vortex-pco" version = "0.1.0" dependencies = [ + "itertools", "pco", "prost", "vortex-array", @@ -2559,7 +2608,6 @@ name = "vortex-scalar" version = "0.1.0" dependencies = [ "arrow-array", - "arrow-buffer", "bytes", "itertools", "num-traits", @@ -2593,6 +2641,7 @@ dependencies = [ "vortex-layout", "vortex-mask", "vortex-metrics", + "vortex-session", ] [[package]] @@ -2609,6 +2658,16 @@ dependencies = [ "vortex-mask", "vortex-proto", "vortex-scalar", + "vortex-vector", +] + +[[package]] +name = "vortex-session" +version = "0.1.0" +dependencies = [ + "dashmap", + "vortex-error", + "vortex-utils", ] [[package]] @@ -2634,10 +2693,22 @@ dependencies = [ "hashbrown 0.16.0", ] +[[package]] +name = "vortex-vector" +version = "0.1.0" +dependencies = [ + "static_assertions", + "vortex-buffer", + "vortex-dtype", + "vortex-error", + "vortex-mask", +] + [[package]] name = "vortex-zigzag" version = "0.1.0" dependencies = [ + "itertools", "vortex-array", "vortex-buffer", "vortex-dtype", diff --git a/java/testfiles/src/main.rs b/java/testfiles/src/main.rs index 59bf884aeb7..e40e09ba775 100644 --- a/java/testfiles/src/main.rs +++ b/java/testfiles/src/main.rs @@ -8,9 +8,13 @@ use std::path::Path; use vortex::arrays::StructArray; use vortex::builders::{ArrayBuilder, DecimalBuilder, VarBinViewBuilder}; use vortex::dtype::{DType, DecimalDType, Nullability}; -use vortex::file::VortexWriteOptions; -use vortex::io::runtime::single::SingleThreadRuntime; +use vortex::file::WriteOptionsSessionExt; +use vortex::io::runtime::current::CurrentThreadRuntime; +use vortex::io::runtime::BlockingRuntime; +use vortex::io::session::RuntimeSessionExt; +use vortex::session::VortexSession; use vortex::validity::Validity; +use vortex::VortexSessionDefault; /// Generate a test dataset with the following small set of rows: /// @@ -27,6 +31,9 @@ use vortex::validity::Validity; /// | Ida | 9000 | TX | /// | John | 10000 | VA | fn main() { + let runtime = CurrentThreadRuntime::new(); + let session = VortexSession::default().with_handle(runtime.handle()); + let mut names = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::NonNullable), 10); names.append_value("Alice"); names.append_value("Bob"); @@ -73,8 +80,9 @@ fn main() { let minimal_path = Path::new(env!("CARGO_MANIFEST_DIR")) .join("../vortex-jni/src/test/resources/minimal.vortex"); let mut file = std::fs::File::create(&minimal_path).expect("opening Vortex file"); - VortexWriteOptions::default() - .blocking::() + session + .write_options() + .blocking(&runtime) .write(&mut file, rows.to_array_iterator()) .expect("writing Vortex file"); diff --git a/vortex-array/Cargo.toml b/vortex-array/Cargo.toml index 8d15e54aa7d..a20dc006493 100644 --- a/vortex-array/Cargo.toml +++ b/vortex-array/Cargo.toml @@ -69,6 +69,7 @@ vortex-io = { workspace = true } vortex-mask = { workspace = true } vortex-metrics = { workspace = true } vortex-scalar = { workspace = true } +vortex-session = { workspace = true } vortex-utils = { workspace = true } vortex-vector = { workspace = true } diff --git a/vortex-array/src/canonical.rs b/vortex-array/src/canonical.rs index 479bc84b16c..fd2b6beedc7 100644 --- a/vortex-array/src/canonical.rs +++ b/vortex-array/src/canonical.rs @@ -72,9 +72,7 @@ use crate::{Array, ArrayRef, IntoArray}; /// # For Developers /// /// If you add another variant to this enum, make sure to update [`Array::is_canonical`], -/// [`ArrayRegistry::canonical_only`], and the fuzzer in `fuzz/fuzz_targets/array_ops.rs`. -/// -/// [`ArrayRegistry::canonical_only`]: crate::ArrayRegistry::canonical_only +/// and the fuzzer in `fuzz/fuzz_targets/array_ops.rs`. #[derive(Debug, Clone)] pub enum Canonical { Null(NullArray), diff --git a/vortex-array/src/context.rs b/vortex-array/src/context.rs index 930bfda658a..a86070da677 100644 --- a/vortex-array/src/context.rs +++ b/vortex-array/src/context.rs @@ -6,50 +6,12 @@ use std::sync::Arc; use itertools::Itertools; use parking_lot::RwLock; -use vortex_error::{VortexExpect, VortexResult, vortex_err}; -use vortex_utils::aliases::hash_map::HashMap; +use vortex_error::{VortexExpect, VortexResult, vortex_bail, vortex_err}; +use vortex_session::registry::Registry; use crate::EncodingRef; -use crate::arrays::{ - BoolEncoding, ChunkedEncoding, ConstantEncoding, DecimalEncoding, ExtensionEncoding, - FixedSizeListEncoding, ListEncoding, ListViewEncoding, MaskedEncoding, NullEncoding, - PrimitiveEncoding, StructEncoding, VarBinEncoding, VarBinViewEncoding, -}; -/// A collection of array encodings. -// TODO(ngates): it feels weird that this has interior mutability. I think maybe it shouldn't. pub type ArrayContext = VTableContext; -pub type ArrayRegistry = VTableRegistry; - -impl ArrayRegistry { - pub fn canonical_only() -> Self { - let mut this = Self::empty(); - - // Register the canonical encodings. - this.register_many([ - EncodingRef::new_ref(NullEncoding.as_ref()), - EncodingRef::new_ref(BoolEncoding.as_ref()), - EncodingRef::new_ref(PrimitiveEncoding.as_ref()), - EncodingRef::new_ref(DecimalEncoding.as_ref()), - EncodingRef::new_ref(VarBinViewEncoding.as_ref()), - EncodingRef::new_ref(ListViewEncoding.as_ref()), - EncodingRef::new_ref(FixedSizeListEncoding.as_ref()), - EncodingRef::new_ref(StructEncoding.as_ref()), - EncodingRef::new_ref(ExtensionEncoding.as_ref()), - ]); - - // Register the utility encodings. - this.register_many([ - EncodingRef::new_ref(ChunkedEncoding.as_ref()), - EncodingRef::new_ref(ConstantEncoding.as_ref()), - EncodingRef::new_ref(MaskedEncoding.as_ref()), - EncodingRef::new_ref(ListEncoding.as_ref()), - EncodingRef::new_ref(VarBinEncoding.as_ref()), - ]); - - this - } -} /// A collection of encodings that can be addressed by a u16 positional index. /// This is used to map array encodings and layout encodings when reading from a file. @@ -57,6 +19,34 @@ impl ArrayRegistry { pub struct VTableContext(Arc>>); impl VTableContext { + pub fn new(encodings: Vec) -> Self { + Self(Arc::new(RwLock::new(encodings))) + } + + pub fn try_from_registry<'a>( + registry: &Registry, + ids: impl IntoIterator, + ) -> VortexResult + where + T: Display, + { + let items: Vec = ids + .into_iter() + .map(|id| { + registry + .find(id) + .ok_or_else(|| vortex_err!("Registry missing encoding with id {}", id)) + }) + .try_collect()?; + if items.len() > u16::MAX as usize { + vortex_bail!( + "Cannot create VTableContext: registry has more than u16::MAX ({}) items", + u16::MAX + ); + } + Ok(Self::new(items)) + } + pub fn empty() -> Self { Self(Arc::new(RwLock::new(Vec::new()))) } @@ -98,58 +88,3 @@ impl VTableContext { self.0.read().get(idx as usize).cloned() } } - -/// A registry of encodings that can be used to construct a context for serde. -/// -/// In the future, we will support loading encodings from shared libraries or even from within -/// the Vortex file itself. This registry will be used to manage the available encodings. -#[derive(Clone, Debug)] -pub struct VTableRegistry(HashMap); - -// TODO(ngates): define a trait for `T` that requires an `id` method returning a `Arc` and -// auto-implement `Display` and `Eq` for it. -impl VTableRegistry { - pub fn empty() -> Self { - Self(Default::default()) - } - - /// Create a new [`VTableContext`] with the provided encodings. - pub fn new_context<'a>( - &self, - encoding_ids: impl Iterator, - ) -> VortexResult> { - let mut ctx = VTableContext::::empty(); - for id in encoding_ids { - let encoding = self.0.get(id).ok_or_else(|| { - vortex_err!( - "Array encoding {} not found in registry {}", - id, - self.0.values().join(", ") - ) - })?; - ctx = ctx.with(encoding.clone()); - } - Ok(ctx) - } - - /// List the vtables in the registry. - pub fn vtables(&self) -> impl Iterator + '_ { - self.0.values() - } - - /// Find the encoding with the given ID. - pub fn get(&self, id: &str) -> Option<&T> { - self.0.get(id) - } - - /// Register a new encoding, replacing any existing encoding with the same ID. - pub fn register(&mut self, encoding: T) { - self.0.insert(encoding.to_string(), encoding); - } - - /// Register a new encoding, replacing any existing encoding with the same ID. - pub fn register_many>(&mut self, encodings: I) { - self.0 - .extend(encodings.into_iter().map(|e| (e.to_string(), e))); - } -} diff --git a/vortex-array/src/lib.rs b/vortex-array/src/lib.rs index 41c7c73bdbb..e296328689b 100644 --- a/vortex-array/src/lib.rs +++ b/vortex-array/src/lib.rs @@ -19,6 +19,14 @@ pub use encoding::*; pub use hash::*; pub use mask_future::*; pub use metadata::*; +use vortex_session::registry::Registry; +use vortex_session::{Ref, SessionExt}; + +use crate::arrays::{ + BoolEncoding, ChunkedEncoding, ConstantEncoding, DecimalEncoding, ExtensionEncoding, + FixedSizeListEncoding, ListEncoding, ListViewEncoding, MaskedEncoding, NullEncoding, + PrimitiveEncoding, StructEncoding, VarBinEncoding, VarBinViewEncoding, +}; pub mod accessor; #[doc(hidden)] @@ -55,3 +63,68 @@ pub mod flatbuffers { //! Re-exported autogenerated code from the core Vortex flatbuffer definitions. pub use vortex_flatbuffers::array::*; } + +pub type ArrayRegistry = Registry; + +#[derive(Debug)] +pub struct ArraySession { + /// The set of registered array encodings. + registry: ArrayRegistry, +} + +impl ArraySession { + pub fn registry(&self) -> &ArrayRegistry { + &self.registry + } + + /// Register a new array encoding, replacing any existing encoding with the same ID. + pub fn register(&self, encoding: EncodingRef) { + self.registry.register(encoding) + } + + /// Register many array encodings, replacing any existing encodings with the same ID. + pub fn register_many(&self, encodings: impl IntoIterator) { + self.registry.register_many(encodings); + } +} + +impl Default for ArraySession { + fn default() -> Self { + let encodings = ArrayRegistry::default(); + + // Register the canonical encodings. + encodings.register_many([ + EncodingRef::new_ref(NullEncoding.as_ref()), + EncodingRef::new_ref(BoolEncoding.as_ref()), + EncodingRef::new_ref(PrimitiveEncoding.as_ref()), + EncodingRef::new_ref(DecimalEncoding.as_ref()), + EncodingRef::new_ref(VarBinViewEncoding.as_ref()), + EncodingRef::new_ref(ListViewEncoding.as_ref()), + EncodingRef::new_ref(FixedSizeListEncoding.as_ref()), + EncodingRef::new_ref(StructEncoding.as_ref()), + EncodingRef::new_ref(ExtensionEncoding.as_ref()), + ]); + + // Register the utility encodings. + encodings.register_many([ + EncodingRef::new_ref(ChunkedEncoding.as_ref()), + EncodingRef::new_ref(ConstantEncoding.as_ref()), + EncodingRef::new_ref(MaskedEncoding.as_ref()), + EncodingRef::new_ref(ListEncoding.as_ref()), + EncodingRef::new_ref(VarBinEncoding.as_ref()), + ]); + + Self { + registry: encodings, + } + } +} + +/// Session data for Vortex arrays. +pub trait ArraySessionExt: SessionExt { + /// Returns the array encoding registry. + fn arrays(&self) -> Ref<'_, ArraySession> { + self.get::() + } +} +impl ArraySessionExt for S {} diff --git a/vortex-cxx/Cargo.toml b/vortex-cxx/Cargo.toml index 8084087d09d..805f79522dd 100644 --- a/vortex-cxx/Cargo.toml +++ b/vortex-cxx/Cargo.toml @@ -24,12 +24,12 @@ crate-type = ["staticlib"] anyhow = { workspace = true } arrow-array = { workspace = true, features = ["ffi"] } arrow-schema = { workspace = true } +async-fs = { workspace = true } cxx = "1.0" futures = { workspace = true } paste = { workspace = true } take_mut = { workspace = true } -tokio = { workspace = true, features = ["rt", "rt-multi-thread", "macros"] } -vortex = { workspace = true, features = ["tokio"] } +vortex = { workspace = true } [build-dependencies] cxx-build = "1.0" diff --git a/vortex-cxx/src/lib.rs b/vortex-cxx/src/lib.rs index 6867fbb7b43..089eda3d00b 100644 --- a/vortex-cxx/src/lib.rs +++ b/vortex-cxx/src/lib.rs @@ -14,7 +14,11 @@ use dtype::*; use expr::*; use read::*; use scalar::*; +use vortex::VortexSessionDefault; +use vortex::io::runtime::BlockingRuntime; use vortex::io::runtime::current::CurrentThreadRuntime; +use vortex::io::session::RuntimeSessionExt; +use vortex::session::VortexSession; use write::*; /// By default, the C++ API uses a current-thread runtime, providing control of the threading @@ -24,6 +28,8 @@ use write::*; // this runtime. pub(crate) static RUNTIME: LazyLock = LazyLock::new(CurrentThreadRuntime::new); +pub(crate) static SESSION: LazyLock = + LazyLock::new(|| VortexSession::default().with_handle(RUNTIME.handle())); #[cxx::bridge(namespace = "vortex::ffi")] mod ffi { diff --git a/vortex-cxx/src/read.rs b/vortex-cxx/src/read.rs index 466d1eb97d3..fff58a4a08c 100644 --- a/vortex-cxx/src/read.rs +++ b/vortex-cxx/src/read.rs @@ -13,13 +13,13 @@ use futures::stream::TryStreamExt; use vortex::ArrayRef; use vortex::arrow::IntoArrowArray; use vortex::buffer::Buffer; -use vortex::file::VortexOpenOptions; +use vortex::file::OpenOptionsSessionExt; use vortex::io::runtime::BlockingRuntime; use vortex::scan::ScanBuilder; use vortex::scan::arrow::RecordBatchIteratorAdapter; -use crate::RUNTIME; use crate::expr::Expr; +use crate::{RUNTIME, SESSION}; pub(crate) struct VortexFile { inner: vortex::file::VortexFile, @@ -41,17 +41,13 @@ impl VortexFile { /// File operations - using blocking operations for simplicity /// TODO(xinyu): object store (see vortex-ffi) pub(crate) fn open_file(path: &str) -> Result> { - let file = RUNTIME.block_on(|h| { - VortexOpenOptions::new() - .with_handle(h) - .open(std::path::Path::new(path)) - })?; + let file = RUNTIME.block_on(SESSION.open_options().open(std::path::Path::new(path)))?; Ok(Box::new(VortexFile { inner: file })) } pub(crate) fn open_file_from_buffer(data: &[u8]) -> Result> { let buffer = Buffer::from(data.to_vec()); - let file = VortexOpenOptions::new().open_buffer(buffer)?; + let file = SESSION.open_options().open_buffer(buffer)?; Ok(Box::new(VortexFile { inner: file })) } @@ -161,7 +157,6 @@ pub(crate) fn scan_builder_into_threadsafe_cloneable_reader( let stream = builder .inner - .with_handle(RUNTIME.handle()) .map(move |b| { b.into_arrow(&data_type) .map(|struct_array| RecordBatch::from(struct_array.as_struct())) diff --git a/vortex-cxx/src/write.rs b/vortex-cxx/src/write.rs index ed7de1f8207..5ec0a878585 100644 --- a/vortex-cxx/src/write.rs +++ b/vortex-cxx/src/write.rs @@ -1,29 +1,21 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use std::sync::LazyLock; - use anyhow::Result; use arrow_array::RecordBatchReader; use arrow_array::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream}; -use tokio::runtime::Runtime; use vortex::ArrayRef; use vortex::arrow::FromArrowArray; use vortex::dtype::DType; use vortex::dtype::arrow::FromArrowType; -use vortex::error::{VortexError, VortexExpect}; -use vortex::file::VortexWriteOptions as WriteOptions; +use vortex::error::VortexError; +use vortex::file::{VortexWriteOptions as WriteOptions, WriteOptionsSessionExt}; use vortex::io::VortexWrite; -use vortex::io::runtime::tokio::TokioRuntime; +use vortex::io::runtime::BlockingRuntime; use vortex::iter::{ArrayIteratorAdapter, ArrayIteratorExt}; use vortex::stream::ArrayStream; -/// The tokio runtime for the write-side. -static RUNTIME: LazyLock = LazyLock::new(|| { - Runtime::new() - .map_err(VortexError::from) - .vortex_expect("Failed to create tokio runtime") -}); +use crate::{RUNTIME, SESSION}; pub(crate) struct VortexWriteOptions { inner: WriteOptions, @@ -31,7 +23,7 @@ pub(crate) struct VortexWriteOptions { pub(crate) fn write_options_new() -> Box { Box::new(VortexWriteOptions { - inner: WriteOptions::default().with_handle(TokioRuntime::current()), + inner: SESSION.write_options(), }) } @@ -66,7 +58,7 @@ pub(crate) unsafe fn write_array_stream( let vortex_stream = arrow_stream_to_vortex_stream(stream_reader)?; RUNTIME.block_on(async { - let mut file = tokio::fs::File::create(path).await?; + let mut file = async_fs::File::create(path).await?; options.inner.write(&mut file, vortex_stream).await?; file.shutdown().await?; Ok(()) diff --git a/vortex-datafusion/examples/vortex_table.rs b/vortex-datafusion/examples/vortex_table.rs index cb48e415da3..71ce8bf8d5d 100644 --- a/vortex-datafusion/examples/vortex_table.rs +++ b/vortex-datafusion/examples/vortex_table.rs @@ -9,16 +9,20 @@ use datafusion::datasource::listing::{ use datafusion::prelude::SessionContext; use tempfile::tempdir; use tokio::fs::OpenOptions; -use vortex::IntoArray; use vortex::arrays::{ChunkedArray, StructArray, VarBinArray}; use vortex::buffer::buffer; use vortex::error::vortex_err; -use vortex::file::VortexWriteOptions; +use vortex::file::WriteOptionsSessionExt; +use vortex::io::session::RuntimeSessionExt; +use vortex::session::VortexSession; use vortex::validity::Validity; +use vortex::{IntoArray, VortexSessionDefault}; use vortex_datafusion::VortexFormat; #[tokio::main] async fn main() -> anyhow::Result<()> { + let session = VortexSession::default().with_tokio(); + let temp_dir = tempdir()?; let strings = ChunkedArray::from_iter([ VarBinArray::from(vec!["ab", "foo", "bar", "baz"]).into_array(), @@ -48,12 +52,13 @@ async fn main() -> anyhow::Result<()> { .open(&filepath) .await?; - VortexWriteOptions::default() + session + .write_options() .write(&mut f, st.to_array_stream()) .await?; let ctx = SessionContext::new(); - let format = Arc::new(VortexFormat::default()); + let format = Arc::new(VortexFormat::new(session)); let table_url = ListingTableUrl::parse( filepath .to_str() diff --git a/vortex-datafusion/src/persistent/cache.rs b/vortex-datafusion/src/persistent/cache.rs index dbcb4cf494e..446e5c44f22 100644 --- a/vortex-datafusion/src/persistent/cache.rs +++ b/vortex-datafusion/src/persistent/cache.rs @@ -12,8 +12,9 @@ use object_store::{ObjectMeta, ObjectStore}; use vortex::buffer::ByteBuffer; use vortex::dtype::DType; use vortex::error::{VortexError, VortexResult, vortex_err}; -use vortex::file::{Footer, SegmentSpec, VortexFile, VortexOpenOptions}; +use vortex::file::{Footer, OpenOptionsSessionExt, SegmentSpec, VortexFile}; use vortex::layout::segments::{SegmentCache, SegmentId}; +use vortex::metrics::MetricsSessionExt; use vortex::session::VortexSession; use vortex::stats::{Precision, Stat}; use vortex::utils::aliases::DefaultHashBuilder; @@ -22,7 +23,7 @@ use vortex::utils::aliases::DefaultHashBuilder; pub(crate) struct VortexFileCache { file_cache: Cache, segment_cache: Cache, - session: Arc, + session: VortexSession, } /// Cache key for a [`VortexFile`]. @@ -49,7 +50,7 @@ struct SegmentKey { } impl VortexFileCache { - pub fn new(size_mb: usize, segment_size_mb: usize, session: Arc) -> Self { + pub fn new(size_mb: usize, segment_size_mb: usize, session: VortexSession) -> Self { let file_cache = Cache::builder() .max_capacity(size_mb as u64 * (1 << 20)) .eviction_listener(|k: Arc, _v: VortexFile, cause| { @@ -84,10 +85,8 @@ impl VortexFileCache { self.file_cache .try_get_with( file_key.clone(), - VortexOpenOptions::new() - // FIXME(ngates): we don't really want to clone on every open... - .with_array_registry(Arc::new(self.session.arrays().clone())) - .with_layout_registry(Arc::new(self.session.layouts().clone())) + self.session + .open_options() .with_metrics( self.session .metrics() diff --git a/vortex-datafusion/src/persistent/format.rs b/vortex-datafusion/src/persistent/format.rs index 45f8607d782..2032aef92c0 100644 --- a/vortex-datafusion/src/persistent/format.rs +++ b/vortex-datafusion/src/persistent/format.rs @@ -33,11 +33,10 @@ use vortex::dtype::arrow::FromArrowType; use vortex::dtype::{DType, Nullability, PType}; use vortex::error::{VortexExpect, VortexResult, vortex_err}; use vortex::file::VORTEX_FILE_EXTENSION; -use vortex::metrics::VortexMetrics; use vortex::scalar::Scalar; use vortex::session::VortexSession; -use vortex::stats; use vortex::stats::{Stat, StatsSet}; +use vortex::{VortexSessionDefault, stats}; use super::cache::VortexFileCache; use super::sink::VortexSink; @@ -47,7 +46,7 @@ use crate::convert::TryToDataFusion; /// Vortex implementation of a DataFusion [`FileFormat`]. pub struct VortexFormat { - session: Arc, + session: VortexSession, file_cache: VortexFileCache, opts: VortexOptions, } @@ -79,7 +78,7 @@ impl Eq for VortexOptions {} /// Minimal factory to create [`VortexFormat`] instances. #[derive(Debug)] pub struct VortexFormatFactory { - session: Arc, + session: VortexSession, options: Option, } @@ -94,7 +93,7 @@ impl VortexFormatFactory { #[allow(clippy::new_without_default)] // FormatFactory defines `default` method, so having `Default` implementation is confusing. pub fn new() -> Self { Self { - session: Arc::new(VortexSession::default()), + session: VortexSession::default(), options: None, } } @@ -102,7 +101,7 @@ impl VortexFormatFactory { /// Creates a new instance with customized session and default options for all [`VortexFormat`] instances created from this factory. /// /// The options can be overridden by table-level configuration pass in [`FileFormatFactory::create`]. - pub fn new_with_options(session: Arc, options: VortexOptions) -> Self { + pub fn new_with_options(session: VortexSession, options: VortexOptions) -> Self { Self { session, options: Some(options), @@ -146,7 +145,7 @@ impl FileFormatFactory for VortexFormatFactory { } fn default(&self) -> Arc { - Arc::new(VortexFormat::default()) + Arc::new(VortexFormat::new(self.session.clone())) } fn as_any(&self) -> &dyn Any { @@ -154,20 +153,14 @@ impl FileFormatFactory for VortexFormatFactory { } } -impl Default for VortexFormat { - fn default() -> Self { - Self::new(Arc::new(VortexSession::default())) - } -} - impl VortexFormat { /// Create a new instance with default options. - pub fn new(session: Arc) -> Self { + pub fn new(session: VortexSession) -> Self { Self::new_with_options(session, VortexOptions::default()) } /// Creates a new instance with configured by a [`VortexOptions`]. - pub fn new_with_options(session: Arc, opts: VortexOptions) -> Self { + pub fn new_with_options(session: VortexSession, opts: VortexOptions) -> Self { Self { session: session.clone(), file_cache: VortexFileCache::new( @@ -373,7 +366,7 @@ impl FileFormat for VortexFormat { _state: &dyn Session, file_scan_config: FileScanConfig, ) -> DFResult> { - let source = VortexSource::new(self.file_cache.clone(), self.session.metrics().clone()); + let source = VortexSource::new(self.session.clone(), self.file_cache.clone()); let source = Arc::new(source); Ok(DataSourceExec::from_data_source( @@ -395,15 +388,15 @@ impl FileFormat for VortexFormat { } let schema = conf.output_schema().clone(); - let sink = Arc::new(VortexSink::new(conf, schema)); + let sink = Arc::new(VortexSink::new(conf, schema, self.session.clone())); Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _) } fn file_source(&self) -> Arc { Arc::new(VortexSource::new( + self.session.clone(), self.file_cache.clone(), - VortexMetrics::default(), )) } } diff --git a/vortex-datafusion/src/persistent/metrics.rs b/vortex-datafusion/src/persistent/metrics.rs index a52d50ad44a..7b9ce4ad59e 100644 --- a/vortex-datafusion/src/persistent/metrics.rs +++ b/vortex-datafusion/src/persistent/metrics.rs @@ -12,7 +12,7 @@ use datafusion_physical_plan::metrics::{ use datafusion_physical_plan::{ ExecutionPlan, ExecutionPlanVisitor, Metric as DatafusionMetric, accept, }; -use vortex::metrics::{Metric, MetricId, Tags}; +use vortex::metrics::{Metric, MetricId, MetricsSessionExt, Tags}; use crate::persistent::source::VortexSource; @@ -51,7 +51,8 @@ impl ExecutionPlanVisitor for VortexMetricsFinder { { let mut set = MetricsSet::new(); for metric in scan - .metrics + .session + .metrics() .snapshot() .iter() .flat_map(|(id, metric)| metric_to_datafusion(id, metric)) diff --git a/vortex-datafusion/src/persistent/mod.rs b/vortex-datafusion/src/persistent/mod.rs index d3f9e886562..f9b472a6b93 100644 --- a/vortex-datafusion/src/persistent/mod.rs +++ b/vortex-datafusion/src/persistent/mod.rs @@ -49,12 +49,13 @@ mod tests { use rstest::rstest; use tempfile::{TempDir, tempdir}; use tokio::fs::OpenOptions; - use vortex::IntoArray; use vortex::arrays::{ChunkedArray, StructArray, VarBinArray}; use vortex::buffer::buffer; use vortex::error::vortex_err; - use vortex::file::VortexWriteOptions; + use vortex::file::WriteOptionsSessionExt; + use vortex::session::VortexSession; use vortex::validity::Validity; + use vortex::{IntoArray, VortexSessionDefault}; use crate::VortexFormatFactory; use crate::persistent::{VortexFormat, register_vortex_format_factory}; @@ -64,6 +65,7 @@ mod tests { #[case(None)] #[tokio::test] async fn query_file(#[case] limit: Option) -> anyhow::Result<()> { + let session = VortexSession::default(); let temp_dir = tempdir()?; let strings = ChunkedArray::from_iter([ VarBinArray::from(vec!["ab", "foo", "bar", "baz"]).into_array(), @@ -93,12 +95,13 @@ mod tests { .open(&filepath) .await?; - VortexWriteOptions::default() + session + .write_options() .write(&mut f, st.to_array_stream()) .await?; let ctx = SessionContext::default(); - let format = Arc::new(VortexFormat::default()); + let format = Arc::new(VortexFormat::new(session)); let table_url = ListingTableUrl::parse( temp_dir .path() @@ -166,7 +169,7 @@ mod tests { .sql(&format!( "CREATE EXTERNAL TABLE written_data \ (a TINYINT NOT NULL) \ - STORED AS vortex + STORED AS vortex LOCATION '{}/';", dir.path().to_str().unwrap() )) diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 9fe2e2b63a9..7049436e3f2 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -28,6 +28,7 @@ use vortex::expr::{root, select}; use vortex::layout::LayoutReader; use vortex::metrics::VortexMetrics; use vortex::scan::ScanBuilder; +use vortex::session::VortexSession; use vortex_utils::aliases::dash_map::{DashMap, Entry}; use super::cache::VortexFileCache; @@ -35,6 +36,7 @@ use crate::convert::exprs::{can_be_pushed_down, make_vortex_predicate}; #[derive(Clone)] pub(crate) struct VortexOpener { + pub session: VortexSession, pub object_store: Arc, /// Projection by index of the file's columns pub projection: Option>, @@ -144,6 +146,7 @@ fn compute_logical_file_schema( impl FileOpener for VortexOpener { fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> DFResult { + let session = self.session.clone(); let object_store = self.object_store.clone(); let projection = self.projection.clone(); let mut filter = self.filter.clone(); @@ -281,7 +284,7 @@ impl FileOpener for VortexOpener { } }; - let mut scan_builder = ScanBuilder::new(layout_reader); + let mut scan_builder = ScanBuilder::new(session, layout_reader); if let Some(file_range) = file_meta.range { scan_builder = apply_byte_range( file_range, @@ -386,6 +389,8 @@ fn byte_range_to_row_range(byte_range: Range, row_count: u64, total_size: u #[cfg(test)] mod tests { + use std::sync::LazyLock; + use arrow_schema::Fields; use chrono::Utc; use datafusion::arrow::array::{RecordBatch, StringArray, StructArray}; @@ -402,13 +407,16 @@ mod tests { use object_store::ObjectMeta; use object_store::memory::InMemory; use rstest::rstest; + use vortex::VortexSessionDefault; use vortex::arrow::FromArrowArray; - use vortex::file::VortexWriteOptions; + use vortex::file::WriteOptionsSessionExt; use vortex::io::{ObjectStoreWriter, VortexWrite}; use vortex::session::VortexSession; use super::*; + static SESSION: LazyLock = LazyLock::new(VortexSession::default); + #[rstest] #[case(0..100, 100, 100, 0..100)] #[case(0..105, 100, 105, 0..100)] @@ -460,7 +468,8 @@ mod tests { let path = Path::parse(path)?; let mut write = ObjectStoreWriter::new(object_store, &path).await?; - let summary = VortexWriteOptions::default() + let summary = SESSION + .write_options() .write(&mut write, array.to_array_stream()) .await?; write.shutdown().await?; @@ -493,7 +502,6 @@ mod tests { #[case] expected_result1: (usize, usize), #[case] expected_result2: (usize, usize), ) -> anyhow::Result<()> { - let vx_session = Arc::new(VortexSession::default()); let object_store = Arc::new(InMemory::new()) as Arc; let file_path = "part=1/file.vortex"; let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); @@ -510,6 +518,7 @@ mod tests { ])); let make_opener = |filter| VortexOpener { + session: SESSION.clone(), object_store: object_store.clone(), projection: Some([0].into()), filter: Some(filter), @@ -517,7 +526,7 @@ mod tests { expr_adapter_factory: expr_adapter_factory.clone(), schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), partition_fields: vec![Arc::new(Field::new("part", DataType::Int32, false))], - file_cache: VortexFileCache::new(1, 1, vx_session.clone()), + file_cache: VortexFileCache::new(1, 1, SESSION.clone()), logical_schema: file_schema.clone(), batch_size: 100, limit: None, @@ -575,7 +584,6 @@ mod tests { ) -> anyhow::Result<()> { use datafusion::arrow::util::pretty::pretty_format_batches_with_options; - let vx_session = Arc::new(VortexSession::default()); let object_store = Arc::new(InMemory::new()) as Arc; let file1_path = "/path/file1.vortex"; let batch1 = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); @@ -591,6 +599,7 @@ mod tests { let table_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])); let make_opener = |filter| VortexOpener { + session: SESSION.clone(), object_store: object_store.clone(), projection: Some([0].into()), filter: Some(filter), @@ -598,7 +607,7 @@ mod tests { expr_adapter_factory: expr_adapter_factory.clone(), schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), partition_fields: vec![], - file_cache: VortexFileCache::new(1, 1, vx_session.clone()), + file_cache: VortexFileCache::new(1, 1, SESSION.clone()), logical_schema: table_schema.clone(), batch_size: 100, limit: None, @@ -654,7 +663,6 @@ mod tests { // a nested schema mismatch between the physical file schema and logical // table schema. async fn test_adapter_logical_physical_struct_mismatch() -> anyhow::Result<()> { - let vx_session = Arc::new(VortexSession::default()); let object_store = Arc::new(InMemory::new()) as Arc; let file_path = "/path/file.vortex"; let file_struct_fields = Fields::from(vec![ @@ -699,6 +707,7 @@ mod tests { )])); let opener = VortexOpener { + session: SESSION.clone(), object_store: object_store.clone(), projection: None, filter: Some(logical2physical( @@ -709,7 +718,7 @@ mod tests { expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), partition_fields: vec![], - file_cache: VortexFileCache::new(1, 1, vx_session), + file_cache: VortexFileCache::new(1, 1, SESSION.clone()), logical_schema: table_schema, batch_size: 100, limit: None, diff --git a/vortex-datafusion/src/persistent/sink.rs b/vortex-datafusion/src/persistent/sink.rs index 8a97b5118f8..e4bb14335f0 100644 --- a/vortex-datafusion/src/persistent/sink.rs +++ b/vortex-datafusion/src/persistent/sink.rs @@ -25,18 +25,24 @@ use vortex::arrow::FromArrowArray; use vortex::dtype::DType; use vortex::dtype::arrow::FromArrowType; use vortex::error::VortexResult; -use vortex::file::VortexWriteOptions; +use vortex::file::WriteOptionsSessionExt; use vortex::io::{ObjectStoreWriter, VortexWrite}; +use vortex::session::VortexSession; use vortex::stream::ArrayStreamAdapter; pub struct VortexSink { config: FileSinkConfig, schema: SchemaRef, + session: VortexSession, } impl VortexSink { - pub fn new(config: FileSinkConfig, schema: SchemaRef) -> Self { - Self { config, schema } + pub fn new(config: FileSinkConfig, schema: SchemaRef, session: VortexSession) -> Self { + Self { + config, + schema, + session, + } } } @@ -103,6 +109,7 @@ impl FileSink for VortexSink { // TODO(adamg): // 1. We can probably be better at signaling how much memory we're consuming (potentially when reading too), see ParquetSink::spawn_writer_tasks_and_join. while let Some((path, rx)) = file_stream_rx.recv().await { + let session = self.session.clone(); let row_counter = row_counter.clone(); let object_store = object_store.clone(); let writer_schema = get_writer_schema(&self.config); @@ -126,7 +133,8 @@ impl FileSink for VortexSink { )) })?; - VortexWriteOptions::default() + session + .write_options() .write(&mut sink, stream_adapter) .await .map_err(|e| { @@ -168,7 +176,6 @@ impl FileSink for VortexSink { #[cfg(test)] mod tests { - use std::sync::Arc; use arrow_schema::{DataType, Field, Schema}; diff --git a/vortex-datafusion/src/persistent/source.rs b/vortex-datafusion/src/persistent/source.rs index fc1fa76071b..455287637e5 100644 --- a/vortex-datafusion/src/persistent/source.rs +++ b/vortex-datafusion/src/persistent/source.rs @@ -27,7 +27,8 @@ use object_store::path::Path; use vortex::error::VortexExpect as _; use vortex::file::VORTEX_FILE_EXTENSION; use vortex::layout::LayoutReader; -use vortex::metrics::VortexMetrics; +use vortex::metrics::MetricsSessionExt; +use vortex::session::VortexSession; use vortex_utils::aliases::dash_map::DashMap; use super::cache::VortexFileCache; @@ -40,6 +41,7 @@ use crate::convert::exprs::can_be_pushed_down; /// [`DataSourceExec`]: datafusion_datasource::source::DataSourceExec #[derive(Clone)] pub struct VortexSource { + pub(crate) session: VortexSession, pub(crate) file_cache: VortexFileCache, /// Combined predicate expression containing all filters from DataFusion query planning. /// Used with FilePruner to skip files based on statistics and partition values. @@ -53,7 +55,6 @@ pub struct VortexSource { pub(crate) arrow_file_schema: Option, pub(crate) schema_adapter_factory: Option>, pub(crate) expr_adapter_factory: Option>, - pub(crate) metrics: VortexMetrics, _unused_df_metrics: ExecutionPlanMetricsSet, /// Shared layout readers, the source only lives as long as one scan. /// @@ -62,10 +63,10 @@ pub struct VortexSource { } impl VortexSource { - pub(crate) fn new(file_cache: VortexFileCache, metrics: VortexMetrics) -> Self { + pub(crate) fn new(session: VortexSession, file_cache: VortexFileCache) -> Self { Self { + session, file_cache, - metrics, full_predicate: None, vortex_predicate: None, batch_size: None, @@ -100,7 +101,8 @@ impl FileSource for VortexSource { partition: usize, ) -> Arc { let partition_metrics = self - .metrics + .session + .metrics() .child_with_tags([(PARTITION_LABEL, partition.to_string())].into_iter()); let batch_size = self @@ -135,6 +137,7 @@ impl FileSource for VortexSource { let projection = base_config.file_column_projection_indices().map(Arc::from); let opener = VortexOpener { + session: self.session.clone(), object_store, projection, filter: self.vortex_predicate.clone(), diff --git a/vortex-duckdb/src/copy.rs b/vortex-duckdb/src/copy.rs index fee0f1e35af..c45f78a57f7 100644 --- a/vortex-duckdb/src/copy.rs +++ b/vortex-duckdb/src/copy.rs @@ -12,14 +12,15 @@ use vortex::ArrayRef; use vortex::dtype::Nullability::{NonNullable, Nullable}; use vortex::dtype::{DType, StructFields}; use vortex::error::{VortexExpect, VortexResult, vortex_err}; -use vortex::file::{VortexWriteOptions, WriteSummary}; +use vortex::file::{WriteOptionsSessionExt, WriteSummary}; use vortex::io::runtime::current::CurrentThreadWorkerPool; use vortex::io::runtime::{BlockingRuntime, Task}; +use vortex::io::session::RuntimeSessionExt; use vortex::stream::ArrayStreamAdapter; -use crate::RUNTIME; use crate::convert::{data_chunk_to_arrow, from_duckdb_table}; use crate::duckdb::{CopyFunction, DataChunk, LogicalType}; +use crate::{RUNTIME, SESSION}; #[derive(Debug)] pub struct VortexCopyFunction; @@ -74,7 +75,7 @@ impl CopyFunction for VortexCopyFunction { chunk: &mut DataChunk, ) -> VortexResult<()> { let chunk = data_chunk_to_arrow(bind_data.fields.names(), chunk); - RUNTIME.block_on(|_h| async { + RUNTIME.block_on(async { init_global .sink .as_mut() @@ -91,7 +92,7 @@ impl CopyFunction for VortexCopyFunction { _bind_data: &Self::BindData, init_global: &mut Self::GlobalState, ) -> VortexResult<()> { - RUNTIME.block_on(|_h| async { + RUNTIME.block_on(async { if let Some(sink) = init_global.sink.take() { drop(sink) } @@ -113,12 +114,10 @@ impl CopyFunction for VortexCopyFunction { let (sink, rx) = mpsc::channel(32); let array_stream = ArrayStreamAdapter::new(bind_data.dtype.clone(), rx.into_stream()); - let writer = RUNTIME.handle().spawn_nested(|h| async move { + let handle = SESSION.handle(); + let writer = handle.spawn(async move { let mut file = async_fs::File::create(file_path).await?; - VortexWriteOptions::default() - .with_handle(h) - .write(&mut file, array_stream) - .await + SESSION.write_options().write(&mut file, array_stream).await }); let worker_pool = RUNTIME.new_pool(); diff --git a/vortex-duckdb/src/e2e_test/vortex_scan_test.rs b/vortex-duckdb/src/e2e_test/vortex_scan_test.rs index 52b7c229c0a..22b72e6057a 100644 --- a/vortex-duckdb/src/e2e_test/vortex_scan_test.rs +++ b/vortex-duckdb/src/e2e_test/vortex_scan_test.rs @@ -19,14 +19,14 @@ use vortex::arrays::{ VarBinArray, VarBinViewArray, }; use vortex::buffer::buffer; -use vortex::file::VortexWriteOptions; -use vortex::io::runtime::{BlockingRuntime, Handle}; +use vortex::file::WriteOptionsSessionExt; +use vortex::io::runtime::BlockingRuntime; use vortex::scalar::Scalar; use vortex::validity::Validity; use crate::cpp::{duckdb_string_t, duckdb_timestamp}; use crate::duckdb::{Connection, Database}; -use crate::{RUNTIME, cpp}; +use crate::{RUNTIME, SESSION, cpp}; fn database_connection() -> Connection { let db = Database::open_in_memory().unwrap(); @@ -39,24 +39,19 @@ fn create_temp_file() -> NamedTempFile { NamedTempFile::new().unwrap() } -async fn write_single_column_vortex_file( - handle: Handle, - field_name: &str, - array: impl IntoArray, -) -> NamedTempFile { - write_vortex_file(handle, [(field_name, array)].into_iter()).await +async fn write_single_column_vortex_file(field_name: &str, array: impl IntoArray) -> NamedTempFile { + write_vortex_file([(field_name, array)].into_iter()).await } async fn write_vortex_file( - handle: Handle, iter: impl Iterator, impl IntoArray)>, ) -> NamedTempFile { let temp_file_path = create_temp_file(); let struct_array = StructArray::try_from_iter(iter).unwrap(); let mut file = async_fs::File::create(&temp_file_path).await.unwrap(); - VortexWriteOptions::default() - .with_handle(handle) + SESSION + .write_options() .write(&mut file, struct_array.to_array_stream()) .await .unwrap(); @@ -138,7 +133,6 @@ fn scan_vortex_file>( } async fn write_vortex_file_to_dir( - handle: Handle, dir: &Path, field_name: &str, array: impl IntoArray, @@ -150,8 +144,8 @@ async fn write_vortex_file_to_dir( .unwrap(); let mut file = async_fs::File::create(&temp_file_path).await.unwrap(); - VortexWriteOptions::default() - .with_handle(handle) + SESSION + .write_options() .write(&mut file, struct_array.to_array_stream()) .await .unwrap(); @@ -176,9 +170,9 @@ fn test_scan_function_registration() { #[test] fn test_vortex_scan_strings() { - let file = RUNTIME.block_on(|h| async { + let file = RUNTIME.block_on(async { let strings = VarBinArray::from(vec!["Hello", "Hi", "Hey"]); - write_single_column_vortex_file(h, "strings", strings).await + write_single_column_vortex_file("strings", strings).await }); let result: String = scan_vortex_file_single_row( @@ -192,9 +186,9 @@ fn test_vortex_scan_strings() { #[test] fn test_vortex_scan_strings_contains() { - let file = RUNTIME.block_on(|h| async { + let file = RUNTIME.block_on(async { let strings = VarBinArray::from(vec!["Hello", "Hi", "Hey"]); - write_single_column_vortex_file(h, "strings", strings).await + write_single_column_vortex_file("strings", strings).await }); let result: String = scan_vortex_file_single_row( file, @@ -207,9 +201,9 @@ fn test_vortex_scan_strings_contains() { #[test] fn test_vortex_scan_integers() { - let file = RUNTIME.block_on(|h| async { + let file = RUNTIME.block_on(async { let numbers = buffer![1i32, 42, 100, -5, 0]; - write_single_column_vortex_file(h, "number", numbers).await + write_single_column_vortex_file("number", numbers).await }); let sum: i64 = scan_vortex_file_single_row::(file, "SELECT SUM(number) FROM vortex_scan(?)", 0); @@ -218,9 +212,9 @@ fn test_vortex_scan_integers() { #[test] fn test_vortex_scan_integers_in_list() { - let file = RUNTIME.block_on(|h| async { + let file = RUNTIME.block_on(async { let numbers = buffer![1i32, 42, 100, -5, 0]; - write_single_column_vortex_file(h, "number", numbers).await + write_single_column_vortex_file("number", numbers).await }); let sum: i64 = scan_vortex_file_single_row::( file, @@ -232,9 +226,9 @@ fn test_vortex_scan_integers_in_list() { #[test] fn test_vortex_scan_integers_between() { - let file = RUNTIME.block_on(|h| async { + let file = RUNTIME.block_on(async { let numbers = buffer![1i32, 42, 100, -5, 0]; - write_single_column_vortex_file(h, "number", numbers).await + write_single_column_vortex_file("number", numbers).await }); let sum: i64 = scan_vortex_file_single_row::( file, @@ -246,9 +240,9 @@ fn test_vortex_scan_integers_between() { #[test] fn test_vortex_scan_floats() { - let file = RUNTIME.block_on(|h| async { + let file = RUNTIME.block_on(async { let values = buffer![1.5f64, -2.5, 0.0, 42.42]; - write_single_column_vortex_file(h, "value", values).await + write_single_column_vortex_file("value", values).await }); let count: i64 = scan_vortex_file_single_row::( file, @@ -260,9 +254,9 @@ fn test_vortex_scan_floats() { #[test] fn test_vortex_scan_constant() { - let file = RUNTIME.block_on(|h| async { + let file = RUNTIME.block_on(async { let constant = ConstantArray::new(Scalar::from(42i32), 100); - write_single_column_vortex_file(h, "constant", constant).await + write_single_column_vortex_file("constant", constant).await }); let value: i32 = scan_vortex_file_single_row::( file, @@ -274,10 +268,10 @@ fn test_vortex_scan_constant() { #[test] fn test_vortex_scan_booleans() { - let file = RUNTIME.block_on(|h| async { + let file = RUNTIME.block_on(async { let flags = vec![true, false, true, true, false]; let flags_array = BoolArray::from_bit_buffer(flags.into(), Validity::NonNullable); - write_single_column_vortex_file(h, "flag", flags_array).await + write_single_column_vortex_file("flag", flags_array).await }); let true_count: i64 = scan_vortex_file_single_row::( file, @@ -289,7 +283,7 @@ fn test_vortex_scan_booleans() { #[test] fn test_vortex_multi_column() { - let file = RUNTIME.block_on(|h| async { + let file = RUNTIME.block_on(async { let f1 = BoolArray::from_bit_buffer( vec![true, false, true, true, false].into(), Validity::NonNullable, @@ -297,7 +291,7 @@ fn test_vortex_multi_column() { .to_array(); let f2 = (0..5).collect::().to_array(); let f3 = (100..105).collect::().to_array(); - write_vortex_file(h, [("f1", f1), ("f2", f2), ("f3", f3)].into_iter()).await + write_vortex_file([("f1", f1), ("f2", f2), ("f3", f3)].into_iter()).await }); let result: Vec = scan_vortex_file::( @@ -312,15 +306,12 @@ fn test_vortex_multi_column() { #[test] fn test_vortex_scan_multiple_files() { - let (tempdir, _file1, _file2) = RUNTIME.block_on(|h| async { + let (tempdir, _file1, _file2) = RUNTIME.block_on(async { let tempdir = tempfile::tempdir().unwrap(); - let file1 = - write_vortex_file_to_dir(h.clone(), tempdir.path(), "numbers", buffer![1i32, 2, 3]) - .await; + let file1 = write_vortex_file_to_dir(tempdir.path(), "numbers", buffer![1i32, 2, 3]).await; - let file2 = - write_vortex_file_to_dir(h, tempdir.path(), "numbers", buffer![4i32, 5, 6]).await; + let file2 = write_vortex_file_to_dir(tempdir.path(), "numbers", buffer![4i32, 5, 6]).await; (tempdir, file1, file2) }); @@ -396,7 +387,7 @@ fn test_write_timestamps() { fn test_vortex_scan_fixed_size_list_utf8() { // Test a simple FixedSizeList of Utf8 strings to ensure proper materialization. - let file = RUNTIME.block_on(|h| async { + let file = RUNTIME.block_on(async { // Create a large number of strings to stress test. let strings: Vec<&str> = (0..24) .map(|i| match i % 6 { @@ -419,7 +410,7 @@ fn test_vortex_scan_fixed_size_list_utf8() { 6, // 6 lists total ); - write_single_column_vortex_file(h, "string_lists", fsl).await + write_single_column_vortex_file("string_lists", fsl).await }); let conn = database_connection(); @@ -447,7 +438,8 @@ fn test_vortex_scan_nested_fixed_size_list_utf8() { // when running with `FixedSizeList` instead of `List`. // Test FixedSizeList of FixedSizeList of Utf8 to ensure proper materialization. - let file = RUNTIME.block_on(|h| async { + + let file = RUNTIME.block_on(async { // Create a large number of strings to stress test. let strings: Vec<&str> = (0..24) .map(|i| match i % 6 { @@ -478,7 +470,7 @@ fn test_vortex_scan_nested_fixed_size_list_utf8() { 2, // 2 outer lists ); - write_single_column_vortex_file(h, "nested_string_lists", outer_fsl).await + write_single_column_vortex_file("nested_string_lists", outer_fsl).await }); let conn = database_connection(); @@ -504,7 +496,7 @@ fn test_vortex_scan_nested_fixed_size_list_utf8() { fn test_vortex_scan_list_of_ints() { // Test a simple List of integers. - let file = RUNTIME.block_on(|h| async { + let file = RUNTIME.block_on(async { // Create integers that will be grouped into lists. let integers = PrimitiveArray::from_iter([ 10i32, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130, 140, 150, @@ -524,7 +516,7 @@ fn test_vortex_scan_list_of_ints() { ) .unwrap(); - write_single_column_vortex_file(h, "int_list", list_array).await + write_single_column_vortex_file("int_list", list_array).await }); let conn = database_connection(); @@ -556,7 +548,7 @@ fn test_vortex_scan_list_of_ints() { fn test_vortex_scan_list_of_utf8() { // Test a simple List of UTF8 strings. - let file = RUNTIME.block_on(|h| async { + let file = RUNTIME.block_on(async { // Create UTF8 strings that will be grouped into lists. let strings = VarBinViewArray::from_iter_str(vec![ "apple", @@ -586,7 +578,7 @@ fn test_vortex_scan_list_of_utf8() { ) .unwrap(); - write_single_column_vortex_file(h, "string_list", list_array).await + write_single_column_vortex_file("string_list", list_array).await }); let conn = database_connection(); @@ -621,7 +613,7 @@ fn test_vortex_scan_ultra_deep_nesting() { // Test ultra-deep nesting: Multiple levels of FSL and List combinations with UTF8. // FSL[List[FSL[List[FSL[UTF8]]]]] - let file = RUNTIME.block_on(|h| async { + let file = RUNTIME.block_on(async { // Level 1: Create base UTF8 strings - need a lot for deep nesting. let strings = VarBinViewArray::from_iter_str( (0..360) @@ -682,7 +674,7 @@ fn test_vortex_scan_ultra_deep_nesting() { 1, // 1 outermost FSL ); - write_single_column_vortex_file(h, "ultra_deep", outermost_fsl).await + write_single_column_vortex_file("ultra_deep", outermost_fsl).await }); let conn = database_connection(); diff --git a/vortex-duckdb/src/lib.rs b/vortex-duckdb/src/lib.rs index b85930163eb..39dad2de72a 100644 --- a/vortex-duckdb/src/lib.rs +++ b/vortex-duckdb/src/lib.rs @@ -2,12 +2,16 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors #![allow(clippy::missing_safety_doc)] -// **WARNING end + use std::ffi::{CStr, c_char}; use std::sync::LazyLock; +use vortex::VortexSessionDefault; use vortex::error::{VortexExpect, VortexResult}; +use vortex::io::runtime::BlockingRuntime; use vortex::io::runtime::current::CurrentThreadRuntime; +use vortex::io::session::RuntimeSessionExt; +use vortex::session::VortexSession; use crate::copy::VortexCopyFunction; use crate::duckdb::Config; @@ -32,6 +36,8 @@ mod e2e_test; // A global runtime for Vortex operations within DuckDB. static RUNTIME: LazyLock = LazyLock::new(CurrentThreadRuntime::new); +static SESSION: LazyLock = + LazyLock::new(|| VortexSession::default().with_handle(RUNTIME.handle())); /// Register Vortex extension configuration options with DuckDB. /// This must be called before `register_table_functions` to take effect. diff --git a/vortex-duckdb/src/scan.rs b/vortex-duckdb/src/scan.rs index 10b7583f6d4..21e78d3ffed 100644 --- a/vortex-duckdb/src/scan.rs +++ b/vortex-duckdb/src/scan.rs @@ -19,12 +19,11 @@ use url::Url; use vortex::dtype::FieldNames; use vortex::error::{VortexExpect, VortexResult, vortex_bail, vortex_err}; use vortex::expr::{ExprRef, and, and_collect, col, lit, root, select}; -use vortex::file::{VortexFile, VortexOpenOptions}; +use vortex::file::{OpenOptionsSessionExt, VortexFile, VortexOpenOptions}; use vortex::io::runtime::BlockingRuntime; use vortex::io::runtime::current::ThreadSafeIterator; use vortex::{ArrayRef, ToCanonical}; -use crate::RUNTIME; use crate::convert::{try_from_bound_expression, try_from_table_filter}; use crate::duckdb::footer_cache::FooterCache; use crate::duckdb::{ @@ -34,6 +33,7 @@ use crate::duckdb::{ use crate::exporter::{ArrayExporter, ConversionCache}; use crate::utils::glob::expand_glob; use crate::utils::object_store::s3_store; +use crate::{RUNTIME, SESSION}; pub struct VortexBindData { first_file: VortexFile, @@ -245,8 +245,9 @@ impl TableFunction for VortexTableFunction { log::trace!("running scan with max_threads {max_threads}"); - let (file_urls, _metadata) = RUNTIME - .block_on(|_h| Compat::new(expand_glob(file_glob_string.as_ref().as_string())))?; + let (file_urls, _metadata) = RUNTIME.block_on(Compat::new(expand_glob( + file_glob_string.as_ref().as_string(), + )))?; // The first file is skipped in `create_file_paths_queue`. let Some(first_file_url) = file_urls.first() else { @@ -255,8 +256,8 @@ impl TableFunction for VortexTableFunction { let footer_cache = FooterCache::new(ctx.object_cache()); let entry = footer_cache.entry(first_file_url.as_ref()); - let first_file = RUNTIME.block_on(|h| async move { - let options = entry.apply_to_file(VortexOpenOptions::new().with_handle(h)); + let first_file = RUNTIME.block_on(async move { + let options = entry.apply_to_file(SESSION.open_options()); let file = open_file(first_file_url.clone(), options).await?; entry.put_if_absent(|| file.footer().clone()); VortexResult::Ok(file) @@ -355,7 +356,7 @@ impl TableFunction for VortexTableFunction { let object_cache = object_cache; handle - .spawn_nested(move |handle| async move { + .spawn(async move { let vxf = if idx == 0 { // The first path from `file_paths` is skipped as // the first file was already opened during bind. @@ -363,9 +364,7 @@ impl TableFunction for VortexTableFunction { } else { let cache = FooterCache::new(object_cache); let entry = cache.entry(url.as_ref()); - let options = entry.apply_to_file( - VortexOpenOptions::new().with_handle(handle.clone()), - ); + let options = entry.apply_to_file(SESSION.open_options()); let file = open_file(url.clone(), options).await?; entry.put_if_absent(|| file.footer().clone()); VortexResult::Ok(file) @@ -379,7 +378,6 @@ impl TableFunction for VortexTableFunction { let scan = vxf .scan()? - .with_handle(handle) .with_some_filter(filter_expr) .with_projection(projection_expr) .with_ordered(false) diff --git a/vortex-expr/Cargo.toml b/vortex-expr/Cargo.toml index abbdc30c520..4fd5b44ebb5 100644 --- a/vortex-expr/Cargo.toml +++ b/vortex-expr/Cargo.toml @@ -38,6 +38,7 @@ vortex-error = { workspace = true, features = ["prost"] } vortex-mask = { workspace = true } vortex-proto = { workspace = true, features = ["expr"] } vortex-scalar = { workspace = true } +vortex-session = { workspace = true } vortex-utils = { workspace = true } [dev-dependencies] diff --git a/vortex-expr/src/lib.rs b/vortex-expr/src/lib.rs index 1846f47e743..4da2f27a58a 100644 --- a/vortex-expr/src/lib.rs +++ b/vortex-expr/src/lib.rs @@ -28,9 +28,9 @@ mod field; pub mod forms; pub mod proto; pub mod pruning; -mod registry; mod scope; mod scope_vars; +pub mod session; pub mod transform; pub mod traversal; mod vtable; @@ -50,7 +50,6 @@ pub use merge::*; pub use not::*; pub use operators::*; pub use pack::*; -pub use registry::*; pub use root::*; pub use scope::*; pub use scope_vars::*; diff --git a/vortex-expr/src/proto.rs b/vortex-expr/src/proto.rs index a81d8124784..4f06a8a7b51 100644 --- a/vortex-expr/src/proto.rs +++ b/vortex-expr/src/proto.rs @@ -5,7 +5,7 @@ use itertools::Itertools; use vortex_error::{VortexResult, vortex_err}; use vortex_proto::expr as pb; -use crate::registry::ExprRegistry; +use crate::session::ExprRegistry; use crate::{ExprRef, VortexExpr}; pub trait ExprSerializeProtoExt { @@ -37,7 +37,7 @@ impl ExprSerializeProtoExt for dyn VortexExpr + '_ { pub fn deserialize_expr_proto(expr: &pb::Expr, registry: &ExprRegistry) -> VortexResult { let expr_id = expr.id.as_str(); let encoding = registry - .get(expr_id) + .find(expr_id) .ok_or_else(|| vortex_err!("unknown expression id: {}", expr_id))?; let children = expr @@ -56,12 +56,12 @@ mod tests { use vortex_proto::expr as pb; use crate::proto::{ExprSerializeProtoExt, deserialize_expr_proto}; - use crate::registry::ExprRegistryExt; - use crate::{ExprRef, ExprRegistry, and, between, eq, get_item, lit, or, root}; + use crate::session::ExprSession; + use crate::{ExprRef, and, between, eq, get_item, lit, or, root}; #[test] fn expression_serde() { - let registry = ExprRegistry::default(); + let registry = ExprSession::default().registry().clone(); let expr: ExprRef = or( and( between( diff --git a/vortex-expr/src/registry.rs b/vortex-expr/src/registry.rs deleted file mode 100644 index 47a07b5b0f3..00000000000 --- a/vortex-expr/src/registry.rs +++ /dev/null @@ -1,40 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -use vortex_array::VTableRegistry; - -use crate::{ - BetweenExprEncoding, BinaryExprEncoding, CastExprEncoding, ExprEncodingRef, - GetItemExprEncoding, IsNullExprEncoding, LikeExprEncoding, ListContainsExprEncoding, - LiteralExprEncoding, MergeExprEncoding, NotExprEncoding, PackExprEncoding, RootExprEncoding, - SelectExprEncoding, -}; - -pub type ExprRegistry = VTableRegistry; - -pub trait ExprRegistryExt { - /// Creates a default expression registry with built-in Vortex expressions pre-registered. - fn default() -> Self; -} - -impl ExprRegistryExt for ExprRegistry { - fn default() -> Self { - let mut this = Self::empty(); - this.register_many([ - ExprEncodingRef::new_ref(BetweenExprEncoding.as_ref()), - ExprEncodingRef::new_ref(BinaryExprEncoding.as_ref()), - ExprEncodingRef::new_ref(CastExprEncoding.as_ref()), - ExprEncodingRef::new_ref(GetItemExprEncoding.as_ref()), - ExprEncodingRef::new_ref(IsNullExprEncoding.as_ref()), - ExprEncodingRef::new_ref(LikeExprEncoding.as_ref()), - ExprEncodingRef::new_ref(ListContainsExprEncoding.as_ref()), - ExprEncodingRef::new_ref(LiteralExprEncoding.as_ref()), - ExprEncodingRef::new_ref(MergeExprEncoding.as_ref()), - ExprEncodingRef::new_ref(NotExprEncoding.as_ref()), - ExprEncodingRef::new_ref(PackExprEncoding.as_ref()), - ExprEncodingRef::new_ref(RootExprEncoding.as_ref()), - ExprEncodingRef::new_ref(SelectExprEncoding.as_ref()), - ]); - this - } -} diff --git a/vortex-expr/src/session.rs b/vortex-expr/src/session.rs new file mode 100644 index 00000000000..8665fdf7c4f --- /dev/null +++ b/vortex-expr/src/session.rs @@ -0,0 +1,73 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_session::registry::Registry; +use vortex_session::{Ref, SessionExt}; + +use crate::{ + BetweenExprEncoding, BinaryExprEncoding, CastExprEncoding, ExprEncodingRef, + GetItemExprEncoding, IsNullExprEncoding, LikeExprEncoding, ListContainsExprEncoding, + LiteralExprEncoding, MergeExprEncoding, NotExprEncoding, PackExprEncoding, RootExprEncoding, + SelectExprEncoding, +}; + +/// Registry of expression encodings. +pub type ExprRegistry = Registry; + +/// Session state for expression encodings. +#[derive(Debug)] +pub struct ExprSession { + registry: ExprRegistry, +} + +impl ExprSession { + pub fn registry(&self) -> &ExprRegistry { + &self.registry + } + + /// Register an expression encoding in the session, replacing any existing encoding with the same ID. + pub fn register(&self, expr: ExprEncodingRef) { + self.registry.register(expr) + } + + /// Register expression encodings in the session, replacing any existing encodings with the same IDs. + pub fn register_many(&self, exprs: impl IntoIterator) { + self.registry.register_many(exprs); + } +} + +impl Default for ExprSession { + fn default() -> Self { + let expressions = ExprRegistry::default(); + + // Register built-in expressions here if needed. + expressions.register_many([ + ExprEncodingRef::new_ref(BetweenExprEncoding.as_ref()), + ExprEncodingRef::new_ref(BinaryExprEncoding.as_ref()), + ExprEncodingRef::new_ref(CastExprEncoding.as_ref()), + ExprEncodingRef::new_ref(GetItemExprEncoding.as_ref()), + ExprEncodingRef::new_ref(IsNullExprEncoding.as_ref()), + ExprEncodingRef::new_ref(LikeExprEncoding.as_ref()), + ExprEncodingRef::new_ref(ListContainsExprEncoding.as_ref()), + ExprEncodingRef::new_ref(LiteralExprEncoding.as_ref()), + ExprEncodingRef::new_ref(MergeExprEncoding.as_ref()), + ExprEncodingRef::new_ref(NotExprEncoding.as_ref()), + ExprEncodingRef::new_ref(PackExprEncoding.as_ref()), + ExprEncodingRef::new_ref(RootExprEncoding.as_ref()), + ExprEncodingRef::new_ref(SelectExprEncoding.as_ref()), + ]); + + Self { + registry: expressions, + } + } +} + +/// Extension trait for accessing expression session data. +pub trait ExprSessionExt: SessionExt { + /// Returns the expression encoding registry. + fn expressions(&self) -> Ref<'_, ExprSession> { + self.get::() + } +} +impl ExprSessionExt for S {} diff --git a/vortex-expr/src/vtable.rs b/vortex-expr/src/vtable.rs index e332c9bb25e..d81f1554827 100644 --- a/vortex-expr/src/vtable.rs +++ b/vortex-expr/src/vtable.rs @@ -131,12 +131,13 @@ mod tests { use super::*; use crate::proto::{ExprSerializeProtoExt, deserialize_expr_proto}; + use crate::session::{ExprRegistry, ExprSession}; use crate::*; #[fixture] #[once] fn registry() -> ExprRegistry { - ExprRegistry::default() + ExprSession::default().registry().clone() } #[rstest] diff --git a/vortex-ffi/Cargo.toml b/vortex-ffi/Cargo.toml index 03b9e33bf13..c197e9e15e9 100644 --- a/vortex-ffi/Cargo.toml +++ b/vortex-ffi/Cargo.toml @@ -20,6 +20,8 @@ categories = { workspace = true } all-features = true [dependencies] +async-fs = { workspace = true } +futures = { workspace = true } itertools = { workspace = true } log = { workspace = true } mimalloc = { workspace = true, optional = true } @@ -29,8 +31,6 @@ parking_lot = { workspace = true } paste = { workspace = true } prost = { workspace = true } simplelog = { workspace = true } -tokio = { workspace = true, features = ["rt-multi-thread"] } -tokio-stream = { workspace = true } url = { workspace = true, features = [] } vortex = { workspace = true, features = ["object_store"] } diff --git a/vortex-ffi/cinclude/vortex.h b/vortex-ffi/cinclude/vortex.h index d91f10c9d30..0a671fb7a44 100644 --- a/vortex-ffi/cinclude/vortex.h +++ b/vortex-ffi/cinclude/vortex.h @@ -379,12 +379,6 @@ typedef struct { extern "C" { #endif // __cplusplus -/** - * Attempt to shutdown the shared tokio runtime if no sessions are active. - * May block indefinitely if the runtime is still running tasks. - */ -void vx_try_shutdown_runtime(void); - /** * Clone a borrowed [`vx_array`], returning an owned [`vx_array`]. * @@ -668,11 +662,14 @@ void vx_file_free(const vx_file *ptr); /** * Open a file at the given path on the file system. */ -const vx_file *vx_file_open_reader(const vx_file_open_options *options, - const vx_session *session, +const vx_file *vx_file_open_reader(const vx_session *session, + const vx_file_open_options *options, vx_error **error_out); -void vx_file_write_array(const char *path, const vx_array *array, vx_error **error_out); +void vx_file_write_array(const vx_session *session, + const char *path, + const vx_array *array, + vx_error **error_out); uint64_t vx_file_row_count(const vx_file *file); @@ -687,7 +684,8 @@ const vx_dtype *vx_file_dtype(const vx_file *file); /** * Can we prune the whole file using file stats and an expression */ -bool vx_file_can_prune(const vx_file *file, +bool vx_file_can_prune(const vx_session *session, + const vx_file *file, const char *filter_expression, unsigned int filter_expression_len, vx_error **error_out); @@ -695,7 +693,8 @@ bool vx_file_can_prune(const vx_file *file, /** * Build a new `vx_array_iterator` that returns a series of `vx_array`s from a scan over a `vx_layout_reader`. */ -vx_array_iterator *vx_file_scan(const vx_file *file, +vx_array_iterator *vx_file_scan(const vx_session *session, + const vx_file *file, const vx_file_scan_options *opts, vx_error **error_out); @@ -722,7 +721,8 @@ vx_session *vx_session_new(void); * Opens a writable array stream, where sink is used to push values into the stream. * To close the stream close the sink with `vx_array_sink_close`. */ -vx_array_sink *vx_array_sink_open_file(const char *path, +vx_array_sink *vx_array_sink_open_file(const vx_session *session, + const char *path, const vx_dtype *dtype, vx_error **error_out); diff --git a/vortex-ffi/examples/hello-vortex.c b/vortex-ffi/examples/hello-vortex.c index 06b45503ab9..192e4d73ae3 100644 --- a/vortex-ffi/examples/hello-vortex.c +++ b/vortex-ffi/examples/hello-vortex.c @@ -30,12 +30,11 @@ int main(int argc, char *argv[]) { .property_len = 0, }; - const vx_file *file = vx_file_open_reader(&open_opts, session, &error); + const vx_file *file = vx_file_open_reader(session, &open_opts, &error); if (error != NULL) { fprintf(stderr, "Failed to open file: %s\n%s", uri, vx_string_ptr(vx_error_get_message(error))); vx_error_free(error); vx_session_free(session); - vx_try_shutdown_runtime(); return -1; } @@ -52,13 +51,12 @@ int main(int argc, char *argv[]) { // Start scanning printf("\nScanning file...\n"); - vx_array_iterator *scan = vx_file_scan(file, NULL, &error); + vx_array_iterator *scan = vx_file_scan(session, file, NULL, &error); if (error != NULL) { fprintf(stderr, "Failed to create file scan iterator\n"); vx_error_free(error); vx_file_free(file); vx_session_free(session); - vx_try_shutdown_runtime(); return -1; } @@ -108,14 +106,11 @@ int main(int argc, char *argv[]) { fprintf(stderr, "Error during scan operation\n"); vx_error_free(error); vx_session_free(session); - vx_try_shutdown_runtime(); return -1; } printf("Scanning completed successfully\n"); vx_session_free(session); - // Attempt to shutdown the shared runtime for clean exit - vx_try_shutdown_runtime(); return 0; } diff --git a/vortex-ffi/examples/hello_vortex.rs b/vortex-ffi/examples/hello_vortex.rs index eb4a043065d..b720be30fbc 100644 --- a/vortex-ffi/examples/hello_vortex.rs +++ b/vortex-ffi/examples/hello_vortex.rs @@ -10,21 +10,26 @@ //!cargo run -p vortex-ffi --example hello_vortex //! ``` +use std::clone::Clone; use std::env; use std::path::{Path, PathBuf}; use std::process::Command; use std::sync::LazyLock; -use tokio::fs::File as TokioFile; -use tokio::runtime::Runtime; use vortex::arrays::{ChunkedArray, StructArray}; use vortex::buffer::Buffer; use vortex::error::{VortexResult, vortex_err}; -use vortex::file::VortexWriteOptions; +use vortex::file::WriteOptionsSessionExt; use vortex::io::VortexWrite; -use vortex::{Array, ArrayRef, IntoArray}; +use vortex::io::runtime::BlockingRuntime; +use vortex::io::runtime::current::CurrentThreadRuntime; +use vortex::io::session::RuntimeSessionExt; +use vortex::session::VortexSession; +use vortex::{Array, ArrayRef, IntoArray, VortexSessionDefault}; -static RUNTIME: LazyLock = LazyLock::new(|| Runtime::new().unwrap()); +static RUNTIME: LazyLock = LazyLock::new(CurrentThreadRuntime::new); +static SESSION: LazyLock = + LazyLock::new(|| VortexSession::default().with_handle(RUNTIME.handle())); const BIN_NAME: &str = "hello_vortex"; @@ -128,7 +133,7 @@ pub fn main() -> VortexResult<()> { } async fn write_vortex_file(path: impl AsRef) -> VortexResult<()> { - let mut file = TokioFile::create(path).await?; + let mut file = async_fs::File::create(path).await?; let chunk1 = chunk((0..1000).collect(), (0..1000).map(|x| x as f32).collect()); let chunk2 = chunk( @@ -143,7 +148,8 @@ async fn write_vortex_file(path: impl AsRef) -> VortexResult<()> { let test_data = ChunkedArray::try_new(vec![chunk1, chunk2, chunk3], dtype)?; - VortexWriteOptions::default() + SESSION + .write_options() .write(&mut file, test_data.to_array_stream()) .await?; file.shutdown().await?; diff --git a/vortex-ffi/src/file.rs b/vortex-ffi/src/file.rs index 0fe8f31c944..38d892dbec1 100644 --- a/vortex-ffi/src/file.rs +++ b/vortex-ffi/src/file.rs @@ -7,7 +7,7 @@ use std::ffi::{CStr, c_char, c_int, c_uint, c_ulong}; use std::ops::Range; use std::slice; use std::str::FromStr; -use std::sync::{Arc, LazyLock}; +use std::sync::Arc; use itertools::Itertools; use object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey}; @@ -18,19 +18,23 @@ use object_store::{ObjectStore, ObjectStoreScheme}; use prost::Message; use url::Url; use vortex::error::{VortexError, VortexResult, vortex_bail, vortex_err}; +use vortex::expr::ExprRef; use vortex::expr::proto::deserialize_expr_proto; -use vortex::expr::{ExprRef, ExprRegistryExt}; -use vortex::file::{VortexFile, VortexOpenOptions, VortexWriteOptions}; -use vortex::io::runtime::tokio::TokioRuntime; +use vortex::expr::session::{ExprRegistry, ExprSessionExt}; +use vortex::file::{OpenOptionsSessionExt, VortexFile, WriteOptionsSessionExt}; +use vortex::io::runtime::BlockingRuntime; +use vortex::iter::ArrayIteratorAdapter; use vortex::proto::expr::Expr; use vortex::scan::{ScanBuilder, SplitBy}; +use vortex::session::VortexSession; +use vortex::stream::ArrayStream; use crate::array::vx_array; use crate::array_iterator::vx_array_iterator; use crate::dtype::vx_dtype; use crate::error::{try_or_default, vx_error}; -use crate::session::{FileKey, vx_session}; -use crate::{arc_wrapper, get_runtime, get_vx_runtime, to_string_vec}; +use crate::session::vx_session; +use crate::{RUNTIME, arc_wrapper, to_string_vec}; arc_wrapper!( /// A handle to a Vortex file encapsulating the footer and logic for instantiating a reader. @@ -85,11 +89,8 @@ pub struct vx_file_scan_options { pub row_offset: c_ulong, } -// FIXME(ngates): API should require a VortexSession to be passed in instead. -static EXPR_REGISTRY: LazyLock = - LazyLock::new(vortex::expr::ExprRegistry::default); - fn extract_expression( + registry: &ExprRegistry, expression: *const c_char, expression_len: c_uint, ) -> VortexResult> { @@ -98,7 +99,7 @@ fn extract_expression( unsafe { slice::from_raw_parts(expression as *const u8, expression_len as usize) }; // Decode the protobuf message. - deserialize_expr_proto(&Expr::decode(bytes)?, &EXPR_REGISTRY) + deserialize_expr_proto(&Expr::decode(bytes)?, registry) .map_err(|e| e.with_context("deserializing expr"))? })) } @@ -107,12 +108,19 @@ impl vx_file_scan_options { /// Processes FFI scan options. /// /// Extracts and converts a scan configuration from an FFI options struct. - fn process_scan_options(&self) -> VortexResult { + fn process_scan_options(&self, session: &VortexSession) -> VortexResult { // Extract field names for projection. - let projection_expr = - extract_expression(self.projection_expression, self.projection_expr_len)?; + let projection_expr = extract_expression( + session.expressions().registry(), + self.projection_expression, + self.projection_expr_len, + )?; - let filter_expr = extract_expression(self.filter_expression, self.filter_expression_len)?; + let filter_expr = extract_expression( + session.expressions().registry(), + self.filter_expression, + self.filter_expression_len, + )?; let row_range = (self.row_range_end > self.row_range_start) .then_some(self.row_range_start..self.row_range_end); @@ -133,8 +141,8 @@ impl vx_file_scan_options { /// Open a file at the given path on the file system. #[unsafe(no_mangle)] pub unsafe extern "C-unwind" fn vx_file_open_reader( - options: *const vx_file_open_options, session: *const vx_session, + options: *const vx_file_open_options, error_out: *mut *mut vx_error, ) -> *const vx_file { let session = vx_session::as_ref(session); @@ -159,47 +167,34 @@ pub unsafe extern "C-unwind" fn vx_file_open_reader( let object_store = make_object_store(&uri, &prop_keys, &prop_vals)?; - let mut file = VortexOpenOptions::new().with_handle(TokioRuntime::current()); - let mut cache_hit = false; - if let Some(footer) = session.get_footer(&FileKey { - location: uri_str.to_string(), - }) { - file = file.with_footer(footer); - cache_hit = true; - } - - let vxf = get_runtime() + let file = session.open_options(); + let vxf = RUNTIME .block_on(async move { file.open_object_store(&object_store, uri.path()).await })?; - if !cache_hit { - session.put_footer( - FileKey { - location: uri_str.to_string(), - }, - vxf.footer().clone(), - ); - } - Ok(vx_file::new(Arc::new(vxf))) }) } #[unsafe(no_mangle)] pub unsafe extern "C-unwind" fn vx_file_write_array( + session: *const vx_session, path: *const c_char, array: *const vx_array, error_out: *mut *mut vx_error, ) { let array = vx_array::as_ref(array); try_or_default(error_out, || { + let session = vx_session::as_ref(session); + let path = unsafe { CStr::from_ptr(path) } .to_str() .map_err(|e| vortex_err!("invalid utf-8: {e}"))?; - get_runtime().block_on(async { - VortexWriteOptions::default() + let options = session.write_options(); + RUNTIME.block_on(async move { + options .write( - &mut tokio::fs::File::create(path).await?, + &mut async_fs::File::create(path).await?, array.to_array_stream(), ) .await?; @@ -234,14 +229,20 @@ pub unsafe extern "C-unwind" fn vx_file_dtype(file: *const vx_file) -> *const vx /// Can we prune the whole file using file stats and an expression #[unsafe(no_mangle)] pub unsafe extern "C-unwind" fn vx_file_can_prune( + session: *const vx_session, file: *const vx_file, filter_expression: *const c_char, filter_expression_len: c_uint, error_out: *mut *mut vx_error, ) -> bool { try_or_default(error_out, || { + let session = vx_session::as_ref(session); let file = vx_file::as_ref(file); - let filter_expr = extract_expression(filter_expression, filter_expression_len)?; + let filter_expr = extract_expression( + session.expressions().registry(), + filter_expression, + filter_expression_len, + )?; Ok(filter_expr .map(|expr| file.can_prune(&expr)) .transpose()? @@ -252,21 +253,22 @@ pub unsafe extern "C-unwind" fn vx_file_can_prune( /// Build a new `vx_array_iterator` that returns a series of `vx_array`s from a scan over a `vx_layout_reader`. #[unsafe(no_mangle)] pub unsafe extern "C-unwind" fn vx_file_scan( + session: *const vx_session, file: *const vx_file, opts: *const vx_file_scan_options, error_out: *mut *mut vx_error, ) -> *mut vx_array_iterator { try_or_default(error_out, || { + let session = vx_session::as_ref(session); let file = vx_file::as_ref(file); let scan_options = unsafe { opts.as_ref() }.map_or_else( || Ok(ScanOptions::default()), - |options| options.process_scan_options(), + |options| options.process_scan_options(session), )?; let layout_reader = file.layout_reader()?; - let mut scan_builder = ScanBuilder::new(layout_reader) - .with_handle(TokioRuntime::current()) + let mut scan_builder = ScanBuilder::new(session.clone(), layout_reader) .with_row_offset(scan_options.row_offset); // Apply options if provided. @@ -286,7 +288,9 @@ pub unsafe extern "C-unwind" fn vx_file_scan( scan_builder = scan_builder.with_split_by(split_by_value); } - let iter = scan_builder.into_array_iter(&get_vx_runtime())?; + let stream = scan_builder.into_array_stream()?; + let iter = + ArrayIteratorAdapter::new(stream.dtype().clone(), RUNTIME.block_on_stream(stream)); Ok(vx_array_iterator::new(Box::new(iter))) }) diff --git a/vortex-ffi/src/lib.rs b/vortex-ffi/src/lib.rs index 34208f5fefb..95f5d92660e 100644 --- a/vortex-ffi/src/lib.rs +++ b/vortex-ffi/src/lib.rs @@ -20,59 +20,18 @@ mod string; mod struct_fields; use std::ffi::{CStr, c_char, c_int}; -use std::sync::Arc; +use std::sync::LazyLock; pub use log::vx_log_level; -use parking_lot::Mutex; -use tokio::runtime; -use tokio::runtime::Runtime; -use vortex::error::VortexExpect; -use vortex::io::runtime::tokio::TokioRuntime; +use vortex::io::runtime::current::CurrentThreadRuntime; #[cfg(all(feature = "mimalloc", not(miri)))] #[global_allocator] static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; -// Shared runtime for all sessions; may be dropped by calling `try_shutdown_runtime` -// if no more sessions are active -static RUNTIME_STATE: Mutex>> = Mutex::new(None); - -pub(crate) fn get_runtime() -> Arc { - let mut state = RUNTIME_STATE.lock(); - - if let Some(runtime) = state.as_ref() { - runtime.clone() - } else { - let runtime = Arc::new( - runtime::Builder::new_multi_thread() - .enable_all() - .build() - .vortex_expect("Cannot start runtime"), - ); - *state = Some(runtime.clone()); - runtime - } -} - -pub(crate) fn get_vx_runtime() -> TokioRuntime { - TokioRuntime::from(get_runtime().handle()) -} - -/// Attempt to shutdown the runtime by calling `drop` if no other references exist -/// (e.g., no more VortexSessions are active). May block indefinitely if the runtime -/// is still running tasks. -pub fn try_shutdown_runtime() { - let mut state = RUNTIME_STATE.lock(); - - if let Some(runtime) = state.take() { - match Arc::try_unwrap(runtime) { - // We have the only reference, safe to shut down - Ok(runtime) => drop(runtime), - // There are other live references, so put it back - Err(runtime) => *state = Some(runtime), - } - } -} +/// A shared runtime for all FFI operations. +// TODO(ngates): also create a CurrentThreadPool to manage background worker threads. +static RUNTIME: LazyLock = LazyLock::new(CurrentThreadRuntime::new); pub(crate) unsafe fn to_string(ptr: *const c_char) -> String { let c_str = unsafe { CStr::from_ptr(ptr) }; @@ -84,10 +43,3 @@ pub(crate) unsafe fn to_string_vec(ptr: *const *const c_char, len: c_int) -> Vec .map(|i| unsafe { to_string(*ptr.offset(i as isize)) }) .collect() } - -/// Attempt to shutdown the shared tokio runtime if no sessions are active. -/// May block indefinitely if the runtime is still running tasks. -#[unsafe(no_mangle)] -pub extern "C" fn vx_try_shutdown_runtime() { - try_shutdown_runtime(); -} diff --git a/vortex-ffi/src/session.rs b/vortex-ffi/src/session.rs index 144ebebcd8c..8d3d2d937c5 100644 --- a/vortex-ffi/src/session.rs +++ b/vortex-ffi/src/session.rs @@ -1,17 +1,12 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use std::sync::Arc; +use vortex::VortexSessionDefault; +use vortex::io::runtime::BlockingRuntime; +use vortex::io::session::RuntimeSessionExt; +use vortex::session::VortexSession; -use moka::sync::Cache; -use vortex::dtype::DType; -use vortex::file::{Footer, SegmentSpec}; -use vortex::layout::segments::SegmentId; -use vortex::scalar::ScalarValue; -use vortex::stats::{Precision, Stat}; -use vortex::utils::aliases::DefaultHashBuilder; - -use crate::box_wrapper; +use crate::{RUNTIME, box_wrapper}; box_wrapper!( /// A handle to a Vortex session. @@ -24,77 +19,7 @@ box_wrapper!( /// The caller is responsible for freeing the session with [`vx_session_free`]. #[unsafe(no_mangle)] pub unsafe extern "C-unwind" fn vx_session_new() -> *mut vx_session { - vx_session::new(Box::new(VortexSession::new())) -} - -/// A Vortex session stores registries of extensible types, various caches, and other -/// top-level configuration. -/// -/// Extensible types include array encodings, layouts, extension dtypes, compute functions, etc. -/// -/// Multiple sessions may be created in a single process, and individual arrays are not tied to a -/// specific session. -/// -/// The session holds a reference to a shared tokio runtime. When the last session is dropped, -/// the runtime may be shut down by calling `crate::try_shutdown_runtime()`. -pub struct VortexSession { - file_cache: Cache, - _runtime: Arc, -} - -/// Cache key for a [`VortexFile`]. -#[derive(Hash, Eq, PartialEq, Debug, Clone)] -pub struct FileKey { - // TODO: support last modified ts. - pub location: String, -} - -impl VortexSession { - pub fn new() -> Self { - let file_cache = Cache::builder() - .max_capacity(64u64 * (1 << 20)) - .eviction_listener(|k: Arc, _v: Footer, cause| { - log::trace!("Removed {k:?} due to {cause:?}"); - }) - .weigher(|_k, footer| u32::try_from(estimate_layout_size(footer)).unwrap_or(u32::MAX)) - .build_with_hasher(DefaultHashBuilder::default()); - - // Get a runtime reference that will be held for the lifetime of this session - let _runtime = crate::get_runtime(); - - Self { - file_cache, - _runtime, - } - } - - pub fn get_footer(&self, file_key: &FileKey) -> Option