From cbfc88ea9c609ef6bf1e296d6fd6f1ad786989cd Mon Sep 17 00:00:00 2001 From: Himanshu Mishra Date: Mon, 9 Sep 2024 13:53:13 +0530 Subject: [PATCH] HIVE-28480: Disable SMB on partition hash generator mismatch across join branches in previous RS (Himanshu Mishra, reviewed by Krisztian Kasa, Shohei Okumiya) --- .../hadoop/hive/ql/exec/OperatorUtils.java | 17 + .../hive/ql/optimizer/ConvertJoinMapJoin.java | 32 +- .../clientpositive/auto_sortmerge_join_18.q | 39 +++ .../llap/auto_sortmerge_join_18.q.out | 325 ++++++++++++++++++ 4 files changed, 405 insertions(+), 8 deletions(-) create mode 100644 ql/src/test/queries/clientpositive/auto_sortmerge_join_18.q create mode 100644 ql/src/test/results/clientpositive/llap/auto_sortmerge_join_18.q.out diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java index eb10e9aa6cdd..31400a2903b5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java @@ -30,6 +30,7 @@ import java.util.Set; import java.util.Stack; +import com.google.common.base.Preconditions; import org.apache.hadoop.hive.ql.exec.NodeUtils.Function; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.SemiJoinBranchInfo; @@ -698,4 +699,20 @@ public static Set> getAllFetchOperators(FetchTask task) { return getAllOperatorsForSimpleFetch(operatorList); } + /** + * Traverses the operator chain upwards to find the input source operator in that branch. + * + * @param op the starting operator + * @return the first matching operator, or null if none is found + */ + public static Operator findSourceOperatorInSameBranch(Operator op) { + while (op != null && !(op instanceof TableScanOperator || op instanceof ReduceSinkOperator + || op instanceof CommonJoinOperator)) { + // If op has parents, it is guaranteed to have exactly one. 
+ List> parents = op.getParentOperators(); + Preconditions.checkState(parents.size() <= 1); + op = parents.size() == 1 ? parents.get(0) : null; + } + return op; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index 87b3820f6735..3e63c3d297e2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -68,6 +68,7 @@ import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.OpTraits; import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.Statistics; import org.apache.hadoop.hive.ql.stats.StatsUtils; import org.apache.hadoop.hive.ql.util.JavaDataModel; @@ -849,14 +850,7 @@ private boolean checkConvertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcCont for (Operator parentOp : joinOp.getParentOperators()) { // Check if the parent is coming from a table scan, if so, what is the version of it. assert parentOp.getParentOperators() != null && parentOp.getParentOperators().size() == 1; - Operator op = parentOp; - while (op != null && !(op instanceof TableScanOperator || op instanceof ReduceSinkOperator - || op instanceof CommonJoinOperator)) { - // If op has parents it is guaranteed to be 1. - List> parents = op.getParentOperators(); - Preconditions.checkState(parents.size() == 0 || parents.size() == 1); - op = parents.size() == 1 ? 
parents.get(0) : null; - } + Operator op = OperatorUtils.findSourceOperatorInSameBranch(parentOp); if (op instanceof TableScanOperator) { int localVersion = ((TableScanOperator) op).getConf().getTableMetadata().getBucketingVersion(); @@ -870,6 +864,28 @@ } } + /* As SMB replaces last RS op from the joining branches and the JOIN op with MERGEJOIN, we need to ensure + * that the RSs preceding these RSs, in both branches, partition using the same hash generator. The generator + * differs depending on ReducerTraits.UNIFORM i.e. ReduceSinkOperator#computeMurmurHash or + * ReduceSinkOperator#computeHashCode, leading to different hash codes for the same value. Skip SMB join in such cases. + */ + Boolean prevRsHasUniformTrait = null; + for (Operator parentOp : joinOp.getParentOperators()) { + // Assertion of mandatory single parent is already being done in bucket version check earlier + Operator op = OperatorUtils.findSourceOperatorInSameBranch(parentOp.getParentOperators().get(0)); + + if (op instanceof ReduceSinkOperator) { + boolean hasUniformTrait = ((ReduceSinkOperator) op).getConf() + .getReducerTraits().contains(ReduceSinkDesc.ReducerTraits.UNIFORM); + if (prevRsHasUniformTrait == null) { + prevRsHasUniformTrait = hasUniformTrait; + } else if (prevRsHasUniformTrait != hasUniformTrait) { + LOG.debug("SMB Join can't be performed due to partition hash generator mismatch across join branches"); + return false; + } + } + } + LOG.info("We can convert the join to an SMB join."); return true; } diff --git a/ql/src/test/queries/clientpositive/auto_sortmerge_join_18.q b/ql/src/test/queries/clientpositive/auto_sortmerge_join_18.q new file mode 100644 index 000000000000..50fa316269d1 --- /dev/null +++ b/ql/src/test/queries/clientpositive/auto_sortmerge_join_18.q @@ -0,0 +1,39 @@ +CREATE TABLE t_asj_18 (k STRING, v INT); +INSERT INTO t_asj_18 values ('a', 10), ('a', 10); + +set hive.auto.convert.join=false; +set 
hive.tez.auto.reducer.parallelism=true; + +EXPLAIN SELECT * FROM ( + SELECT k, COUNT(DISTINCT v), SUM(v) + FROM t_asj_18 GROUP BY k +) a LEFT JOIN ( + SELECT k, COUNT(v) + FROM t_asj_18 GROUP BY k +) b ON a.k = b.k; + +SELECT * FROM ( + SELECT k, COUNT(DISTINCT v), SUM(v) + FROM t_asj_18 GROUP BY k +) a LEFT JOIN ( + SELECT k, COUNT(v) + FROM t_asj_18 GROUP BY k +) b ON a.k = b.k; + +set hive.optimize.distinct.rewrite=false; + +EXPLAIN SELECT * FROM ( + SELECT k, COUNT(DISTINCT v) + FROM t_asj_18 GROUP BY k +) a LEFT JOIN ( + SELECT k, COUNT(v) + FROM t_asj_18 GROUP BY k +) b ON a.k = b.k; + +SELECT * FROM ( + SELECT k, COUNT(DISTINCT v) + FROM t_asj_18 GROUP BY k +) a LEFT JOIN ( + SELECT k, COUNT(v) + FROM t_asj_18 GROUP BY k +) b ON a.k = b.k; \ No newline at end of file diff --git a/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_18.q.out b/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_18.q.out new file mode 100644 index 000000000000..69061fcb46c9 --- /dev/null +++ b/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_18.q.out @@ -0,0 +1,325 @@ +PREHOOK: query: CREATE TABLE t_asj_18 (k STRING, v INT) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@t_asj_18 +POSTHOOK: query: CREATE TABLE t_asj_18 (k STRING, v INT) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@t_asj_18 +PREHOOK: query: INSERT INTO t_asj_18 values ('a', 10), ('a', 10) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@t_asj_18 +POSTHOOK: query: INSERT INTO t_asj_18 values ('a', 10), ('a', 10) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@t_asj_18 +POSTHOOK: Lineage: t_asj_18.k SCRIPT [] +POSTHOOK: Lineage: t_asj_18.v SCRIPT [] +PREHOOK: query: EXPLAIN SELECT * FROM ( + SELECT k, COUNT(DISTINCT v), SUM(v) + FROM t_asj_18 GROUP BY k +) a LEFT JOIN ( + SELECT k, COUNT(v) + FROM t_asj_18 
GROUP BY k +) b ON a.k = b.k +PREHOOK: type: QUERY +PREHOOK: Input: default@t_asj_18 +#### A masked pattern was here #### +POSTHOOK: query: EXPLAIN SELECT * FROM ( + SELECT k, COUNT(DISTINCT v), SUM(v) + FROM t_asj_18 GROUP BY k +) a LEFT JOIN ( + SELECT k, COUNT(v) + FROM t_asj_18 GROUP BY k +) b ON a.k = b.k +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t_asj_18 +#### A masked pattern was here #### +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE) + Reducer 4 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: t_asj_18 + Statistics: Num rows: 2 Data size: 178 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: k (type: string), v (type: int) + outputColumnNames: k, v + Statistics: Num rows: 2 Data size: 178 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: count(DISTINCT v), sum(v) + keys: k (type: string), v (type: int) + minReductionHashAggr: 0.5 + mode: hash + outputColumnNames: _col0, _col1, _col2, _col3 + Statistics: Num rows: 1 Data size: 105 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string), _col1 (type: int) + null sort order: zz + sort order: ++ + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 1 Data size: 105 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col3 (type: bigint) + Filter Operator + predicate: k is not null (type: boolean) + Statistics: Num rows: 2 Data size: 178 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: count(v) + keys: k (type: string) + minReductionHashAggr: 0.5 + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 1 Data size: 93 Basic 
stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string) + null sort order: z + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 1 Data size: 93 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col1 (type: bigint) + Execution mode: vectorized, llap + LLAP IO: all inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + aggregations: count(DISTINCT KEY._col1:0._col0), sum(VALUE._col1) + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 101 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string) + null sort order: z + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 1 Data size: 101 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col1 (type: bigint), _col2 (type: bigint) + Reducer 3 + Execution mode: llap + Reduce Operator Tree: + Merge Join Operator + condition map: + Left Outer Join 0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + outputColumnNames: _col0, _col1, _col2, _col3, _col4 + Statistics: Num rows: 1 Data size: 194 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 194 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 4 + Execution mode: vectorized, llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 1 Data size: 93 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + 
key expressions: _col0 (type: string) + null sort order: z + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 1 Data size: 93 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col1 (type: bigint) + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: SELECT * FROM ( + SELECT k, COUNT(DISTINCT v), SUM(v) + FROM t_asj_18 GROUP BY k +) a LEFT JOIN ( + SELECT k, COUNT(v) + FROM t_asj_18 GROUP BY k +) b ON a.k = b.k +PREHOOK: type: QUERY +PREHOOK: Input: default@t_asj_18 +#### A masked pattern was here #### +POSTHOOK: query: SELECT * FROM ( + SELECT k, COUNT(DISTINCT v), SUM(v) + FROM t_asj_18 GROUP BY k +) a LEFT JOIN ( + SELECT k, COUNT(v) + FROM t_asj_18 GROUP BY k +) b ON a.k = b.k +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t_asj_18 +#### A masked pattern was here #### +a 1 20 a 2 +PREHOOK: query: EXPLAIN SELECT * FROM ( + SELECT k, COUNT(DISTINCT v) + FROM t_asj_18 GROUP BY k +) a LEFT JOIN ( + SELECT k, COUNT(v) + FROM t_asj_18 GROUP BY k +) b ON a.k = b.k +PREHOOK: type: QUERY +PREHOOK: Input: default@t_asj_18 +#### A masked pattern was here #### +POSTHOOK: query: EXPLAIN SELECT * FROM ( + SELECT k, COUNT(DISTINCT v) + FROM t_asj_18 GROUP BY k +) a LEFT JOIN ( + SELECT k, COUNT(v) + FROM t_asj_18 GROUP BY k +) b ON a.k = b.k +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t_asj_18 +#### A masked pattern was here #### +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 4 (SIMPLE_EDGE) + Reducer 4 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: t_asj_18 + Statistics: Num rows: 2 Data size: 178 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: k (type: string), v 
(type: int) + outputColumnNames: k, v + Statistics: Num rows: 2 Data size: 178 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: count(DISTINCT v) + keys: k (type: string), v (type: int) + minReductionHashAggr: 0.5 + mode: hash + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 97 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string), _col1 (type: int) + null sort order: zz + sort order: ++ + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 1 Data size: 97 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: k is not null (type: boolean) + Statistics: Num rows: 2 Data size: 178 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: count(v) + keys: k (type: string) + minReductionHashAggr: 0.5 + mode: hash + outputColumnNames: _col0, _col1 + Statistics: Num rows: 1 Data size: 93 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string) + null sort order: z + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 1 Data size: 93 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col1 (type: bigint) + Execution mode: vectorized, llap + LLAP IO: all inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + aggregations: count(DISTINCT KEY._col1:0._col0) + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 1 Data size: 93 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string) + null sort order: z + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 1 Data size: 93 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col1 (type: bigint) + Reducer 3 + Execution mode: llap + Reduce Operator 
Tree: + Merge Join Operator + condition map: + Left Outer Join 0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + outputColumnNames: _col0, _col1, _col2, _col3 + Statistics: Num rows: 1 Data size: 186 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 186 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 4 + Execution mode: vectorized, llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + keys: KEY._col0 (type: string) + mode: mergepartial + outputColumnNames: _col0, _col1 + Statistics: Num rows: 1 Data size: 93 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: string) + null sort order: z + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 1 Data size: 93 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col1 (type: bigint) + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: SELECT * FROM ( + SELECT k, COUNT(DISTINCT v) + FROM t_asj_18 GROUP BY k +) a LEFT JOIN ( + SELECT k, COUNT(v) + FROM t_asj_18 GROUP BY k +) b ON a.k = b.k +PREHOOK: type: QUERY +PREHOOK: Input: default@t_asj_18 +#### A masked pattern was here #### +POSTHOOK: query: SELECT * FROM ( + SELECT k, COUNT(DISTINCT v) + FROM t_asj_18 GROUP BY k +) a LEFT JOIN ( + SELECT k, COUNT(v) + FROM t_asj_18 GROUP BY k +) b ON a.k = b.k +POSTHOOK: type: QUERY +POSTHOOK: Input: default@t_asj_18 +#### A masked pattern was here #### +a 1 a 2