32
32
import javax .annotation .Nullable ;
33
33
import javax .annotation .WillClose ;
34
34
import javax .annotation .WillCloseWhenClosed ;
35
- import org .apache .arrow .algorithm .sort .DefaultVectorComparators ;
36
- import org .apache .arrow .algorithm .sort .IndexSorter ;
37
- import org .apache .arrow .algorithm .sort .StableVectorComparator ;
38
35
import org .apache .arrow .memory .BufferAllocator ;
39
36
import org .apache .arrow .util .Preconditions ;
40
37
import org .apache .arrow .vector .*;
@@ -106,7 +103,7 @@ public void probe(VectorSchemaRoot probeSideBatch) throws ComputeException {
106
103
IntVector buildSideCandidateIndices = buildSideCandidateIndicesBuilder .build ();
107
104
IntVector proSideCandidateIndices = probeSideCandidateIndicesBuilder .build ()) {
108
105
boolean [] probeSideMatched = new boolean [probeSideBatch .getRowCount ()];
109
- output (
106
+ outputMatchedAndUnmatched (
110
107
outputDictionaryProvider ,
111
108
buildSideCandidateIndices ,
112
109
proSideCandidateIndices ,
@@ -143,12 +140,12 @@ public void flush() throws ComputeException {
143
140
IntVector buildSideIndices = buildSideIndicesBuilder .build (buildSideUnmatchedCount );
144
141
IntVector probeSideIndices = probeSideIndicesBuilder .build (buildSideUnmatchedCount );
145
142
BitVector mark = markBuilder .build (buildSideUnmatchedCount )) {
146
- output (dictionary , buildSideIndices , probeSideIndices , mark );
143
+ output (dictionary , buildSideIndices , probeSideIndices , mark , 0 );
147
144
}
148
145
}
149
146
}
150
147
151
- private void output (
148
+ private void outputMatchedAndUnmatched (
152
149
ArrayDictionaryProvider dictionary ,
153
150
IntVector buildSideCandidateIndices ,
154
151
IntVector proSideCandidateIndices ,
@@ -160,32 +157,37 @@ private void output(
160
157
allocator , "buildSideIndices" , buildSideCandidateIndices .getValueCount ());
161
158
SelectionBuilder probeSideIndicesBuilder =
162
159
new SelectionBuilder (
163
- allocator , "probeSideIndices" , proSideCandidateIndices .getValueCount ());
164
- MarkBuilder markBuilder = getMarkBuilder (probeSideMatched .length )) {
160
+ allocator , "probeSideIndices" , proSideCandidateIndices .getValueCount ())) {
165
161
162
+ int matchedCount = 0 ;
166
163
try (VectorSchemaRoot candidate =
167
164
getDictionaryEncodedBatch (
168
165
buildSideCandidateIndices , proSideCandidateIndices , dictionary );
169
166
BaseIntVector indicesSelection = matcher .filter (allocator , dictionary , candidate , null )) {
170
- outputMatched (
171
- buildSideIndicesBuilder ,
172
- probeSideIndicesBuilder ,
173
- markBuilder ,
174
- buildSideCandidateIndices ,
175
- proSideCandidateIndices ,
176
- indicesSelection ,
177
- probeSideMatched );
167
+ matchedCount =
168
+ outputMatched (
169
+ buildSideIndicesBuilder ,
170
+ probeSideIndicesBuilder ,
171
+ buildSideCandidateIndices ,
172
+ proSideCandidateIndices ,
173
+ indicesSelection ,
174
+ probeSideMatched );
178
175
}
179
176
177
+ int unmatchedCount = 0 ;
180
178
if (joinOption .isToOutputProbeSideUnmatched ()) {
181
- outputProbeSideUnmatched (probeSideIndicesBuilder , markBuilder , probeSideMatched );
179
+ unmatchedCount = outputProbeSideUnmatched (probeSideIndicesBuilder , probeSideMatched );
182
180
}
183
181
184
- try (IntVector probeSideIndices = probeSideIndicesBuilder .build ();
185
- IntVector buildSideIndices =
186
- buildSideIndicesBuilder .build (probeSideIndices .getValueCount ());
187
- BitVector mark = markBuilder .build (probeSideIndices .getValueCount ())) {
188
- output (dictionary , buildSideIndices , probeSideIndices , mark );
182
+ try (MarkBuilder markBuilder = getMarkBuilder (matchedCount + unmatchedCount )) {
183
+ markBuilder .appendTrue (matchedCount );
184
+ markBuilder .appendFalse (unmatchedCount );
185
+ try (IntVector probeSideIndices = probeSideIndicesBuilder .build ();
186
+ IntVector buildSideIndices =
187
+ buildSideIndicesBuilder .build (probeSideIndices .getValueCount ());
188
+ BitVector mark = markBuilder .build (probeSideIndices .getValueCount ())) {
189
+ output (dictionary , buildSideIndices , probeSideIndices , mark , unmatchedCount );
190
+ }
189
191
}
190
192
}
191
193
}
@@ -198,10 +200,9 @@ private MarkBuilder getMarkBuilder(int capacity) {
198
200
}
199
201
}
200
202
201
- private void outputMatched (
203
+ private int outputMatched (
202
204
SelectionBuilder buildSideIndicesBuilder ,
203
205
SelectionBuilder probeSideIndicesBuilder ,
204
- MarkBuilder markBuilder ,
205
206
IntVector buildSideCandidateIndices ,
206
207
IntVector proSideCandidateIndices ,
207
208
@ Nullable BaseIntVector indicesSelection ,
@@ -210,20 +211,18 @@ private void outputMatched(
210
211
Preconditions .checkState (
211
212
buildSideCandidateIndices .getValueCount () == proSideCandidateIndices .getValueCount ());
212
213
if (indicesSelection == null ) {
213
- outputMatched (
214
+ return outputMatched (
214
215
buildSideIndicesBuilder ,
215
216
probeSideIndicesBuilder ,
216
- markBuilder ,
217
217
buildSideCandidateIndices ,
218
218
proSideCandidateIndices ,
219
219
probeSideMatched ,
220
220
proSideCandidateIndices .getValueCount (),
221
221
i -> i );
222
222
} else {
223
- outputMatched (
223
+ return outputMatched (
224
224
buildSideIndicesBuilder ,
225
225
probeSideIndicesBuilder ,
226
- markBuilder ,
227
226
buildSideCandidateIndices ,
228
227
proSideCandidateIndices ,
229
228
probeSideMatched ,
@@ -232,10 +231,9 @@ private void outputMatched(
232
231
}
233
232
}
234
233
235
- private void outputMatched (
234
+ private int outputMatched (
236
235
SelectionBuilder buildSideIndicesBuilder ,
237
236
SelectionBuilder probeSideIndicesBuilder ,
238
- MarkBuilder markBuilder ,
239
237
IntVector buildSideCandidateIndices ,
240
238
IntVector proSideCandidateIndices ,
241
239
boolean [] probeSideMatched ,
@@ -266,21 +264,19 @@ private void outputMatched(
266
264
buildSideIndicesBuilder .append (buildSideMatchedIndex );
267
265
probeSideIndicesBuilder .append (probeSideMatchedIndex );
268
266
}
269
- markBuilder . appendTrue ( probeSideMatchedCount ) ;
267
+ return probeSideMatchedCount ;
270
268
}
271
269
272
- private void outputProbeSideUnmatched (
273
- SelectionBuilder probeSideIndicesBuilder ,
274
- MarkBuilder markBuilder ,
275
- boolean [] probeSideMatched ) {
270
+ private int outputProbeSideUnmatched (
271
+ SelectionBuilder probeSideIndicesBuilder , boolean [] probeSideMatched ) {
276
272
int probeSideUnmatchedCount = 0 ;
277
273
for (int probeSideIndex = 0 ; probeSideIndex < probeSideMatched .length ; probeSideIndex ++) {
278
274
if (!probeSideMatched [probeSideIndex ]) {
279
275
probeSideUnmatchedCount ++;
280
276
probeSideIndicesBuilder .append (probeSideIndex );
281
277
}
282
278
}
283
- markBuilder . appendFalse ( probeSideUnmatchedCount ) ;
279
+ return probeSideUnmatchedCount ;
284
280
}
285
281
286
282
private VectorSchemaRoot getDictionaryEncodedBatch (
@@ -306,7 +302,8 @@ private void output(
306
302
ArrayDictionaryProvider dictionaryProvider ,
307
303
BaseIntVector buildSideIndices ,
308
304
BaseIntVector probeSideIndices ,
309
- @ Nullable BitVector mark )
305
+ @ Nullable BitVector mark ,
306
+ int unmatchedCount )
310
307
throws ComputeException {
311
308
Preconditions .checkArgument (
312
309
buildSideIndices .getValueCount () == probeSideIndices .getValueCount ());
@@ -335,7 +332,7 @@ private void output(
335
332
VectorSchemaRoot output =
336
333
ScalarExpressions .evaluate (
337
334
allocator , dictionaryProvider , result , null , outputExpressions );
338
- BaseIntVector selection = getSelection (probeSideIndices )) {
335
+ BaseIntVector selection = getSelection (probeSideIndices , unmatchedCount )) {
339
336
resultConsumer .consume (
340
337
dictionaryProvider .slice (allocator ),
341
338
VectorSchemaRoots .transfer (allocator , output ),
@@ -344,33 +341,55 @@ private void output(
344
341
}
345
342
346
343
@ Nullable
347
- private BaseIntVector getSelection (BaseIntVector probeSideIndices ) {
348
- if (!joinOption .isToOutputProbeSideUnmatched () || !joinOption .isOrderByProbeSideOrdinal ()) {
344
+ private BaseIntVector getSelection (BaseIntVector probeSideIndices , int unmatchedCount ) {
345
+ int total = probeSideIndices .getValueCount ();
346
+ int matchedCount = total - unmatchedCount ;
347
+ if (matchedCount == 0 || unmatchedCount == 0 || !joinOption .isOrderByProbeSideOrdinal ()) {
349
348
return null ;
350
349
}
351
- if (isInOrder (probeSideIndices )) {
350
+
351
+ if (isInOrder (probeSideIndices , matchedCount - 1 , matchedCount )) {
352
352
return null ;
353
353
}
354
354
355
- IntVector selection = new IntVector ("selection" , allocator );
356
- selection .allocateNew (probeSideIndices .getValueCount ());
357
- selection .setValueCount (probeSideIndices .getValueCount ());
358
- new IndexSorter <>()
359
- .sort (
360
- probeSideIndices ,
361
- selection ,
362
- new StableVectorComparator <>(
363
- DefaultVectorComparators .createDefaultComparator (probeSideIndices )));
364
- return selection ;
355
+ try (SelectionBuilder selectionBuilder = new SelectionBuilder (allocator , "selection" , total )) {
356
+ // merge from [0,matchedCount) and from [matchedCount,probeSideIndices.getValueCount())
357
+ int leftCursor = 0 ;
358
+ int rightCursor = matchedCount ;
359
+ while (true ) {
360
+ if (leftCursor < matchedCount ) {
361
+ if (rightCursor < total ) {
362
+ if (isInOrder (probeSideIndices , leftCursor , rightCursor )) {
363
+ selectionBuilder .append (leftCursor ++);
364
+ } else {
365
+ selectionBuilder .append (rightCursor ++);
366
+ }
367
+ } else {
368
+ selectionBuilder .append (leftCursor ++);
369
+ }
370
+ } else {
371
+ if (rightCursor < total ) {
372
+ selectionBuilder .append (rightCursor ++);
373
+ } else {
374
+ break ;
375
+ }
376
+ }
377
+ }
378
+ return selectionBuilder .build ();
379
+ }
365
380
}
366
381
367
- private static boolean isInOrder (BaseIntVector vector ) {
368
- for (int i = 1 ; i < vector .getValueCount (); i ++) {
369
- if (vector .getValueAsLong (i ) < vector .getValueAsLong ((i - 1 ))) {
382
+ private static boolean isInOrder (BaseIntVector vector , int leftIndex , int rightIndex ) {
383
+ // null is biggest
384
+ if (vector .getField ().isNullable ()) {
385
+ if (vector .isNull (rightIndex )) {
386
+ return true ;
387
+ }
388
+ if (vector .isNull (leftIndex )) {
370
389
return false ;
371
390
}
372
391
}
373
- return true ;
392
+ return vector . getValueAsLong ( leftIndex ) <= vector . getValueAsLong ( rightIndex ) ;
374
393
}
375
394
376
395
public static class Builder implements AutoCloseable {
0 commit comments