diff --git a/arrow-select/src/interleave.rs b/arrow-select/src/interleave.rs index f5904bc171ee..1bd34a7f3a58 100644 --- a/arrow-select/src/interleave.rs +++ b/arrow-select/src/interleave.rs @@ -108,6 +108,12 @@ pub fn interleave( DataType::Struct(fields) => interleave_struct(fields, values, indices), DataType::List(field) => interleave_list::(values, indices, field), DataType::LargeList(field) => interleave_list::(values, indices, field), + DataType::RunEndEncoded(r, _) => match r.data_type() { + DataType::Int16 => interleave_run_end::(values, indices), + DataType::Int32 => interleave_run_end::(values, indices), + DataType::Int64 => interleave_run_end::(values, indices), + t => unreachable!("illegal run-end type {t}"), + }, _ => interleave_fallback(values, indices) } } @@ -411,6 +417,71 @@ fn interleave_list( Ok(Arc::new(list_array)) } +/// Specialized [`interleave`] for [`RunArray`]. +fn interleave_run_end( + values: &[&dyn Array], + indices: &[(usize, usize)], +) -> Result { + if indices.is_empty() { + return Ok(new_empty_array(values[0].data_type())); + } + + let n = indices.len(); + R::Native::from_usize(n).ok_or_else(|| { + ArrowError::ComputeError(format!( + "interleave_run_end: output length {n} does not fit run-end type" + )) + })?; + + let runs: Vec<&RunArray> = values.iter().map(|a| a.as_run::()).collect(); + let value_arrays: Vec<&dyn Array> = runs.iter().map(|r| r.values().as_ref()).collect(); + + // Resolve each (array, logical_row) to (array, physical_row), so we can + // lookup physical indices by batch. + let mut phys_pairs: Vec<(usize, usize)> = vec![(0, 0); n]; + let mut grouped: Vec<(Vec, Vec)> = + (0..runs.len()).map(|_| (Vec::new(), Vec::new())).collect(); + for (out_pos, &(arr, row)) in indices.iter().enumerate() { + let row = R::Native::from_usize(row).ok_or_else(|| { + ArrowError::InvalidArgumentError(format!( + "interleave_run_end: row index {row} not representable as run-end type {}", + R::DATA_TYPE + )) + })?; + grouped[arr].0.push(row); + grouped[arr].1.push(out_pos); + } + for (arr_idx, (logical_rows, out_positions)) in grouped.into_iter().enumerate() { + let phys = runs[arr_idx].get_physical_indices(&logical_rows)?; + for (p, out_pos) in phys.iter().zip(out_positions.iter()) { + phys_pairs[*out_pos] = (arr_idx, *p); + } + } + + // Coalesce by physical-pair equality only: emit a new run when the + // (array_idx, physical_idx) pair changes between adjacent output rows. + // TODO: We could perform an equality check across sources to extend the + // output run, but we can't call make_comparator from this crate. + let mut run_ends_buf: Vec = Vec::with_capacity(n); + let mut dedup_pairs: Vec<(usize, usize)> = Vec::with_capacity(n); + dedup_pairs.push(phys_pairs[0]); + for i in 1..n { + if phys_pairs[i] != phys_pairs[i - 1] { + run_ends_buf.push(R::Native::from_usize(i).unwrap()); + dedup_pairs.push(phys_pairs[i]); + } + } + run_ends_buf.push(R::Native::from_usize(n).unwrap()); + + let taken_values = interleave(&value_arrays, &dedup_pairs)?; + let run_ends = PrimitiveArray::::from_iter_values(run_ends_buf); + + Ok(Arc::new(RunArray::::try_new( + &run_ends, + taken_values.as_ref(), + )?)) +} + /// Fallback implementation of interleave using [`MutableArrayData`] fn interleave_fallback( values: &[&dyn Array],