Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 180 additions & 0 deletions arrow-select/src/interleave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use arrow_array::cast::AsArray;
use arrow_array::types::*;
use arrow_array::*;
use arrow_buffer::{ArrowNativeType, BooleanBuffer, MutableBuffer, NullBuffer, OffsetBuffer};
use arrow_data::ArrayData;
use arrow_data::ByteView;
use arrow_data::transform::MutableArrayData;
use arrow_schema::{ArrowError, DataType, FieldRef, Fields};
Expand Down Expand Up @@ -108,6 +109,12 @@ pub fn interleave(
DataType::Struct(fields) => interleave_struct(fields, values, indices),
DataType::List(field) => interleave_list::<i32>(values, indices, field),
DataType::LargeList(field) => interleave_list::<i64>(values, indices, field),
DataType::RunEndEncoded(r, _) => match r.data_type() {
DataType::Int16 => interleave_run_end::<Int16Type>(values, indices),
DataType::Int32 => interleave_run_end::<Int32Type>(values, indices),
DataType::Int64 => interleave_run_end::<Int64Type>(values, indices),
t => unreachable!("illegal run-end type {t}"),
},
_ => interleave_fallback(values, indices)
}
}
Expand Down Expand Up @@ -411,6 +418,78 @@ fn interleave_list<O: OffsetSizeTrait>(
Ok(Arc::new(list_array))
}

/// Specialized [`interleave`] for [`RunArray`].
fn interleave_run_end<R: RunEndIndexType>(
values: &[&dyn Array],
indices: &[(usize, usize)],
) -> Result<ArrayRef, ArrowError> {
if indices.is_empty() {
return Ok(new_empty_array(values[0].data_type()));
}

let n = indices.len();
R::Native::from_usize(n).ok_or_else(|| {
ArrowError::ComputeError(format!(
"interleave_run_end: output length {n} does not fit run-end type"
))
})?;

let runs: Vec<&RunArray<R>> = values.iter().map(|a| a.as_run::<R>()).collect();
let value_arrays: Vec<&dyn Array> = runs.iter().map(|r| r.values().as_ref()).collect();
// hoist value data
let value_data: Vec<ArrayData> = value_arrays.iter().map(|a| a.to_data()).collect();

// Resolve each (array, logical_row) to (array, physical_row), so we can
// lookup physical indices by batch.
let mut phys_pairs: Vec<(usize, usize)> = vec![(0, 0); n];
let mut grouped: Vec<(Vec<R::Native>, Vec<usize>)> =
(0..runs.len()).map(|_| (Vec::new(), Vec::new())).collect();
for (out_pos, &(arr, row)) in indices.iter().enumerate() {
if row >= runs[arr].len() {
return Err(ArrowError::InvalidArgumentError(format!(
"interleave_run_end: row index {row} out of range for array {arr} with length {}",
runs[arr].len()
)));
};
let row = R::Native::from_usize(row).ok_or_else(|| {
ArrowError::InvalidArgumentError(format!(
"interleave_run_end: row index {row} out of range"
))
})?;
grouped[arr].0.push(row);
grouped[arr].1.push(out_pos);
}
for (arr_idx, (logical_rows, out_positions)) in grouped.into_iter().enumerate() {
let phys = runs[arr_idx].get_physical_indices(&logical_rows)?;
for (p, out_pos) in phys.iter().zip(out_positions.iter()) {
phys_pairs[*out_pos] = (arr_idx, *p);
}
}
let mut run_ends_buf: Vec<R::Native> = Vec::with_capacity(n);
let mut dedup_pairs: Vec<(usize, usize)> = Vec::with_capacity(n);
dedup_pairs.push(phys_pairs[0]);
for i in 1..n {
// this doesnt check for equality of values.
let (prev_arr, prev_row) = phys_pairs[i - 1];
let (curr_arr, curr_row) = phys_pairs[i];
let prev_value = value_data[prev_arr].slice(prev_row, 1);
let current_value = value_data[curr_arr].slice(curr_row, 1);
if phys_pairs[i] != phys_pairs[i - 1] && prev_value != current_value {
run_ends_buf.push(R::Native::from_usize(i).unwrap());
dedup_pairs.push(phys_pairs[i]);
}
}
run_ends_buf.push(R::Native::from_usize(n).unwrap());

let taken_values = interleave(&value_arrays, &dedup_pairs)?;
let run_ends = PrimitiveArray::<R>::from_iter_values(run_ends_buf);

Ok(Arc::new(RunArray::<R>::try_new(
&run_ends,
taken_values.as_ref(),
)?))
}

/// Fallback implementation of interleave using [`MutableArrayData`]
fn interleave_fallback(
values: &[&dyn Array],
Expand Down Expand Up @@ -1544,4 +1623,105 @@ mod tests {
&[3]
);
}
#[test]
fn test_interleave_run_end_encoded_merges_identical_runs() {
let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
builder.extend([0, 0, 0, 1, 1, 0, 0, 1, 1, 1].into_iter().map(Some));
let a = builder.finish();

let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
builder.extend([2, 2, 1, 1, 1, 0, 1, 0, 0, 0].into_iter().map(Some));
let b = builder.finish();

// logical: [1, 1, 1, 1, 1] across an a→b boundary; should compact to one run.
let result: Arc<dyn Array> =
interleave(&[&a, &b], &[(0, 3), (0, 4), (1, 2), (1, 3), (1, 4)]).unwrap();
let result = result.as_run::<Int32Type>();

assert_eq!(result.run_ends().values(), &[5]);
let values = result.values().as_primitive::<Int32Type>();
assert_eq!(values.values(), &[1]);
}
#[test]
fn test_break_intent() {
let mut builder = PrimitiveRunBuilder::<Int16Type, Int16Type>::new();
builder.extend([0, 0, 0, 1, 1, 0, 0, 1, 1, 1].into_iter().map(Some));
let a = builder.finish();

let mut builder = PrimitiveRunBuilder::<Int16Type, Int16Type>::new();
builder.extend([2, 2, 1, 1, 1, 0, 1, 0, 0, 0].into_iter().map(Some));
let b = builder.finish();

// logical: [1, 1, 1, 1, 1] across an a→b boundary; should compact to one run.
// greater than int16::max
assert!(interleave(&[&a, &b], &[(0, 32766), (0, 4), (1, 2), (1, 3), (1, 4)]).is_err())
}
#[test]
fn test_interleave_run_end_encoded_partial_compaction() {
let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
builder.extend([1, 1, 2, 2].into_iter().map(Some));
let a = builder.finish();

let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
builder.extend([1, 1, 2, 2].into_iter().map(Some));
let b = builder.finish();

// logical: [1, 1, 1, 2, 2, 1, 1] — fallback emits 5 raw runs;
// compaction must merge adjacent equal pairs but keep the trailing 1s
// distinct from the leading 1s (separated by 2s).
let indices = &[(0, 0), (0, 1), (1, 0), (0, 2), (1, 3), (1, 0), (1, 1)];
let result = interleave(&[&a, &b], indices).unwrap();
let result = result.as_run::<Int32Type>();

assert_eq!(result.run_ends().values(), &[3, 5, 7]);
let values = result.values().as_primitive::<Int32Type>();
assert_eq!(values.values(), &[1, 2, 1]);
}
#[test]
fn test_interleave_run_end_encoded_pulls_identical_values() {
use arrow_array::builder::StringRunBuilder;

let mut builder = StringRunBuilder::<Int32Type>::new();
builder.extend(
[
"alice", "alice", "bob", "bob", "charlie", "charlie", "david",
]
.into_iter()
.map(Some),
);
let a = builder.finish();

let mut builder = StringRunBuilder::<Int32Type>::new();
builder.extend(
["alice", "bob", "charlie", "david", "david", "eve"]
.into_iter()
.map(Some),
);
let b = builder.finish();

// logical: ["alice","alice","bob","bob","charlie","charlie","david","david","david","alice","alice"]
let result = interleave(
&[&a, &b],
&[
(0, 0),
(1, 0),
(0, 2),
(1, 1),
(0, 4),
(1, 2),
(0, 6),
(1, 3),
(1, 4),
(0, 0),
(1, 0),
],
)
.unwrap();
let result = result.as_run::<Int32Type>();

assert_eq!(result.run_ends().values(), &[2, 4, 6, 9, 11]);
let values = result.values().as_string::<i32>();
let values: Vec<_> = values.into_iter().map(Option::unwrap).collect();
assert_eq!(values, vec!["alice", "bob", "charlie", "david", "alice"]);
}
}
Loading