@@ -76,38 +76,17 @@ impl LogicalAgg {
76
76
let mut core = self . core . clone ( ) ;
77
77
78
78
// ====== Handle approx percentile aggs
79
- let SeparatedAggInfo { normal, approx } = self . separate_normal_and_special_agg ( ) ;
80
-
81
- let AggInfo {
82
- calls : non_approx_percentile_agg_calls,
83
- col_mapping : non_approx_percentile_col_mapping,
84
- } = normal;
85
- let AggInfo {
86
- calls : approx_percentile_agg_calls,
87
- col_mapping : approx_percentile_col_mapping,
88
- } = approx;
89
-
90
- let needs_row_merge = ( !non_approx_percentile_agg_calls. is_empty ( )
91
- && !approx_percentile_agg_calls. is_empty ( ) )
92
- || approx_percentile_agg_calls. len ( ) >= 2 ;
93
- core. input = if needs_row_merge {
94
- // If there's row merge, we need to share the input.
95
- StreamShare :: new_from_input ( stream_input. clone ( ) ) . into ( )
96
- } else {
97
- stream_input
98
- } ;
99
- core. agg_calls = non_approx_percentile_agg_calls;
79
+ let ( non_approx_percentile_col_mapping, approx_percentile_col_mapping, approx_percentile) =
80
+ self . prepare_approx_percentile ( & mut core, stream_input. clone ( ) ) ?;
100
81
101
- let approx_percentile =
102
- self . build_approx_percentile_aggs ( core. input . clone ( ) , & approx_percentile_agg_calls) ?;
103
-
104
- // ====== Handle normal aggs
105
82
if core. agg_calls . is_empty ( ) {
106
83
if let Some ( approx_percentile) = approx_percentile {
107
84
return Ok ( approx_percentile) ;
108
85
} ;
109
86
bail ! ( "expected at least one agg call" ) ;
110
87
}
88
+
89
+ // ====== Handle normal aggs
111
90
let total_agg_calls = core
112
91
. agg_calls
113
92
. iter ( )
@@ -123,21 +102,12 @@ impl LogicalAgg {
123
102
new_stream_simple_agg ( Agg :: new ( total_agg_calls, IndexSet :: empty ( ) , exchange) ) ;
124
103
125
104
// ====== Merge approx percentile and normal aggs
126
- if let Some ( approx_percentile) = approx_percentile {
127
- if needs_row_merge {
128
- let row_merge = StreamRowMerge :: new (
129
- approx_percentile,
130
- global_agg. into ( ) ,
131
- approx_percentile_col_mapping,
132
- non_approx_percentile_col_mapping,
133
- ) ?;
134
- Ok ( row_merge. into ( ) )
135
- } else {
136
- Ok ( approx_percentile)
137
- }
138
- } else {
139
- Ok ( global_agg. into ( ) )
140
- }
105
+ Self :: add_row_merge_if_needed (
106
+ approx_percentile,
107
+ global_agg. into ( ) ,
108
+ approx_percentile_col_mapping,
109
+ non_approx_percentile_col_mapping,
110
+ )
141
111
}
142
112
143
113
/// Generate plan for stateless/stateful 2-phase streaming agg.
@@ -148,10 +118,21 @@ impl LogicalAgg {
148
118
stream_input : PlanRef ,
149
119
dist_key : & [ usize ] ,
150
120
) -> Result < PlanRef > {
151
- let input_col_num = stream_input. schema ( ) . len ( ) ;
121
+ let mut core = self . core . clone ( ) ;
122
+
123
+ let ( non_approx_percentile_col_mapping, approx_percentile_col_mapping, approx_percentile) =
124
+ self . prepare_approx_percentile ( & mut core, stream_input. clone ( ) ) ?;
125
+
126
+ if core. agg_calls . is_empty ( ) {
127
+ if let Some ( approx_percentile) = approx_percentile {
128
+ return Ok ( approx_percentile) ;
129
+ } ;
130
+ bail ! ( "expected at least one agg call" ) ;
131
+ }
152
132
153
133
// Generate vnode via project
154
134
// TODO(kwannoel): We should apply Project optimization rules here.
135
+ let input_col_num = stream_input. schema ( ) . len ( ) ; // get schema len before moving `stream_input`.
155
136
let project = StreamProject :: new ( generic:: Project :: with_vnode_col ( stream_input, dist_key) ) ;
156
137
let vnode_col_idx = project. base . schema ( ) . len ( ) - 1 ;
157
138
@@ -160,7 +141,7 @@ impl LogicalAgg {
160
141
local_group_key. insert ( vnode_col_idx) ;
161
142
let n_local_group_key = local_group_key. len ( ) ;
162
143
let local_agg = new_stream_hash_agg (
163
- Agg :: new ( self . agg_calls ( ) . to_vec ( ) , local_group_key, project. into ( ) ) ,
144
+ Agg :: new ( core . agg_calls . to_vec ( ) , local_group_key, project. into ( ) ) ,
164
145
Some ( vnode_col_idx) ,
165
146
) ;
166
147
// Global group key excludes vnode.
@@ -173,11 +154,11 @@ impl LogicalAgg {
173
154
. expect ( "some input group key could not be mapped" ) ;
174
155
175
156
// Generate global agg step
176
- if self . group_key ( ) . is_empty ( ) {
157
+ let global_agg = if self . group_key ( ) . is_empty ( ) {
177
158
let exchange =
178
159
RequiredDist :: single ( ) . enforce_if_not_satisfies ( local_agg. into ( ) , & Order :: any ( ) ) ?;
179
160
let global_agg = new_stream_simple_agg ( Agg :: new (
180
- self . agg_calls ( )
161
+ core . agg_calls
181
162
. iter ( )
182
163
. enumerate ( )
183
164
. map ( |( partial_output_idx, agg_call) | {
@@ -187,15 +168,15 @@ impl LogicalAgg {
187
168
global_group_key. into_iter ( ) . collect ( ) ,
188
169
exchange,
189
170
) ) ;
190
- Ok ( global_agg. into ( ) )
171
+ global_agg. into ( )
191
172
} else {
192
173
let exchange = RequiredDist :: shard_by_key ( input_col_num, & global_group_key)
193
174
. enforce_if_not_satisfies ( local_agg. into ( ) , & Order :: any ( ) ) ?;
194
175
// Local phase should have reordered the group keys into their required order.
195
176
// we can just follow it.
196
177
let global_agg = new_stream_hash_agg (
197
178
Agg :: new (
198
- self . agg_calls ( )
179
+ core . agg_calls
199
180
. iter ( )
200
181
. enumerate ( )
201
182
. map ( |( partial_output_idx, agg_call) | {
@@ -208,8 +189,14 @@ impl LogicalAgg {
208
189
) ,
209
190
None ,
210
191
) ;
211
- Ok ( global_agg. into ( ) )
212
- }
192
+ global_agg. into ( )
193
+ } ;
194
+ Self :: add_row_merge_if_needed (
195
+ approx_percentile,
196
+ global_agg,
197
+ approx_percentile_col_mapping,
198
+ non_approx_percentile_col_mapping,
199
+ )
213
200
}
214
201
215
202
fn gen_single_plan ( & self , stream_input : PlanRef ) -> Result < PlanRef > {
@@ -304,6 +291,71 @@ impl LogicalAgg {
304
291
}
305
292
}
306
293
294
+ /// Prepares metadata and the `approx_percentile` plan, if there's one present.
295
+ /// It may modify `core.agg_calls` to separate normal agg and approx percentile agg,
296
+ /// and `core.input` to share the input via `StreamShare`,
297
+ /// to both approx percentile agg and normal agg.
298
+ fn prepare_approx_percentile (
299
+ & self ,
300
+ core : & mut Agg < PlanRef > ,
301
+ stream_input : PlanRef ,
302
+ ) -> Result < ( ColIndexMapping , ColIndexMapping , Option < PlanRef > ) > {
303
+ let SeparatedAggInfo { normal, approx } = self . separate_normal_and_special_agg ( ) ;
304
+
305
+ let AggInfo {
306
+ calls : non_approx_percentile_agg_calls,
307
+ col_mapping : non_approx_percentile_col_mapping,
308
+ } = normal;
309
+ let AggInfo {
310
+ calls : approx_percentile_agg_calls,
311
+ col_mapping : approx_percentile_col_mapping,
312
+ } = approx;
313
+ if !self . group_key ( ) . is_empty ( ) && !approx_percentile_agg_calls. is_empty ( ) {
314
+ bail_not_implemented ! ( "two-phase approx percentile agg with group key, please use single phase agg for approx_percentile with group key" ) ;
315
+ }
316
+
317
+ // Either we have approx percentile aggs and non_approx percentile aggs,
318
+ // or we have at least 2 approx percentile aggs.
319
+ let needs_row_merge = ( !non_approx_percentile_agg_calls. is_empty ( )
320
+ && !approx_percentile_agg_calls. is_empty ( ) )
321
+ || approx_percentile_agg_calls. len ( ) >= 2 ;
322
+ core. input = if needs_row_merge {
323
+ // If there's row merge, we need to share the input.
324
+ StreamShare :: new_from_input ( stream_input. clone ( ) ) . into ( )
325
+ } else {
326
+ stream_input
327
+ } ;
328
+ core. agg_calls = non_approx_percentile_agg_calls;
329
+
330
+ let approx_percentile =
331
+ self . build_approx_percentile_aggs ( core. input . clone ( ) , & approx_percentile_agg_calls) ?;
332
+ Ok ( (
333
+ non_approx_percentile_col_mapping,
334
+ approx_percentile_col_mapping,
335
+ approx_percentile,
336
+ ) )
337
+ }
338
+
339
+ /// Add `RowMerge` if needed
340
+ fn add_row_merge_if_needed (
341
+ approx_percentile : Option < PlanRef > ,
342
+ global_agg : PlanRef ,
343
+ approx_percentile_col_mapping : ColIndexMapping ,
344
+ non_approx_percentile_col_mapping : ColIndexMapping ,
345
+ ) -> Result < PlanRef > {
346
+ if let Some ( approx_percentile) = approx_percentile {
347
+ let row_merge = StreamRowMerge :: new (
348
+ approx_percentile,
349
+ global_agg,
350
+ approx_percentile_col_mapping,
351
+ non_approx_percentile_col_mapping,
352
+ ) ?;
353
+ Ok ( row_merge. into ( ) )
354
+ } else {
355
+ Ok ( global_agg)
356
+ }
357
+ }
358
+
307
359
fn separate_normal_and_special_agg ( & self ) -> SeparatedAggInfo {
308
360
let estimated_len = self . agg_calls ( ) . len ( ) - 1 ;
309
361
let mut approx_percentile_agg_calls = Vec :: with_capacity ( estimated_len) ;
0 commit comments