Skip to content

Commit

Permalink
HIVE-28480: Disable SMB on partition hash generator mismatch across j…
Browse files Browse the repository at this point in the history
…oin branches in previous RS (Himanshu Mishra, reviewed by Krisztian Kasa, Shohei Okumiya)
  • Loading branch information
himanshu-mishra authored Sep 9, 2024
1 parent 52954f2 commit cbfc88e
Show file tree
Hide file tree
Showing 4 changed files with 405 additions and 8 deletions.
17 changes: 17 additions & 0 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Set;
import java.util.Stack;

import com.google.common.base.Preconditions;
import org.apache.hadoop.hive.ql.exec.NodeUtils.Function;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.SemiJoinBranchInfo;
Expand Down Expand Up @@ -698,4 +699,20 @@ public static Set<Operator<?>> getAllFetchOperators(FetchTask task) {
return getAllOperatorsForSimpleFetch(operatorList);
}

/**
* Traverses the operator chain upwards to find input source operator in that branch
*
* @param op the starting operator
* @return the first matching operator or null if none found
*/
public static Operator<?> findSourceOperatorInSameBranch(Operator<?> op) {
while (op != null && !(op instanceof TableScanOperator || op instanceof ReduceSinkOperator
|| op instanceof CommonJoinOperator)) {
// If op has parents it is guaranteed to be 1.
List<Operator<?>> parents = op.getParentOperators();
Preconditions.checkState(parents.size() <= 1);
op = parents.size() == 1 ? parents.get(0) : null;
}
return op;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.OpTraits;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.Statistics;
import org.apache.hadoop.hive.ql.stats.StatsUtils;
import org.apache.hadoop.hive.ql.util.JavaDataModel;
Expand Down Expand Up @@ -849,14 +850,7 @@ private boolean checkConvertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcCont
for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
// Check if the parent is coming from a table scan, if so, what is the version of it.
assert parentOp.getParentOperators() != null && parentOp.getParentOperators().size() == 1;
Operator<?> op = parentOp;
while (op != null && !(op instanceof TableScanOperator || op instanceof ReduceSinkOperator
|| op instanceof CommonJoinOperator)) {
// If op has parents it is guaranteed to be 1.
List<Operator<?>> parents = op.getParentOperators();
Preconditions.checkState(parents.size() == 0 || parents.size() == 1);
op = parents.size() == 1 ? parents.get(0) : null;
}
Operator<?> op = OperatorUtils.findSourceOperatorInSameBranch(parentOp);

if (op instanceof TableScanOperator) {
int localVersion = ((TableScanOperator) op).getConf().getTableMetadata().getBucketingVersion();
Expand All @@ -870,6 +864,28 @@ private boolean checkConvertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcCont
}
}

/* As SMB replaces last RS op from the joining branches and the JOIN op with MERGEJOIN, we need to ensure
* the RS before these RS, in both branches, are partitioning using same hash generator. It
* differs depending on ReducerTraits.UNIFORM i.e. ReduceSinkOperator#computeMurmurHash or
* ReduceSinkOperator#computeHashCode, leading to different code for same value. Skip SMB join in such cases.
*/
Boolean prevRsHasUniformTrait = null;
for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
// Assertion of mandatory single parent is already being done in bucket version check earlier
Operator<?> op = OperatorUtils.findSourceOperatorInSameBranch(parentOp.getParentOperators().get(0));

if (op instanceof ReduceSinkOperator) {
boolean hasUniformTrait = ((ReduceSinkOperator) op).getConf()
.getReducerTraits().contains(ReduceSinkDesc.ReducerTraits.UNIFORM);
if (prevRsHasUniformTrait == null) {
prevRsHasUniformTrait = hasUniformTrait;
} else if (prevRsHasUniformTrait != hasUniformTrait) {
LOG.debug("SMB Join can't be performed due to partition hash generator mismatch across join branches");
return false;
}
}
}

LOG.info("We can convert the join to an SMB join.");
return true;
}
Expand Down
39 changes: 39 additions & 0 deletions ql/src/test/queries/clientpositive/auto_sortmerge_join_18.q
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
CREATE TABLE t_asj_18 (k STRING, v INT);
INSERT INTO t_asj_18 values ('a', 10), ('a', 10);

set hive.auto.convert.join=false;
set hive.tez.auto.reducer.parallelism=true;

EXPLAIN SELECT * FROM (
SELECT k, COUNT(DISTINCT v), SUM(v)
FROM t_asj_18 GROUP BY k
) a LEFT JOIN (
SELECT k, COUNT(v)
FROM t_asj_18 GROUP BY k
) b ON a.k = b.k;

SELECT * FROM (
SELECT k, COUNT(DISTINCT v), SUM(v)
FROM t_asj_18 GROUP BY k
) a LEFT JOIN (
SELECT k, COUNT(v)
FROM t_asj_18 GROUP BY k
) b ON a.k = b.k;

set hive.optimize.distinct.rewrite=false;

EXPLAIN SELECT * FROM (
SELECT k, COUNT(DISTINCT v)
FROM t_asj_18 GROUP BY k
) a LEFT JOIN (
SELECT k, COUNT(v)
FROM t_asj_18 GROUP BY k
) b ON a.k = b.k;

SELECT * FROM (
SELECT k, COUNT(DISTINCT v)
FROM t_asj_18 GROUP BY k
) a LEFT JOIN (
SELECT k, COUNT(v)
FROM t_asj_18 GROUP BY k
) b ON a.k = b.k;
Loading

0 comments on commit cbfc88e

Please sign in to comment.