@@ -40,9 +40,7 @@ use crate::sorts::streaming_merge::StreamingMergeBuilder;
40
40
use crate :: stream:: RecordBatchStreamAdapter ;
41
41
use crate :: { DisplayFormatType , ExecutionPlan , Partitioning , PlanProperties , Statistics } ;
42
42
43
- use arrow:: array:: {
44
- ArrayRef , BooleanArray , PrimitiveArray , RecordBatch , RecordBatchOptions ,
45
- } ;
43
+ use arrow:: array:: { BooleanArray , PrimitiveArray , RecordBatch , RecordBatchOptions } ;
46
44
use arrow:: compute:: take_arrays;
47
45
use arrow:: datatypes:: { SchemaRef , UInt32Type } ;
48
46
use arrow_schema:: { DataType , Field } ;
@@ -204,7 +202,7 @@ enum BatchPartitionerState {
204
202
} ,
205
203
}
206
204
207
- pub static SELECTION_FILED_NAME : & str = "selection " ;
205
+ pub static SELECTION_FILED_NAME : & str = "__selection " ;
208
206
209
207
impl BatchPartitioner {
210
208
/// Create a new [`BatchPartitioner`] with the provided [`Partitioning`]
@@ -379,15 +377,15 @@ impl BatchPartitioner {
379
377
let it = ( 0 ..* num_partitions) . map ( move |partition| {
380
378
// Tracking time required for repartitioned batches construction
381
379
let _timer = partitioner_timer. timer ( ) ;
382
- let select_vector = hash_vector
383
- . iter ( )
384
- . map ( | & hash| hash == partition as u64 )
385
- . collect :: < Vec < _ > > ( ) ;
386
- let new_col : ArrayRef =
387
- Arc :: new ( BooleanArray :: from ( select_vector ) ) as ArrayRef ;
380
+ let selection_vector = Arc :: new (
381
+ hash_vector
382
+ . iter ( )
383
+ . map ( | & hash| Some ( hash == partition as u64 ) )
384
+ . collect :: < BooleanArray > ( ) ,
385
+ ) ;
388
386
let mut columns =
389
387
batch. columns ( ) . iter ( ) . map ( Arc :: clone) . collect :: < Vec < _ > > ( ) ;
390
- columns. push ( new_col ) ;
388
+ columns. push ( selection_vector ) ;
391
389
let mut options = RecordBatchOptions :: new ( ) ;
392
390
options = options. with_row_count ( Some ( batch. num_rows ( ) ) ) ;
393
391
let batch = RecordBatch :: try_new_with_options (
0 commit comments