From f118dd2eb09236495a89ba61633127c7db13602d Mon Sep 17 00:00:00 2001 From: gabotechs Date: Fri, 31 Jan 2025 14:08:26 +0100 Subject: [PATCH 1/4] Add support for DISTINCT and ORDER BY in ARRAY_AGG --- .../src/merge_arrays.rs | 146 +++++ .../functions-aggregate/src/array_agg.rs | 548 +++++++++++++----- .../sqllogictest/test_files/aggregate.slt | 29 + 3 files changed, 590 insertions(+), 133 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/merge_arrays.rs b/datafusion/functions-aggregate-common/src/merge_arrays.rs index 9b9a1240c1a1..0cfea662497e 100644 --- a/datafusion/functions-aggregate-common/src/merge_arrays.rs +++ b/datafusion/functions-aggregate-common/src/merge_arrays.rs @@ -193,3 +193,149 @@ pub fn merge_ordered_arrays( Ok((merged_values, merged_orderings)) } + +#[cfg(test)] +mod tests { + use super::*; + + use std::collections::VecDeque; + use std::sync::Arc; + + use arrow::array::{ArrayRef, Int64Array}; + + use datafusion_common::utils::get_row_at_idx; + use datafusion_common::{Result, ScalarValue}; + + #[test] + fn test_merge_asc() -> Result<()> { + let lhs_arrays: Vec = vec![ + Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])), + Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])), + ]; + let n_row = lhs_arrays[0].len(); + let lhs_orderings = (0..n_row) + .map(|idx| get_row_at_idx(&lhs_arrays, idx)) + .collect::>>()?; + + let rhs_arrays: Vec = vec![ + Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])), + Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])), + ]; + let n_row = rhs_arrays[0].len(); + let rhs_orderings = (0..n_row) + .map(|idx| get_row_at_idx(&rhs_arrays, idx)) + .collect::>>()?; + let sort_options = vec![ + SortOptions { + descending: false, + nulls_first: false, + }, + SortOptions { + descending: false, + nulls_first: false, + }, + ]; + + let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as ArrayRef; + let lhs_vals = (0..lhs_vals_arr.len()) + .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx)) + .collect::>>()?; + + let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as ArrayRef; + let rhs_vals = (0..rhs_vals_arr.len()) + .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx)) + .collect::>>()?; + let expected = + Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as ArrayRef; + let expected_ts = vec![ + Arc::new(Int64Array::from(vec![0, 0, 0, 0, 1, 1, 1, 1, 2, 2])) as ArrayRef, + Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as ArrayRef, + ]; + + let (merged_vals, merged_ts) = merge_ordered_arrays( + &mut [lhs_vals, rhs_vals], + &mut [lhs_orderings, rhs_orderings], + &sort_options, + )?; + let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?; + let merged_ts = (0..merged_ts[0].len()) + .map(|col_idx| { + ScalarValue::iter_to_array( + (0..merged_ts.len()) + .map(|row_idx| merged_ts[row_idx][col_idx].clone()), + ) + }) + .collect::>>()?; + + assert_eq!(&merged_vals, &expected); + assert_eq!(&merged_ts, &expected_ts); + + Ok(()) + } + + #[test] + fn test_merge_desc() -> Result<()> { + let lhs_arrays: Vec = vec![ + Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])), + Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])), + ]; + let n_row = lhs_arrays[0].len(); + let lhs_orderings = (0..n_row) + .map(|idx| get_row_at_idx(&lhs_arrays, idx)) + .collect::>>()?; + + let rhs_arrays: Vec = vec![ + Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])), + Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])), + ]; + let n_row = rhs_arrays[0].len(); + let rhs_orderings = (0..n_row) + .map(|idx| get_row_at_idx(&rhs_arrays, idx)) + .collect::>>()?; + let sort_options = vec![ + SortOptions { + descending: true, + nulls_first: false, + }, + SortOptions { + descending: true, + nulls_first: false, + }, + ]; + + // Values (which will be merged) doesn't have to be ordered. + let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as ArrayRef; + let lhs_vals = (0..lhs_vals_arr.len()) + .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx)) + .collect::>>()?; + + let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as ArrayRef; + let rhs_vals = (0..rhs_vals_arr.len()) + .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx)) + .collect::>>()?; + let expected = + Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 1, 1, 2, 2])) as ArrayRef; + let expected_ts = vec![ + Arc::new(Int64Array::from(vec![2, 2, 1, 1, 1, 1, 0, 0, 0, 0])) as ArrayRef, + Arc::new(Int64Array::from(vec![4, 4, 3, 3, 2, 2, 1, 1, 0, 0])) as ArrayRef, + ]; + let (merged_vals, merged_ts) = merge_ordered_arrays( + &mut [lhs_vals, rhs_vals], + &mut [lhs_orderings, rhs_orderings], + &sort_options, + )?; + let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?; + let merged_ts = (0..merged_ts[0].len()) + .map(|col_idx| { + ScalarValue::iter_to_array( + (0..merged_ts.len()) + .map(|row_idx| merged_ts[row_idx][col_idx].clone()), + ) + }) + .collect::>>()?; + + assert_eq!(&merged_vals, &expected); + assert_eq!(&merged_ts, &expected_ts); + Ok(()) + } +} diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs index 0f12ac34bfd2..3ee459efeddb 100644 --- a/datafusion/functions-aggregate/src/array_agg.rs +++ b/datafusion/functions-aggregate/src/array_agg.rs @@ -18,6 +18,7 @@ //! `ARRAY_AGG` aggregate implementation: [`ArrayAgg`] use arrow::array::{new_empty_array, Array, ArrayRef, AsArray, ListArray, StructArray}; +use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Fields}; use datafusion_common::cast::as_list_array; @@ -32,6 +33,7 @@ use datafusion_functions_aggregate_common::merge_arrays::merge_ordered_arrays; use datafusion_functions_aggregate_common::utils::ordering_fields; use datafusion_macros::user_doc; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; +use std::cmp::Ordering; use std::collections::{HashSet, VecDeque}; use std::mem::{size_of, size_of_val}; use std::sync::Arc; @@ -131,7 +133,34 @@ impl AggregateUDFImpl for ArrayAgg { let data_type = acc_args.exprs[0].data_type(acc_args.schema)?; if acc_args.is_distinct { - return Ok(Box::new(DistinctArrayAggAccumulator::try_new(&data_type)?)); + // Limitation similar to Postgres. The aggregation function can only mix + // DISTINCT and ORDER BY if all the expressions in the ORDER BY appear + // also in the arguments of the function. This implies that if the + // aggregation function only accepts one argument, only one argument + // can be used in the ORDER BY, For example: + // + // ARRAY_AGG(DISTINCT col) + // + // can only be mixed with an ORDER BY if the order expression is "col". + // + // ARRAY_AGG(DISTINCT col ORDER BY col) <- Valid + // ARRAY_AGG(DISTINCT concat(col, '') ORDER BY concat(col, '')) <- Valid + // ARRAY_AGG(DISTINCT col ORDER BY other_col) <- Invalid + // ARRAY_AGG(DISTINCT col ORDER BY concat(col, '')) <- Invalid + if acc_args.ordering_req.len() > 1 { + return exec_err!("In an aggregate with DISTINCT, ORDER BY expressions must appear in argument list"); + } + let mut sort_option: Option = None; + if let Some(order) = acc_args.ordering_req.first() { + if !order.expr.eq(&acc_args.exprs[0]) { + return exec_err!("In an aggregate with DISTINCT, ORDER BY expressions must appear in argument list"); + } + sort_option = Some(order.options) + } + return Ok(Box::new(DistinctArrayAggAccumulator::try_new( + &data_type, + sort_option, + )?)); } if acc_args.ordering_req.is_empty() { @@ -321,13 +350,18 @@ impl Accumulator for ArrayAggAccumulator { struct DistinctArrayAggAccumulator { values: HashSet, datatype: DataType, + sort_options: Option, } impl DistinctArrayAggAccumulator { - pub fn try_new(datatype: &DataType) -> Result { + pub fn try_new( + datatype: &DataType, + sort_options: Option, + ) -> Result { Ok(Self { values: HashSet::new(), datatype: datatype.clone(), + sort_options, }) } } @@ -338,8 +372,8 @@ impl Accumulator for DistinctArrayAggAccumulator { } fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - if values.len() != 1 { - return internal_err!("expects single batch"); + if values.is_empty() { + return Ok(()); } let array = &values[0]; @@ -369,10 +403,32 @@ impl Accumulator for DistinctArrayAggAccumulator { } fn evaluate(&mut self) -> Result { - let values: Vec = self.values.iter().cloned().collect(); + let mut values: Vec = self.values.iter().cloned().collect(); if values.is_empty() { return Ok(ScalarValue::new_null_list(self.datatype.clone(), true, 1)); } + + if let Some(opts) = self.sort_options { + values.sort_by(|a, b| { + if a.is_null() { + return match opts.nulls_first { + true => Ordering::Less, + false => Ordering::Greater, + }; + } + if b.is_null() { + return match opts.nulls_first { + true => Ordering::Greater, + false => Ordering::Less, + }; + } + match opts.descending { + true => b.partial_cmp(a).unwrap_or(Ordering::Equal), + false => a.partial_cmp(b).unwrap_or(Ordering::Equal), + } + }); + }; + let arr = ScalarValue::new_list(&values, &self.datatype, true); Ok(ScalarValue::List(arr)) } @@ -382,6 +438,8 @@ impl Accumulator for DistinctArrayAggAccumulator { - size_of_val(&self.values) + self.datatype.size() - size_of_val(&self.datatype) + - size_of_val(&self.sort_options) + + size_of::>() } } @@ -598,146 +656,370 @@ impl OrderSensitiveArrayAggAccumulator { #[cfg(test)] mod tests { use super::*; - - use std::collections::VecDeque; + use arrow::datatypes::{FieldRef, Schema}; + use datafusion_common::cast::as_generic_string_array; + use datafusion_common::internal_err; + use datafusion_physical_expr::expressions::Column; + use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; use std::sync::Arc; - use arrow::array::Int64Array; - use arrow::compute::SortOptions; + #[test] + fn no_duplicates_no_distinct() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string().build_two()?; + + acc1.update_batch(&[data(["a", "b", "c"])])?; + acc2.update_batch(&[data(["d", "e", "f"])])?; + acc1 = merge(acc1, acc2)?; - use datafusion_common::utils::get_row_at_idx; - use datafusion_common::{Result, ScalarValue}; + let result = print_nulls(str_arr(acc1.evaluate()?)?); + + assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]); + + Ok(()) + } #[test] - fn test_merge_asc() -> Result<()> { - let lhs_arrays: Vec = vec![ - Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])), - Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])), - ]; - let n_row = lhs_arrays[0].len(); - let lhs_orderings = (0..n_row) - .map(|idx| get_row_at_idx(&lhs_arrays, idx)) - .collect::>>()?; - - let rhs_arrays: Vec = vec![ - Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])), - Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])), - ]; - let n_row = rhs_arrays[0].len(); - let rhs_orderings = (0..n_row) - .map(|idx| get_row_at_idx(&rhs_arrays, idx)) - .collect::>>()?; - let sort_options = vec![ - SortOptions { - descending: false, - nulls_first: false, - }, - SortOptions { - descending: false, - nulls_first: false, - }, - ]; - - let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as ArrayRef; - let lhs_vals = (0..lhs_vals_arr.len()) - .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx)) - .collect::>>()?; - - let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as ArrayRef; - let rhs_vals = (0..rhs_vals_arr.len()) - .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx)) - .collect::>>()?; - let expected = - Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as ArrayRef; - let expected_ts = vec![ - Arc::new(Int64Array::from(vec![0, 0, 0, 0, 1, 1, 1, 1, 2, 2])) as ArrayRef, - Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as ArrayRef, - ]; - - let (merged_vals, merged_ts) = merge_ordered_arrays( - &mut [lhs_vals, rhs_vals], - &mut [lhs_orderings, rhs_orderings], - &sort_options, - )?; - let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?; - let merged_ts = (0..merged_ts[0].len()) - .map(|col_idx| { - ScalarValue::iter_to_array( - (0..merged_ts.len()) - .map(|row_idx| merged_ts[row_idx][col_idx].clone()), - ) - }) - .collect::>>()?; + fn no_duplicates_distinct() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string() + .distinct() + .build_two()?; + + acc1.update_batch(&[data(["a", "b", "c"])])?; + acc2.update_batch(&[data(["d", "e", "f"])])?; + acc1 = merge(acc1, acc2)?; + + let mut result = print_nulls(str_arr(acc1.evaluate()?)?); + result.sort(); - assert_eq!(&merged_vals, &expected); - assert_eq!(&merged_ts, &expected_ts); + assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]); Ok(()) } #[test] - fn test_merge_desc() -> Result<()> { - let lhs_arrays: Vec = vec![ - Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])), - Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])), - ]; - let n_row = lhs_arrays[0].len(); - let lhs_orderings = (0..n_row) - .map(|idx| get_row_at_idx(&lhs_arrays, idx)) - .collect::>>()?; - - let rhs_arrays: Vec = vec![ - Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])), - Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])), - ]; - let n_row = rhs_arrays[0].len(); - let rhs_orderings = (0..n_row) - .map(|idx| get_row_at_idx(&rhs_arrays, idx)) - .collect::>>()?; - let sort_options = vec![ - SortOptions { - descending: true, - nulls_first: false, - }, - SortOptions { - descending: true, - nulls_first: false, - }, - ]; - - // Values (which will be merged) doesn't have to be ordered. - let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as ArrayRef; - let lhs_vals = (0..lhs_vals_arr.len()) - .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx)) - .collect::>>()?; - - let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as ArrayRef; - let rhs_vals = (0..rhs_vals_arr.len()) - .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx)) - .collect::>>()?; - let expected = - Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 1, 1, 2, 2])) as ArrayRef; - let expected_ts = vec![ - Arc::new(Int64Array::from(vec![2, 2, 1, 1, 1, 1, 0, 0, 0, 0])) as ArrayRef, - Arc::new(Int64Array::from(vec![4, 4, 3, 3, 2, 2, 1, 1, 0, 0])) as ArrayRef, - ]; - let (merged_vals, merged_ts) = merge_ordered_arrays( - &mut [lhs_vals, rhs_vals], - &mut [lhs_orderings, rhs_orderings], - &sort_options, - )?; - let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?; - let merged_ts = (0..merged_ts[0].len()) - .map(|col_idx| { - ScalarValue::iter_to_array( - (0..merged_ts.len()) - .map(|row_idx| merged_ts[row_idx][col_idx].clone()), - ) - }) - .collect::>>()?; + fn duplicates_no_distinct() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string().build_two()?; + + acc1.update_batch(&[data(["a", "b", "c"])])?; + acc2.update_batch(&[data(["a", "b", "c"])])?; + acc1 = merge(acc1, acc2)?; + + let result = print_nulls(str_arr(acc1.evaluate()?)?); + + assert_eq!(result, vec!["a", "b", "c", "a", "b", "c"]); + + Ok(()) + } + + #[test] + fn duplicates_distinct() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string() + .distinct() + .build_two()?; + + acc1.update_batch(&[data(["a", "b", "c"])])?; + acc2.update_batch(&[data(["a", "b", "c"])])?; + acc1 = merge(acc1, acc2)?; + + let mut result = print_nulls(str_arr(acc1.evaluate()?)?); + result.sort(); + + assert_eq!(result, vec!["a", "b", "c"]); + + Ok(()) + } + + #[test] + fn duplicates_on_second_batch_distinct() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string() + .distinct() + .build_two()?; + + acc1.update_batch(&[data(["a", "c"])])?; + acc2.update_batch(&[data(["d", "a", "b", "c"])])?; + acc1 = merge(acc1, acc2)?; + + let mut result = print_nulls(str_arr(acc1.evaluate()?)?); + result.sort(); + + assert_eq!(result, vec!["a", "b", "c", "d"]); - assert_eq!(&merged_vals, &expected); - assert_eq!(&merged_ts, &expected_ts); Ok(()) } + + #[test] + fn no_duplicates_distinct_sort_asc() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string() + .distinct() + .order_by_col("col", SortOptions::new(false, false)) + .build_two()?; + + acc1.update_batch(&[data(["e", "b", "d"])])?; + acc2.update_batch(&[data(["f", "a", "c"])])?; + acc1 = merge(acc1, acc2)?; + + let result = print_nulls(str_arr(acc1.evaluate()?)?); + + assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]); + + Ok(()) + } + + #[test] + fn no_duplicates_distinct_sort_desc() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string() + .distinct() + .order_by_col("col", SortOptions::new(true, false)) + .build_two()?; + + acc1.update_batch(&[data(["e", "b", "d"])])?; + acc2.update_batch(&[data(["f", "a", "c"])])?; + acc1 = merge(acc1, acc2)?; + + let result = print_nulls(str_arr(acc1.evaluate()?)?); + + assert_eq!(result, vec!["f", "e", "d", "c", "b", "a"]); + + Ok(()) + } + + #[test] + fn duplicates_distinct_sort_asc() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string() + .distinct() + .order_by_col("col", SortOptions::new(false, false)) + .build_two()?; + + acc1.update_batch(&[data(["a", "c", "b"])])?; + acc2.update_batch(&[data(["b", "c", "a"])])?; + acc1 = merge(acc1, acc2)?; + + let result = print_nulls(str_arr(acc1.evaluate()?)?); + + assert_eq!(result, vec!["a", "b", "c"]); + + Ok(()) + } + + #[test] + fn duplicates_distinct_sort_desc() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string() + .distinct() + .order_by_col("col", SortOptions::new(true, false)) + .build_two()?; + + acc1.update_batch(&[data(["a", "c", "b"])])?; + acc2.update_batch(&[data(["b", "c", "a"])])?; + acc1 = merge(acc1, acc2)?; + + let result = print_nulls(str_arr(acc1.evaluate()?)?); + + assert_eq!(result, vec!["c", "b", "a"]); + + Ok(()) + } + + #[test] + fn no_duplicates_distinct_sort_asc_nulls_first() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string() + .distinct() + .order_by_col("col", SortOptions::new(false, true)) + .build_two()?; + + acc1.update_batch(&[data([Some("e"), Some("b"), None])])?; + acc2.update_batch(&[data([Some("f"), Some("a"), None])])?; + acc1 = merge(acc1, acc2)?; + + let result = print_nulls(str_arr(acc1.evaluate()?)?); + + assert_eq!(result, vec!["NULL", "a", "b", "e", "f"]); + + Ok(()) + } + + #[test] + fn no_duplicates_distinct_sort_asc_nulls_last() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string() + .distinct() + .order_by_col("col", SortOptions::new(false, false)) + .build_two()?; + + acc1.update_batch(&[data([Some("e"), Some("b"), None])])?; + acc2.update_batch(&[data([Some("f"), Some("a"), None])])?; + acc1 = merge(acc1, acc2)?; + + let result = print_nulls(str_arr(acc1.evaluate()?)?); + + assert_eq!(result, vec!["a", "b", "e", "f", "NULL"]); + + Ok(()) + } + + #[test] + fn no_duplicates_distinct_sort_desc_nulls_first() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string() + .distinct() + .order_by_col("col", SortOptions::new(true, true)) + .build_two()?; + + acc1.update_batch(&[data([Some("e"), Some("b"), None])])?; + acc2.update_batch(&[data([Some("f"), Some("a"), None])])?; + acc1 = merge(acc1, acc2)?; + + let result = print_nulls(str_arr(acc1.evaluate()?)?); + + assert_eq!(result, vec!["NULL", "f", "e", "b", "a"]); + + Ok(()) + } + + #[test] + fn no_duplicates_distinct_sort_desc_nulls_last() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string() + .distinct() + .order_by_col("col", SortOptions::new(true, false)) + .build_two()?; + + acc1.update_batch(&[data([Some("e"), Some("b"), None])])?; + acc2.update_batch(&[data([Some("f"), Some("a"), None])])?; + acc1 = merge(acc1, acc2)?; + + let result = print_nulls(str_arr(acc1.evaluate()?)?); + + assert_eq!(result, vec!["f", "e", "b", "a", "NULL"]); + + Ok(()) + } + + #[test] + fn all_nulls_on_first_batch_with_distinct() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string() + .distinct() + .build_two()?; + + acc1.update_batch(&[data::, 3>([None, None, None])])?; + acc2.update_batch(&[data([Some("a"), None, None, None])])?; + acc1 = merge(acc1, acc2)?; + + let mut result = print_nulls(str_arr(acc1.evaluate()?)?); + result.sort(); + assert_eq!(result, vec!["NULL", "a"]); + Ok(()) + } + + #[test] + fn all_nulls_on_both_batches_with_distinct() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string() + .distinct() + .build_two()?; + + acc1.update_batch(&[data::, 3>([None, None, None])])?; + acc2.update_batch(&[data::, 4>([None, None, None, None])])?; + acc1 = merge(acc1, acc2)?; + + let result = print_nulls(str_arr(acc1.evaluate()?)?); + assert_eq!(result, vec!["NULL"]); + Ok(()) + } + + struct ArrayAggAccumulatorBuilder { + data_type: DataType, + distinct: bool, + ordering: LexOrdering, + schema: Schema, + } + + impl ArrayAggAccumulatorBuilder { + fn string() -> Self { + Self::new(DataType::Utf8) + } + + fn new(data_type: DataType) -> Self { + Self { + data_type: data_type.clone(), + distinct: Default::default(), + ordering: Default::default(), + schema: Schema { + fields: Fields::from(vec![Field::new( + "col", + DataType::List(FieldRef::new(Field::new( + "item", data_type, true, + ))), + true, + )]), + metadata: Default::default(), + }, + } + } + + fn distinct(mut self) -> Self { + self.distinct = true; + self + } + + fn order_by_col(mut self, col: &str, sort_options: SortOptions) -> Self { + self.ordering.extend([PhysicalSortExpr::new( + Arc::new( + Column::new_with_schema(col, &self.schema) + .expect("column not available in schema"), + ), + sort_options, + )]); + self + } + + fn build(&self) -> Result> { + ArrayAgg::default().accumulator(AccumulatorArgs { + return_type: &self.data_type, + schema: &self.schema, + ignore_nulls: false, + ordering_req: &self.ordering, + is_reversed: false, + name: "", + is_distinct: self.distinct, + exprs: &[Arc::new(Column::new("col", 0))], + }) + } + + fn build_two(&self) -> Result<(Box, Box)> { + Ok((self.build()?, self.build()?)) + } + } + + fn str_arr(value: ScalarValue) -> Result>> { + let ScalarValue::List(list) = value else { + return internal_err!("ScalarValue was not a List"); + }; + Ok(as_generic_string_array::(list.values())? + .iter() + .map(|v| v.map(|v| v.to_string())) + .collect()) + } + + fn print_nulls(sort: Vec>) -> Vec { + sort.into_iter() + .map(|v| v.unwrap_or("NULL".to_string())) + .collect() + } + + fn data(list: [T; N]) -> ArrayRef + where + ScalarValue: From, + { + let values: Vec<_> = list.into_iter().map(ScalarValue::from).collect(); + ScalarValue::iter_to_array(values).expect("Cannot convert to array") + } + + fn merge( + mut acc1: Box, + mut acc2: Box, + ) -> Result> { + let intermediate_state = acc2.state().and_then(|e| { + e.iter() + .map(|v| v.to_array()) + .collect::>>() + })?; + acc1.merge_batch(&intermediate_state)?; + Ok(acc1) + } } diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 9d8620b100f3..535ed03aba2c 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -176,6 +176,25 @@ FROM ---- [0VVIHzxWtNOFLtnhjHEKjXaJOSLJfm, 0keZ5G8BffGwgF2RwQD59TFzMStxCB, 0og6hSkhbX8AC1ktFS4kounvTzy8Vo, 1aOcrEGd0cOqZe2I5XBOm0nDcwtBZO, 2T3wSlHdEmASmO0xcXHnndkKEt6bz8] +# array agg can use order by with distinct +query ? +SELECT array_agg(DISTINCT c13 ORDER BY c13) +FROM + (SELECT * + FROM aggregate_test_100 + ORDER BY c13 + LIMIT 5) as t1 +---- +[0VVIHzxWtNOFLtnhjHEKjXaJOSLJfm, 0keZ5G8BffGwgF2RwQD59TFzMStxCB, 0og6hSkhbX8AC1ktFS4kounvTzy8Vo, 1aOcrEGd0cOqZe2I5XBOm0nDcwtBZO, 2T3wSlHdEmASmO0xcXHnndkKEt6bz8] + +query error Execution error: In an aggregate with DISTINCT, ORDER BY expressions must appear in argument list +SELECT array_agg(DISTINCT c13 ORDER BY c12) +FROM aggregate_test_100 + +query error Execution error: In an aggregate with DISTINCT, ORDER BY expressions must appear in argument list +SELECT array_agg(DISTINCT c13 ORDER BY c13, c12) +FROM aggregate_test_100 + statement ok CREATE EXTERNAL TABLE agg_order ( c1 INT NOT NULL, @@ -234,6 +253,16 @@ select column1, nth_value(column3, 2 order by column2, column4 desc) from array_ b [4, 5, 6] w [9, 5, 2] +query ? +select array_agg(DISTINCT column2 order by column2 desc) from array_agg_order_list_table; +---- +[2, 1] + +query ? +select array_agg(DISTINCT column2 + 1 order by column2 + 1 desc) from array_agg_order_list_table; +---- +[3, 2] + statement ok drop table array_agg_order_list_table; From c020bcc6ce138d4749298bdfe27c5a3b4022e4d7 Mon Sep 17 00:00:00 2001 From: Gabriel Musat Mestre Date: Mon, 24 Mar 2025 20:06:09 +0100 Subject: [PATCH 2/4] Add DISTINCT + ORDER BY docs --- datafusion/functions-aggregate/src/array_agg.rs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs index 3ee459efeddb..573624ce4d49 100644 --- a/datafusion/functions-aggregate/src/array_agg.rs +++ b/datafusion/functions-aggregate/src/array_agg.rs @@ -48,16 +48,25 @@ make_udaf_expr_and_func!( #[user_doc( doc_section(label = "General Functions"), - description = "Returns an array created from the expression elements. If ordering is required, elements are inserted in the specified order.", + description = r#"Returns an array created from the expression elements. If ordering is required, elements are inserted in the specified order. +This aggregation function can only mix DISTINCT and ORDER BY if the ordering expression is exactly the same as the argument expression."#, syntax_example = "array_agg(expression [ORDER BY expression])", - sql_example = r#"```sql + sql_example = r#" +```sql > SELECT array_agg(column_name ORDER BY other_column) FROM table_name; +-----------------------------------------------+ | array_agg(column_name ORDER BY other_column) | +-----------------------------------------------+ | [element1, element2, element3] | +-----------------------------------------------+ -```"#, +> SELECT array_agg(DISTINCT column_name ORDER BY column_name) FROM table_name; ++--------------------------------------------------------+ +| array_agg(DISTINCT column_name ORDER BY column_name) | ++--------------------------------------------------------+ +| [element1, element2, element3] | ++--------------------------------------------------------+ +``` +"#, standard_argument(name = "expression",) )] #[derive(Debug)] From f194b23f27012306d587f0d5899917a29c254645 Mon Sep 17 00:00:00 2001 From: Gabriel Musat Mestre Date: Tue, 25 Mar 2025 10:40:44 +0100 Subject: [PATCH 3/4] Add some more sqllogictests --- datafusion/sqllogictest/test_files/aggregate.slt | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 535ed03aba2c..621e212ebc71 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -253,6 +253,11 @@ select column1, nth_value(column3, 2 order by column2, column4 desc) from array_ b [4, 5, 6] w [9, 5, 2] +query ? +select array_agg(DISTINCT column2 order by column2) from array_agg_order_list_table; +---- +[1, 2] + query ? select array_agg(DISTINCT column2 order by column2 desc) from array_agg_order_list_table; ---- @@ -263,6 +268,15 @@ select array_agg(DISTINCT column2 + 1 order by column2 + 1 desc) from array_agg_ ---- [3, 2] +query ? +select array_agg(DISTINCT column2 order by column2) from array_agg_order_list_table GROUP BY column1; +---- +[1, 2] +[1, 2] + +statement error In an aggregate with DISTINCT, ORDER BY expressions must appear in argument list +select array_agg(DISTINCT column2 order by column1) from array_agg_order_list_table; + statement ok drop table array_agg_order_list_table; From b7dc2e87a2b15724e481bd5a07bf1b4a07e00209 Mon Sep 17 00:00:00 2001 From: Gabriel Musat Mestre Date: Tue, 25 Mar 2025 11:03:35 +0100 Subject: [PATCH 4/4] Update aggregate_functions.md --- docs/source/user-guide/sql/aggregate_functions.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/docs/source/user-guide/sql/aggregate_functions.md b/docs/source/user-guide/sql/aggregate_functions.md index 7d88d3168d23..c7f5c5f67442 100644 --- a/docs/source/user-guide/sql/aggregate_functions.md +++ b/docs/source/user-guide/sql/aggregate_functions.md @@ -57,6 +57,7 @@ Aggregate functions operate on a set of values to compute a single result. ### `array_agg` Returns an array created from the expression elements. If ordering is required, elements are inserted in the specified order. +This aggregation function can only mix DISTINCT and ORDER BY if the ordering expression is exactly the same as the argument expression. ```sql array_agg(expression [ORDER BY expression]) @@ -75,6 +76,12 @@ array_agg(expression [ORDER BY expression]) +-----------------------------------------------+ | [element1, element2, element3] | +-----------------------------------------------+ +> SELECT array_agg(DISTINCT column_name ORDER BY column_name) FROM table_name; ++--------------------------------------------------------+ +| array_agg(DISTINCT column_name ORDER BY column_name) | ++--------------------------------------------------------+ +| [element1, element2, element3] | ++--------------------------------------------------------+ ``` ### `avg`