36
36
import cn .edu .tsinghua .iginx .engine .physical .task .memory .row .RowToArrowUnaryMemoryPhysicalTask ;
37
37
import cn .edu .tsinghua .iginx .engine .physical .task .memory .row .UnaryRowMemoryPhysicalTask ;
38
38
import cn .edu .tsinghua .iginx .engine .physical .task .utils .PhysicalCloseable ;
39
- import cn .edu .tsinghua .iginx .engine .physical .utils .PhysicalExpressionUtils ;
40
39
import cn .edu .tsinghua .iginx .engine .physical .utils .PhysicalJoinUtils ;
41
40
import cn .edu .tsinghua .iginx .engine .shared .RequestContext ;
42
41
import cn .edu .tsinghua .iginx .engine .shared .data .read .BatchStream ;
45
44
import cn .edu .tsinghua .iginx .engine .shared .expr .Expression ;
46
45
import cn .edu .tsinghua .iginx .engine .shared .function .FunctionCall ;
47
46
import cn .edu .tsinghua .iginx .engine .shared .function .FunctionParams ;
48
- import cn .edu .tsinghua .iginx .engine .shared .function .FunctionType ;
49
47
import cn .edu .tsinghua .iginx .engine .shared .function .system .ArithmeticExpr ;
50
48
import cn .edu .tsinghua .iginx .engine .shared .operator .*;
51
49
import cn .edu .tsinghua .iginx .engine .shared .operator .type .JoinAlgType ;
52
50
import cn .edu .tsinghua .iginx .engine .shared .operator .type .OperatorType ;
53
51
import cn .edu .tsinghua .iginx .engine .shared .source .*;
54
52
import cn .edu .tsinghua .iginx .physical .optimizer .naive .initializer .*;
53
+ import cn .edu .tsinghua .iginx .physical .optimizer .naive .util .UDFDetector ;
55
54
import java .util .ArrayList ;
56
55
import java .util .Collections ;
57
56
import java .util .List ;
@@ -293,9 +292,7 @@ public PhysicalTask<BatchStream> construct(AddSchemaPrefix operator, RequestCont
293
292
}
294
293
295
294
public PhysicalTask <?> construct (RowTransform operator , RequestContext context ) {
296
- if (operator .getFunctionCallList ().stream ()
297
- .map (FunctionCall ::getFunction )
298
- .anyMatch (f -> f .getFunctionType () != FunctionType .System )) {
295
+ if (operator .getFunctionCallList ().stream ().anyMatch (UDFDetector ::containNonSystemFunction )) {
299
296
return constructRow (operator , context );
300
297
}
301
298
@@ -324,6 +321,10 @@ public PhysicalTask<?> construct(Select operator, RequestContext context) {
324
321
return storageTask ;
325
322
}
326
323
324
+ if (UDFDetector .containNonSystemFunction (operator .getFilter ())) {
325
+ return constructRow (operator , context );
326
+ }
327
+
327
328
if (sourceTask .getResultClass () == RowStream .class ) {
328
329
return new UnaryRowMemoryPhysicalTask (
329
330
convert (sourceTask , context , RowStream .class ), operator , context );
@@ -400,9 +401,7 @@ public PhysicalTask<?> construct(SetTransform operator, RequestContext context)
400
401
return storageTask ;
401
402
}
402
403
403
- if (operator .getFunctionCallList ().stream ()
404
- .map (FunctionCall ::getFunction )
405
- .anyMatch (f -> f .getFunctionType () != FunctionType .System )) {
404
+ if (operator .getFunctionCallList ().stream ().anyMatch (UDFDetector ::containNonSystemFunction )) {
406
405
return constructRow (operator , context );
407
406
}
408
407
@@ -421,13 +420,10 @@ public PhysicalTask<?> construct(GroupBy operator, RequestContext context) {
421
420
return storageTask ;
422
421
}
423
422
424
- if (operator .getFunctionCallList ().stream ()
425
- .map (FunctionCall ::getFunction )
426
- .anyMatch (f -> f .getFunctionType () != FunctionType .System )) {
423
+ if (operator .getFunctionCallList ().stream ().anyMatch (UDFDetector ::containNonSystemFunction )) {
427
424
return constructRow (operator , context );
428
425
}
429
- if (!operator .getGroupByExpressions ().stream ()
430
- .allMatch (PhysicalExpressionUtils ::containSystemFunctionOnly )) {
426
+ if (operator .getGroupByExpressions ().stream ().anyMatch (UDFDetector ::containNonSystemFunction )) {
431
427
return constructRow (operator , context );
432
428
}
433
429
@@ -482,6 +478,10 @@ public PhysicalTask<?> construct(InnerJoin operator, RequestContext context) {
482
478
return constructRow (operator , context );
483
479
}
484
480
481
+ if (UDFDetector .containNonSystemFunction (operator .getFilter ())) {
482
+ return constructRow (operator , context );
483
+ }
484
+
485
485
// NOTE: The order of left and right task is reversed in InnerJoin
486
486
// 这里以及后面交换了左右两个表的顺序,原因是在之前基于行的实现中,右表是BuildSide,左表是ProbeSide
487
487
// 现在基于列的实现中,左表是BuildSide,右表是ProbeSide
@@ -503,6 +503,10 @@ public PhysicalTask<?> construct(OuterJoin operator, RequestContext context) {
503
503
return constructRow (operator , context );
504
504
}
505
505
506
+ if (UDFDetector .containNonSystemFunction (operator .getFilter ())) {
507
+ return constructRow (operator , context );
508
+ }
509
+
506
510
operator = PhysicalJoinUtils .reverse (operator );
507
511
508
512
PhysicalTask <BatchStream > leftTask = fetchAsync (operator .getSourceA (), context );
@@ -521,6 +525,10 @@ public PhysicalTask<?> construct(MarkJoin operator, RequestContext context) {
521
525
return constructRow (operator , context );
522
526
}
523
527
528
+ if (UDFDetector .containNonSystemFunction (operator .getFilter ())) {
529
+ return constructRow (operator , context );
530
+ }
531
+
524
532
operator = PhysicalJoinUtils .reverse (operator );
525
533
526
534
PhysicalTask <BatchStream > leftTask = fetchAsync (operator .getSourceA (), context );
@@ -539,6 +547,10 @@ public PhysicalTask<?> construct(SingleJoin operator, RequestContext context) {
539
547
return constructRow (operator , context );
540
548
}
541
549
550
+ if (UDFDetector .containNonSystemFunction (operator .getFilter ())) {
551
+ return constructRow (operator , context );
552
+ }
553
+
542
554
operator = PhysicalJoinUtils .reverse (operator );
543
555
544
556
PhysicalTask <BatchStream > leftTask = fetchAsync (operator .getSourceA (), context );
0 commit comments