Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions arrow-select/src/interleave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ pub fn interleave(
DataType::Struct(fields) => interleave_struct(fields, values, indices),
DataType::List(field) => interleave_list::<i32>(values, indices, field),
DataType::LargeList(field) => interleave_list::<i64>(values, indices, field),
DataType::RunEndEncoded(r, _) => match r.data_type() {
DataType::Int16 => interleave_run_end::<Int16Type>(values, indices),
DataType::Int32 => interleave_run_end::<Int32Type>(values, indices),
DataType::Int64 => interleave_run_end::<Int64Type>(values, indices),
t => unreachable!("illegal run-end type {t}"),
},
_ => interleave_fallback(values, indices)
}
}
Expand Down Expand Up @@ -411,6 +417,71 @@ fn interleave_list<O: OffsetSizeTrait>(
Ok(Arc::new(list_array))
}

/// Specialized [`interleave`] for [`RunArray`].
fn interleave_run_end<R: RunEndIndexType>(
values: &[&dyn Array],
indices: &[(usize, usize)],
) -> Result<ArrayRef, ArrowError> {
if indices.is_empty() {
return Ok(new_empty_array(values[0].data_type()));
}

let n = indices.len();
R::Native::from_usize(n).ok_or_else(|| {
Comment thread
Jefffrey marked this conversation as resolved.
ArrowError::ComputeError(format!(
"interleave_run_end: output length {n} does not fit run-end type"
))
})?;

let runs: Vec<&RunArray<R>> = values.iter().map(|a| a.as_run::<R>()).collect();
let value_arrays: Vec<&dyn Array> = runs.iter().map(|r| r.values().as_ref()).collect();

// Resolve each (array, logical_row) to (array, physical_row), so we can
// lookup physical indices by batch.
let mut phys_pairs: Vec<(usize, usize)> = vec![(0, 0); n];
let mut grouped: Vec<(Vec<R::Native>, Vec<usize>)> =
(0..runs.len()).map(|_| (Vec::new(), Vec::new())).collect();
for (out_pos, &(arr, row)) in indices.iter().enumerate() {
let row = R::Native::from_usize(row).ok_or_else(|| {
ArrowError::InvalidArgumentError(format!(
"interleave_run_end: row index {row} not representable as run-end type {}",
R::DATA_TYPE
))
})?;
grouped[arr].0.push(row);
grouped[arr].1.push(out_pos);
}
for (arr_idx, (logical_rows, out_positions)) in grouped.into_iter().enumerate() {
let phys = runs[arr_idx].get_physical_indices(&logical_rows)?;
for (p, out_pos) in phys.iter().zip(out_positions.iter()) {
phys_pairs[*out_pos] = (arr_idx, *p);
}
}

// Coalesce by physical-pair equality only: emit a new run when the
// (array_idx, physical_idx) pair changes between adjacent output rows.
// TODO: We could perform an equality check across sources to extend the
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose this is what #9865 (and its issue #7710) are meant to address?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, exactly. That PR would make sense in this block so we don't compact in the interleave fallback. This also means that the equality cost is only paid when interleave pairs select from different input run arrays (assumption is input run arrays are well formed). I'm concerned about the per-row slicing cost though. I think ideally you would have a cache of comparators but I believe that require some crate readjusting.

// output run, but we can't call make_comparator from this crate.
let mut run_ends_buf: Vec<R::Native> = Vec::with_capacity(n);
let mut dedup_pairs: Vec<(usize, usize)> = Vec::with_capacity(n);
dedup_pairs.push(phys_pairs[0]);
for i in 1..n {
if phys_pairs[i] != phys_pairs[i - 1] {
run_ends_buf.push(R::Native::from_usize(i).unwrap());
dedup_pairs.push(phys_pairs[i]);
}
}
run_ends_buf.push(R::Native::from_usize(n).unwrap());

let taken_values = interleave(&value_arrays, &dedup_pairs)?;
let run_ends = PrimitiveArray::<R>::from_iter_values(run_ends_buf);

Ok(Arc::new(RunArray::<R>::try_new(
&run_ends,
taken_values.as_ref(),
)?))
}

/// Fallback implementation of interleave using [`MutableArrayData`]
fn interleave_fallback(
values: &[&dyn Array],
Expand Down
Loading