Skip to content

Commit 88261fb

Browse files
authored
[fix](planner) fix legacy planner compute wrong result when parallel_pipeline_task_num =1 (#56312)
### What problem does this PR solve? fix legacy planner compute wrong result when parallel_pipeline_task_num = 1 ### Check List (For Author) - Test <!-- At least one of them must be included. --> - [x] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason <!-- Add your reason? --> - Behavior changed: - [x] No. - [ ] Yes. <!-- Explain the behavior change --> - Does this need documentation? - [x] No. - [ ] Yes. <!-- Add document PR link here. eg: apache/doris-website#1214 --> ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
1 parent 563d2f9 commit 88261fb

File tree

4 files changed

+91
-2
lines changed

4 files changed

+91
-2
lines changed

fe/fe-core/src/main/java/org/apache/doris/common/TreeNode.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,4 +251,14 @@ public boolean anyMatch(Predicate<TreeNode<? extends NodeType>> func) {
251251
return false;
252252
}
253253

254+
/** foreachDown */
255+
public void foreachDown(Predicate<TreeNode<NodeType>> visitor) {
256+
if (!visitor.test(this)) {
257+
return;
258+
}
259+
260+
for (TreeNode<NodeType> child : getChildren()) {
261+
child.foreachDown(visitor);
262+
}
263+
}
254264
}

fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,8 +236,14 @@ private PlanFragment createPlanFragments(
236236
// move 'result' to end, it depends on all of its children
237237
fragments.remove(result);
238238
fragments.add(result);
239-
if ((!isPartitioned && result.isPartitioned() && result.getPlanRoot().getNumInstances() > 1)
240-
|| (!(root instanceof SortNode) && root.hasOffset())) {
239+
List<ScanNode> scanNodes = result.getPlanRoot().collectInCurrentFragment(p -> p instanceof ScanNode);
240+
int scanRangeNum = 0;
241+
for (ScanNode scanNode : scanNodes) {
242+
scanRangeNum += scanNode.getScanRangeLocations(0).size();
243+
}
244+
boolean inputFragmentHasMultiInstances = !isPartitioned && result.isPartitioned()
245+
&& (result.getPlanRoot().getNumInstances() > 1 || scanRangeNum > 1);
246+
if (inputFragmentHasMultiInstances || (!(root instanceof SortNode) && root.hasOffset())) {
241247
result = createMergeFragment(result);
242248
fragments.add(result);
243249
}

fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@
6262
import java.util.List;
6363
import java.util.Map;
6464
import java.util.Set;
65+
import java.util.function.Consumer;
66+
import java.util.function.Predicate;
6567
import java.util.stream.Collectors;
6668

6769
/**
@@ -1271,4 +1273,27 @@ public void addIntermediateOutputTupleDescList(TupleDescriptor tupleDescriptor)
12711273
public void addIntermediateProjectList(List<Expr> exprs) {
12721274
intermediateProjectListList.add(exprs);
12731275
}
1276+
1277+
public <T extends PlanNode> List<T> collectInCurrentFragment(Predicate<PlanNode> predicate) {
1278+
List<PlanNode> result = Lists.newArrayList();
1279+
foreachDownInCurrentFragment(child -> {
1280+
if (predicate.test(child)) {
1281+
result.add(child);
1282+
}
1283+
});
1284+
return (List) result;
1285+
}
1286+
1287+
/** foreachDownInCurrentFragment */
1288+
public void foreachDownInCurrentFragment(Consumer<PlanNode> visitor) {
1289+
int currentFragmentId = getFragmentId().asInt();
1290+
foreachDown(child -> {
1291+
PlanNode childNode = (PlanNode) child;
1292+
if (childNode.getFragmentId().asInt() != currentFragmentId) {
1293+
return false;
1294+
}
1295+
visitor.accept(childNode);
1296+
return true;
1297+
});
1298+
}
12741299
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
suite("not_equal") {
19+
multi_sql """
20+
drop table if exists random_tbl_test;
21+
CREATE TABLE `random_tbl_test` (
22+
`id` int NULL
23+
) ENGINE=OLAP
24+
DISTRIBUTED BY RANDOM BUCKETS 10
25+
PROPERTIES (
26+
"replication_allocation" = "tag.location.default: 1"
27+
);
28+
29+
insert into random_tbl_test select * from numbers('number'='100');
30+
31+
set enable_nereids_planner=false;
32+
set parallel_pipeline_task_num=1;
33+
set enable_pipeline_engine=true;
34+
set enable_pipeline_x_engine=false;
35+
set enable_shared_scan=true;"""
36+
37+
explain {
38+
sql "select * from random_tbl_test where 1 ! = 2"
39+
check { String explainStr ->
40+
assertEquals(2, explainStr.count("PLAN FRAGMENT"))
41+
}
42+
}
43+
44+
test {
45+
sql "select * from random_tbl_test where 1 ! = 2"
46+
rowNum(100)
47+
}
48+
}

0 commit comments

Comments
 (0)