diff --git a/benchmarks/src/sort_tpch.rs b/benchmarks/src/sort_tpch.rs
index b1997b40e09e..956bb92b6c78 100644
--- a/benchmarks/src/sort_tpch.rs
+++ b/benchmarks/src/sort_tpch.rs
@@ -92,7 +92,7 @@ impl RunOpt {
     /// Payload Columns:
     /// - Thin variant: `l_partkey` column with `BIGINT` type (1 column)
     /// - Wide variant: all columns except for possible key columns (12 columns)
-    const SORT_QUERIES: [&'static str; 10] = [
+    const SORT_QUERIES: [&'static str; 11] = [
         // Q1: 1 sort key (type: INTEGER, cardinality: 7) + 1 payload column
         r#"
         SELECT l_linenumber, l_partkey
@@ -159,6 +159,12 @@ impl RunOpt {
         FROM lineitem
         ORDER BY l_orderkey, l_suppkey, l_linenumber, l_comment
         "#,
+        // Q11: 1 sort key (type: VARCHAR, cardinality: 7) + 2 payload columns
+        r#"
+        SELECT l_shipmode, l_comment, l_partkey
+        FROM lineitem
+        ORDER BY l_shipmode
+        "#,
     ];
 
     /// If query is specified from command line, run only that query.
diff --git a/datafusion/core/benches/sort.rs b/datafusion/core/benches/sort.rs
index 8f0b3753f67c..85f456ce5dc2 100644
--- a/datafusion/core/benches/sort.rs
+++ b/datafusion/core/benches/sort.rs
@@ -68,13 +68,13 @@
 use std::sync::Arc;
 
+use arrow::array::StringViewArray;
 use arrow::{
     array::{DictionaryArray, Float64Array, Int64Array, StringArray},
     compute::SortOptions,
     datatypes::{Int32Type, Schema},
     record_batch::RecordBatch,
 };
-
 use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::{
     execution::context::TaskContext,
@@ -114,11 +114,24 @@ fn criterion_benchmark(c: &mut Criterion) {
         ("f64", &f64_streams),
         ("utf8 low cardinality", &utf8_low_cardinality_streams),
         ("utf8 high cardinality", &utf8_high_cardinality_streams),
+        (
+            "utf8 view low cardinality",
+            &utf8_view_low_cardinality_streams,
+        ),
+        (
+            "utf8 view high cardinality",
+            &utf8_view_high_cardinality_streams,
+        ),
         ("utf8 tuple", &utf8_tuple_streams),
+        ("utf8 view tuple", &utf8_view_tuple_streams),
         ("utf8 dictionary", &dictionary_streams),
         ("utf8 dictionary tuple", &dictionary_tuple_streams),
         ("mixed dictionary tuple", &mixed_dictionary_tuple_streams),
         ("mixed tuple", &mixed_tuple_streams),
+        (
+            "mixed tuple with utf8 view",
+            &mixed_tuple_with_utf8_view_streams,
+        ),
     ];
 
     for (name, f) in cases {
@@ -308,6 +321,30 @@ fn utf8_low_cardinality_streams(sorted: bool) -> PartitionedBatches {
     })
 }
 
+/// Create streams of random low cardinality utf8_view values
+fn utf8_view_low_cardinality_streams(sorted: bool) -> PartitionedBatches {
+    let mut values = DataGenerator::new().utf8_low_cardinality_values();
+    if sorted {
+        values.sort_unstable();
+    }
+    split_tuples(values, |v| {
+        let array: StringViewArray = v.into_iter().collect();
+        RecordBatch::try_from_iter(vec![("utf_view_low", Arc::new(array) as _)]).unwrap()
+    })
+}
+
+/// Create streams of high cardinality (~ no duplicates) utf8_view values
+fn utf8_view_high_cardinality_streams(sorted: bool) -> PartitionedBatches {
+    let mut values = DataGenerator::new().utf8_high_cardinality_values();
+    if sorted {
+        values.sort_unstable();
+    }
+    split_tuples(values, |v| {
+        let array: StringViewArray = v.into_iter().collect();
+        RecordBatch::try_from_iter(vec![("utf_view_high", Arc::new(array) as _)]).unwrap()
+    })
+}
+
 /// Create streams of high cardinality (~ no duplicates) utf8 values
 fn utf8_high_cardinality_streams(sorted: bool) -> PartitionedBatches {
     let mut values = DataGenerator::new().utf8_high_cardinality_values();
@@ -353,6 +390,40 @@ fn utf8_tuple_streams(sorted: bool) -> PartitionedBatches {
     })
 }
 
+/// Create a batch of (utf8_view_low, utf8_view_low, utf8_view_high)
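+/// streams; the `StringViewArray` counterpart of [`utf8_tuple_streams`]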
+fn utf8_view_tuple_streams(sorted: bool) -> PartitionedBatches {
+    let mut gen = DataGenerator::new();
+
+    // need to sort by the combined key, so combine them together
+    let mut tuples: Vec<_> = gen
+        .utf8_low_cardinality_values()
+        .into_iter()
+        .zip(gen.utf8_low_cardinality_values())
+        .zip(gen.utf8_high_cardinality_values())
+        .collect();
+
+    if sorted {
+        tuples.sort_unstable();
+    }
+
+    split_tuples(tuples, |tuples| {
+        let (tuples, utf8_high): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
+        let (utf8_low1, utf8_low2): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
+
+        let utf8_view_high: StringViewArray = utf8_high.into_iter().collect();
+        let utf8_view_low1: StringViewArray = utf8_low1.into_iter().collect();
+        let utf8_view_low2: StringViewArray = utf8_low2.into_iter().collect();
+
+        RecordBatch::try_from_iter(vec![
+            ("utf_view_low1", Arc::new(utf8_view_low1) as _),
+            ("utf_view_low2", Arc::new(utf8_view_low2) as _),
+            ("utf_view_high", Arc::new(utf8_view_high) as _),
+        ])
+        .unwrap()
+    })
+}
+
 /// Create a batch of (f64, utf8_low, utf8_low, i64)
 fn mixed_tuple_streams(sorted: bool) -> PartitionedBatches {
     let mut gen = DataGenerator::new();
@@ -391,6 +462,44 @@ fn mixed_tuple_streams(sorted: bool) -> PartitionedBatches {
     })
 }
 
+/// Create a batch of (f64, utf8_view_low, utf8_view_low, i64)
+fn mixed_tuple_with_utf8_view_streams(sorted: bool) -> PartitionedBatches {
+    let mut gen = DataGenerator::new();
+
+    // need to sort by the combined key, so combine them together
+    let mut tuples: Vec<_> = gen
+        .i64_values()
+        .into_iter()
+        .zip(gen.utf8_low_cardinality_values())
+        .zip(gen.utf8_low_cardinality_values())
+        .zip(gen.i64_values())
+        .collect();
+
+    if sorted {
+        tuples.sort_unstable();
+    }
+
+    split_tuples(tuples, |tuples| {
+        let (tuples, i64_values): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
+        let (tuples, utf8_low2): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
+        let (f64_values, utf8_low1): (Vec<_>, Vec<_>) = tuples.into_iter().unzip();
+
+        let f64_values: Float64Array = f64_values.into_iter().map(|v| v as f64).collect();
+
+        let utf8_view_low1: StringViewArray = utf8_low1.into_iter().collect();
+        let utf8_view_low2: StringViewArray = utf8_low2.into_iter().collect();
+        let i64_values: Int64Array = i64_values.into_iter().collect();
+
+        RecordBatch::try_from_iter(vec![
+            ("f64", Arc::new(f64_values) as _),
+            ("utf_view_low1", Arc::new(utf8_view_low1) as _),
+            ("utf_view_low2", Arc::new(utf8_view_low2) as _),
+            ("i64", Arc::new(i64_values) as _),
+        ])
+        .unwrap()
+    })
+}
+
 /// Create a batch of (utf8_dict)
 fn dictionary_streams(sorted: bool) -> PartitionedBatches {
     let mut gen = DataGenerator::new();
@@ -402,7 +511,6 @@ fn dictionary_streams(sorted: bool) -> PartitionedBatches {
 
     split_tuples(values, |v| {
         let dictionary: DictionaryArray<Int32Type> = v.iter().map(Option::as_deref).collect();
-
         RecordBatch::try_from_iter(vec![("dict", Arc::new(dictionary) as _)]).unwrap()
     })
 }
diff --git a/datafusion/physical-plan/src/sorts/cursor.rs b/datafusion/physical-plan/src/sorts/cursor.rs
index 8ea7c43d2613..3d3bd81948e0 100644
--- a/datafusion/physical-plan/src/sorts/cursor.rs
+++ b/datafusion/physical-plan/src/sorts/cursor.rs
@@ -18,8 +18,8 @@
 use std::cmp::Ordering;
 
 use arrow::array::{
-    types::ByteArrayType, Array, ArrowPrimitiveType, GenericByteArray, OffsetSizeTrait,
-    PrimitiveArray,
+    types::ByteArrayType, Array, ArrowPrimitiveType, GenericByteArray,
+    GenericByteViewArray, OffsetSizeTrait, PrimitiveArray, StringViewArray,
 };
 use arrow::buffer::{Buffer, OffsetBuffer, ScalarBuffer};
 use arrow::compute::SortOptions;
@@ -281,6 +281,60 @@ impl<T: ByteArrayType> CursorArray for GenericByteArray<T> {
     }
 }
 
+impl CursorArray for StringViewArray {
+    type Values = StringViewArray;
+    fn values(&self) -> Self {
+        self.clone()
+    }
+}
+
+impl CursorValues for StringViewArray {
+    fn len(&self) -> usize {
+        self.views().len()
+    }
+
+    fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool {
+        // SAFETY: Both l_idx and r_idx are guaranteed to be within bounds,
+        // and any null-checks are handled in the outer layers.
+        // Fast path: Compare the lengths before full byte comparison.
+
+        let l_view = unsafe { l.views().get_unchecked(l_idx) };
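+        // The low 32 bits of each u128 view store the value's byte length, so a u32 cast extracts it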
+        let l_len = *l_view as u32;
+        let r_view = unsafe { r.views().get_unchecked(r_idx) };
+        let r_len = *r_view as u32;
+        if l_len != r_len {
+            return false;
+        }
+
+        unsafe { GenericByteViewArray::compare_unchecked(l, l_idx, r, r_idx).is_eq() }
+    }
+
+    fn eq_to_previous(cursor: &Self, idx: usize) -> bool {
+        // SAFETY: The caller guarantees that idx > 0 and that both indices are
+        // valid; this is verified in `is_eq_to_prev_one` before we are called.
+        // Fast path: Compare the lengths of the current and previous views.
+        let l_view = unsafe { cursor.views().get_unchecked(idx) };
+        let l_len = *l_view as u32;
+        let r_view = unsafe { cursor.views().get_unchecked(idx - 1) };
+        let r_len = *r_view as u32;
+        if l_len != r_len {
+            return false;
+        }
+
+        unsafe {
+            GenericByteViewArray::compare_unchecked(cursor, idx, cursor, idx - 1).is_eq()
+        }
+    }
+
+    fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering {
+        // SAFETY: Prior assertions guarantee that l_idx and r_idx are valid indices.
+        // Null checks are handled in the wrapper (e.g., `ArrayValues`), and the bounds
+        // are checked in `is_finished`, so it is safe to call `compare_unchecked`.
+        unsafe { GenericByteViewArray::compare_unchecked(l, l_idx, r, r_idx) }
+    }
+}
+
 /// A collection of sorted, nullable [`CursorValues`]
 ///
 /// Note: comparing cursors with different `SortOptions` will yield an arbitrary ordering
diff --git a/datafusion/physical-plan/src/sorts/streaming_merge.rs b/datafusion/physical-plan/src/sorts/streaming_merge.rs
index a541f79dc717..3f022ec6095a 100644
--- a/datafusion/physical-plan/src/sorts/streaming_merge.rs
+++ b/datafusion/physical-plan/src/sorts/streaming_merge.rs
@@ -177,6 +177,7 @@ impl<'a> StreamingMergeBuilder<'a> {
         downcast_primitive! {
             data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker),
             DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
+            DataType::Utf8View => merge_helper!(StringViewArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
             DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
             DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
             DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)