Skip to content

Commit a05514c

Browse files
authored
Add support for DISTINCT + ORDER BY in ARRAY_AGG (#14413)
* Add support for DISTINCT and ORDER BY in ARRAY_AGG * Add DISTINCT + ORDER BY docs * Add some more sqllogictests * Update aggregate_functions.md
1 parent 6ccad40 commit a05514c

File tree

4 files changed

+623
-136
lines changed

4 files changed

+623
-136
lines changed

datafusion/functions-aggregate-common/src/merge_arrays.rs

+146
Original file line numberDiff line numberDiff line change
@@ -193,3 +193,149 @@ pub fn merge_ordered_arrays(
193193

194194
Ok((merged_values, merged_orderings))
195195
}
196+
197+
#[cfg(test)]
198+
mod tests {
199+
use super::*;
200+
201+
use std::collections::VecDeque;
202+
use std::sync::Arc;
203+
204+
use arrow::array::{ArrayRef, Int64Array};
205+
206+
use datafusion_common::utils::get_row_at_idx;
207+
use datafusion_common::{Result, ScalarValue};
208+
209+
#[test]
210+
fn test_merge_asc() -> Result<()> {
211+
let lhs_arrays: Vec<ArrayRef> = vec![
212+
Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])),
213+
Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])),
214+
];
215+
let n_row = lhs_arrays[0].len();
216+
let lhs_orderings = (0..n_row)
217+
.map(|idx| get_row_at_idx(&lhs_arrays, idx))
218+
.collect::<Result<VecDeque<_>>>()?;
219+
220+
let rhs_arrays: Vec<ArrayRef> = vec![
221+
Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])),
222+
Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])),
223+
];
224+
let n_row = rhs_arrays[0].len();
225+
let rhs_orderings = (0..n_row)
226+
.map(|idx| get_row_at_idx(&rhs_arrays, idx))
227+
.collect::<Result<VecDeque<_>>>()?;
228+
let sort_options = vec![
229+
SortOptions {
230+
descending: false,
231+
nulls_first: false,
232+
},
233+
SortOptions {
234+
descending: false,
235+
nulls_first: false,
236+
},
237+
];
238+
239+
let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as ArrayRef;
240+
let lhs_vals = (0..lhs_vals_arr.len())
241+
.map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx))
242+
.collect::<Result<VecDeque<_>>>()?;
243+
244+
let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as ArrayRef;
245+
let rhs_vals = (0..rhs_vals_arr.len())
246+
.map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx))
247+
.collect::<Result<VecDeque<_>>>()?;
248+
let expected =
249+
Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as ArrayRef;
250+
let expected_ts = vec![
251+
Arc::new(Int64Array::from(vec![0, 0, 0, 0, 1, 1, 1, 1, 2, 2])) as ArrayRef,
252+
Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as ArrayRef,
253+
];
254+
255+
let (merged_vals, merged_ts) = merge_ordered_arrays(
256+
&mut [lhs_vals, rhs_vals],
257+
&mut [lhs_orderings, rhs_orderings],
258+
&sort_options,
259+
)?;
260+
let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
261+
let merged_ts = (0..merged_ts[0].len())
262+
.map(|col_idx| {
263+
ScalarValue::iter_to_array(
264+
(0..merged_ts.len())
265+
.map(|row_idx| merged_ts[row_idx][col_idx].clone()),
266+
)
267+
})
268+
.collect::<Result<Vec<_>>>()?;
269+
270+
assert_eq!(&merged_vals, &expected);
271+
assert_eq!(&merged_ts, &expected_ts);
272+
273+
Ok(())
274+
}
275+
276+
#[test]
277+
fn test_merge_desc() -> Result<()> {
278+
let lhs_arrays: Vec<ArrayRef> = vec![
279+
Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])),
280+
Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])),
281+
];
282+
let n_row = lhs_arrays[0].len();
283+
let lhs_orderings = (0..n_row)
284+
.map(|idx| get_row_at_idx(&lhs_arrays, idx))
285+
.collect::<Result<VecDeque<_>>>()?;
286+
287+
let rhs_arrays: Vec<ArrayRef> = vec![
288+
Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])),
289+
Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])),
290+
];
291+
let n_row = rhs_arrays[0].len();
292+
let rhs_orderings = (0..n_row)
293+
.map(|idx| get_row_at_idx(&rhs_arrays, idx))
294+
.collect::<Result<VecDeque<_>>>()?;
295+
let sort_options = vec![
296+
SortOptions {
297+
descending: true,
298+
nulls_first: false,
299+
},
300+
SortOptions {
301+
descending: true,
302+
nulls_first: false,
303+
},
304+
];
305+
306+
// Values (which will be merged) doesn't have to be ordered.
307+
let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as ArrayRef;
308+
let lhs_vals = (0..lhs_vals_arr.len())
309+
.map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx))
310+
.collect::<Result<VecDeque<_>>>()?;
311+
312+
let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as ArrayRef;
313+
let rhs_vals = (0..rhs_vals_arr.len())
314+
.map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx))
315+
.collect::<Result<VecDeque<_>>>()?;
316+
let expected =
317+
Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 1, 1, 2, 2])) as ArrayRef;
318+
let expected_ts = vec![
319+
Arc::new(Int64Array::from(vec![2, 2, 1, 1, 1, 1, 0, 0, 0, 0])) as ArrayRef,
320+
Arc::new(Int64Array::from(vec![4, 4, 3, 3, 2, 2, 1, 1, 0, 0])) as ArrayRef,
321+
];
322+
let (merged_vals, merged_ts) = merge_ordered_arrays(
323+
&mut [lhs_vals, rhs_vals],
324+
&mut [lhs_orderings, rhs_orderings],
325+
&sort_options,
326+
)?;
327+
let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
328+
let merged_ts = (0..merged_ts[0].len())
329+
.map(|col_idx| {
330+
ScalarValue::iter_to_array(
331+
(0..merged_ts.len())
332+
.map(|row_idx| merged_ts[row_idx][col_idx].clone()),
333+
)
334+
})
335+
.collect::<Result<Vec<_>>>()?;
336+
337+
assert_eq!(&merged_vals, &expected);
338+
assert_eq!(&merged_ts, &expected_ts);
339+
Ok(())
340+
}
341+
}

0 commit comments

Comments
 (0)