Skip to content

Commit d24e8fa

Browse files
authored
fix: UdfIT (IGinX-THU#592)
1 parent 5b150a8 commit d24e8fa

File tree

4 files changed

+26
-8
lines changed

4 files changed

+26
-8
lines changed

core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/task/memory/StreamSourceMemoryPhysicalTask.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,29 @@
2222
import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
2323
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.executor.util.Batch;
2424
import cn.edu.tsinghua.iginx.engine.physical.task.TaskResult;
25+
import cn.edu.tsinghua.iginx.engine.physical.task.utils.PhysicalSupplier;
2526
import cn.edu.tsinghua.iginx.engine.shared.RequestContext;
2627
import cn.edu.tsinghua.iginx.engine.shared.data.read.BatchSchema;
2728
import cn.edu.tsinghua.iginx.engine.shared.data.read.BatchStream;
2829
import java.util.Objects;
2930

3031
public class StreamSourceMemoryPhysicalTask extends SourceMemoryPhysicalTask {
3132

32-
private final BatchStream stream;
33+
private final PhysicalSupplier<BatchStream> supplier;
3334

34-
public StreamSourceMemoryPhysicalTask(RequestContext context, Object info, BatchStream stream) {
35+
public StreamSourceMemoryPhysicalTask(
36+
RequestContext context, Object info, PhysicalSupplier<BatchStream> supplier) {
3537
super(context, info);
36-
this.stream = Objects.requireNonNull(stream);
38+
this.supplier = Objects.requireNonNull(supplier);
3739
}
3840

3941
@Override
4042
public TaskResult<BatchStream> execute() {
41-
return new TaskResult<>(new StreamWrapper(stream));
43+
try {
44+
return new TaskResult<>(new StreamWrapper(supplier.get()));
45+
} catch (PhysicalException e) {
46+
return new TaskResult<>(e);
47+
}
4248
}
4349

4450
private class StreamWrapper implements BatchStream {

core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/task/memory/parallel/ParallelPipelineMemoryPhysicalTask.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ private BatchStream compute(BatchStream previous) {
106106
private GatherMemoryPhysicalTask constructPipeline(
107107
RequestContext context, BatchStream stream, ScatterGatherBatchStream outputStream) {
108108
MemoryPhysicalTask<BatchStream> source =
109-
new StreamSourceMemoryPhysicalTask(context, "Parallel Pipeline Source", stream);
109+
new StreamSourceMemoryPhysicalTask(context, "Parallel Pipeline Source", () -> stream);
110110
PipelineMemoryPhysicalTask pipeline = pipelineFactory.createPipeline(context, source);
111111
GatherMemoryPhysicalTask gather = new GatherMemoryPhysicalTask(context, pipeline, outputStream);
112112

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package cn.edu.tsinghua.iginx.engine.physical.task.utils;
2+
3+
import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
4+
5+
public interface PhysicalSupplier<T extends PhysicalCloseable> {
6+
T get() throws PhysicalException;
7+
}

optimizer/src/main/java/cn/edu/tsinghua/iginx/physical/optimizer/naive/NaivePhysicalPlanner.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,7 @@ public PhysicalTask<?> fetch(Source source, RequestContext context) {
152152
}
153153
}
154154

155-
public PhysicalTask<BatchStream> construct(
156-
List<Expression> constantExpressions, RequestContext context) {
155+
public PhysicalTask<?> construct(List<Expression> constantExpressions, RequestContext context) {
157156
Preconditions.checkArgument(
158157
constantExpressions.stream()
159158
.allMatch(cn.edu.tsinghua.iginx.sql.utils.ExpressionUtils::isConstantArithmeticExpr));
@@ -162,7 +161,7 @@ public PhysicalTask<BatchStream> construct(
162161
new StreamSourceMemoryPhysicalTask(
163162
context,
164163
"Produce 1 emtpy row to calculate constant values",
165-
BatchStreams.nonColumn(context.getBatchRowCount(), 1));
164+
() -> BatchStreams.nonColumn(context.getBatchRowCount(), 1));
166165

167166
List<FunctionCall> functionCalls = new ArrayList<>();
168167
for (Expression expression : constantExpressions) {
@@ -172,6 +171,12 @@ public PhysicalTask<BatchStream> construct(
172171

173172
RowTransform rowTransform = new RowTransform(EmptySource.EMPTY_SOURCE, functionCalls);
174173

174+
if (rowTransform.getFunctionCallList().stream()
175+
.anyMatch(UDFDetector::containNonSystemFunction)) {
176+
return new UnaryRowMemoryPhysicalTask(
177+
convert(source, context, RowStream.class), rowTransform, context);
178+
}
179+
175180
return new PipelineMemoryPhysicalTask(
176181
source,
177182
Collections.singletonList(rowTransform),

0 commit comments

Comments
 (0)