Perf: Support Utf8View datatype single column comparisons for SortPreservingMergeStream #15348

Merged
merged 5 commits into from
Mar 25, 2025
31 changes: 29 additions & 2 deletions datafusion/physical-plan/src/sorts/cursor.rs
@@ -18,8 +18,8 @@
use std::cmp::Ordering;

use arrow::array::{
-    types::ByteArrayType, Array, ArrowPrimitiveType, GenericByteArray, OffsetSizeTrait,
-    PrimitiveArray,
+    types::ByteArrayType, Array, ArrowPrimitiveType, GenericByteArray,
+    GenericByteViewArray, OffsetSizeTrait, PrimitiveArray, StringViewArray,
};
use arrow::buffer::{Buffer, OffsetBuffer, ScalarBuffer};
use arrow::compute::SortOptions;
@@ -281,6 +281,33 @@ impl<T: ByteArrayType> CursorArray for GenericByteArray<T> {
}
}

impl CursorArray for StringViewArray {
type Values = StringViewArray;
fn values(&self) -> Self {
self.clone()
}
}

impl CursorValues for StringViewArray {
fn len(&self) -> usize {
self.views().len()
}

fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool {
unsafe { GenericByteViewArray::compare_unchecked(l, l_idx, r, r_idx).is_eq() }
Contributor

Please add a 'safety:' note to say why it is ok to use unsafe here. An example

Contributor Author

Thank you @Omega359 for the review. Good example, I will address it.

Contributor

I agree it would be good to justify the use of unchecked (which I think is ok here)

The docs say https://docs.rs/arrow/latest/arrow/array/struct.GenericByteViewArray.html#method.compare_unchecked

So maybe the safety argument is mostly "The left/right_idx must within range of each array"

It also seems like we need to be comparing the null masks too 🤔, e.g. checking whether the values are null before comparing

Given that this comparison is typically the hottest part of a merge operation, maybe we should try using unchecked comparisons elsewhere
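
For illustration, the kind of safety note requested in this thread might look like the sketch below. It uses only the standard library: `Values` is a hypothetical stand-in for a string column, not the arrow type, and its `compare_unchecked` is a local function that only mimics the shape of the arrow method linked above. The assumed invariant is the one quoted from the docs, namely that both indices are within range of their arrays.

```rust
use std::cmp::Ordering;

/// Hypothetical stand-in for a column of string values (not the arrow type).
struct Values(Vec<String>);

impl Values {
    /// Compares two elements without bounds checks.
    ///
    /// # Safety
    /// `l_idx` must be within range of `l` and `r_idx` within range of `r`.
    unsafe fn compare_unchecked(l: &Values, l_idx: usize, r: &Values, r_idx: usize) -> Ordering {
        // SAFETY: the caller promises both indices are in range (see above).
        let a = unsafe { l.0.get_unchecked(l_idx) };
        let b = unsafe { r.0.get_unchecked(r_idx) };
        a.cmp(b)
    }

    /// Equality of two elements, mirroring the shape of `eq` in the diff.
    fn eq(l: &Values, l_idx: usize, r: &Values, r_idx: usize) -> bool {
        // Debug-only checks document the invariant and catch misuse in tests.
        debug_assert!(l_idx < l.0.len() && r_idx < r.0.len());
        // SAFETY: in this sketch the caller is assumed to hand out only
        // indices below `len()`, so both are within range of their arrays.
        unsafe { Self::compare_unchecked(l, l_idx, r, r_idx).is_eq() }
    }
}
```

The `debug_assert!` costs nothing in release builds, so the hot path stays unchecked while tests still exercise the invariant.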

}

fn eq_to_previous(cursor: &Self, idx: usize) -> bool {
unsafe {
GenericByteViewArray::compare_unchecked(cursor, idx, cursor, idx - 1).is_eq()
}
}

fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering {
unsafe { GenericByteViewArray::compare_unchecked(l, l_idx, r, r_idx) }
}
}

/// A collection of sorted, nullable [`CursorValues`]
///
/// Note: comparing cursors with different `SortOptions` will yield an arbitrary ordering
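
On the null-mask question raised in the review: one way a wrapper around the values could settle nulls before delegating to the value comparison is sketched below. This is an assumption for illustration, not the code in this PR. `NullableValues` and `null_count` are hypothetical names, and the sketch assumes a nulls-first layout in which all nulls sit at the start of the sorted column.

```rust
use std::cmp::Ordering;

/// Hypothetical wrapper over one sorted column with nulls stored first.
struct NullableValues {
    /// Value slots; the entries in null positions are placeholders.
    values: Vec<&'static str>,
    /// Number of leading null slots (nulls-first layout assumed).
    null_count: usize,
}

impl NullableValues {
    /// A slot is null when it falls inside the leading null run.
    fn is_null(&self, idx: usize) -> bool {
        idx < self.null_count
    }

    /// Resolves nulls first and compares values only when both are non-null.
    fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering {
        match (l.is_null(l_idx), r.is_null(r_idx)) {
            (true, true) => Ordering::Equal,
            (true, false) => Ordering::Less,
            (false, true) => Ordering::Greater,
            (false, false) => l.values[l_idx].cmp(r.values[r_idx]),
        }
    }
}
```

With a layer like this, the inner value comparison never sees a null slot, which is one possible reason an unchecked value comparison could skip the null mask.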
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/sorts/streaming_merge.rs
@@ -177,6 +177,7 @@ impl<'a> StreamingMergeBuilder<'a> {
downcast_primitive! {
data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker),
DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
DataType::Utf8View => merge_helper!(StringViewArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
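
The dispatch in this hunk picks a typed merge for a single sort column based on its data type. A rough, standard-library-only sketch of that idea follows. The `DataType` subset, the `MergePath` enum, and `choose_merge_path` are hypothetical names for illustration, and the fallback to a row-encoded path for every other case is an assumption, not something shown in this diff.

```rust
/// Hypothetical subset of column data types.
#[derive(Debug, Clone, Copy, PartialEq)]
enum DataType {
    Int32,
    Utf8,
    Utf8View,
    LargeUtf8,
    Binary,
    Struct,
}

/// Hypothetical labels for the comparison strategy a merge could use.
#[derive(Debug, PartialEq)]
enum MergePath {
    Primitive,
    Bytes,
    ByteView,
    RowEncoded,
}

/// Picks a specialized path only when there is exactly one sort column
/// of a supported type; everything else takes the generic fallback.
fn choose_merge_path(sort_columns: &[DataType]) -> MergePath {
    match sort_columns {
        [DataType::Int32] => MergePath::Primitive,
        [DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary] => MergePath::Bytes,
        [DataType::Utf8View] => MergePath::ByteView,
        _ => MergePath::RowEncoded,
    }
}
```

Under these assumptions, adding support for a new type amounts to one more match arm, which mirrors the one-line change in this hunk.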