diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java index 9ece5d1dfdaa..938ebfc55254 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java @@ -245,6 +245,13 @@ protected void initLoggingPrefix(String className) { // that appear in the small table portion of the join output. protected transient VectorCopyRow outerSmallTableKeyVectorCopy; + /** + * Copies the BigTable key columns of a row into the SmallTable value output columns. + * Created in {@link #initializeOp(Configuration)} only when the projected SmallTable value count + * is non-zero and equals the join key count; used by {@link VectorMapJoinGenerateResultOperator}. + */ + protected transient VectorCopyRow bigTableKeyToSmallTableValueCopy; + // This helper object deserializes LazyBinary format small table values into columns of a row // in a vectorized row batch. protected transient VectorDeserializeRow smallTableValueVectorDeserializeRow; @@ -596,6 +603,16 @@ protected void initializeOp(Configuration hconf) throws HiveException { outerSmallTableKeyVectorCopy.init(outerSmallTableKeyMapping); } + // Build the key-to-value copier only when the plan projects SmallTable values AND their + // count equals the number of join keys. NOTE(review): assumes equal counts imply key-only projection. + if (smallTableValueMapping.getCount() > 0 && + smallTableValueMapping.getCount() == bigTableKeyColumnMap.length) { + bigTableKeyToSmallTableValueCopy = new VectorCopyRow(); + bigTableKeyToSmallTableValueCopy.init( + bigTableKeyColumnMap, smallTableValueMapping.getOutputColumns(), bigTableKeyTypeInfos + ); + } + /* * Setup the overflow batch. 
*/ diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java index 6e6d37ae7d04..ba172e51773c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java @@ -214,10 +214,7 @@ protected int generateHashMapResultSingleValue(VectorizedRowBatch batch, batch, batchIndex); } - if (smallTableValueVectorDeserializeRow != null) { - doSmallTableValueDeserializeRow(batch, batchIndex, - byteSegmentRef, hashMapResult); - } + updateBatchWithSmallTableValue(batch, hashMapResult, batchIndex, byteSegmentRef); // Use the big table row as output. batch.selected[numSel++] = batchIndex; @@ -226,6 +223,29 @@ protected int generateHashMapResultSingleValue(VectorizedRowBatch batch, return numSel; } + private void updateBatchWithSmallTableValue( + VectorizedRowBatch batch, VectorMapJoinHashMapResult hashMapResult, int batchIndex, ByteSegmentRef byteSegmentRef) + throws HiveException { + + // A zero-length segment means this hash map entry carries no serialized small table value. + boolean isSmallTableValueEmpty = byteSegmentRef.getLength() == 0; + + // Target case: INNER join where only the small table key is projected (join type is not checked here). + // The hash map value for that key is empty, and deserializing it would produce NULLs. + // Instead, copy the big table key into the small table value position. + // This preserves the joining key in the output without introducing NULLs. + // (Applied only when bigTableKeyToSmallTableValueCopy was created, i.e. is non-null.) 
+ if (isSmallTableValueEmpty && bigTableKeyToSmallTableValueCopy != null) { + bigTableKeyToSmallTableValueCopy.copyByValue( + batch, batchIndex, // source: big table key columns of this row + batch, batchIndex // target: small table value columns of the same row + ); + } else if (smallTableValueVectorDeserializeRow != null) { + doSmallTableValueDeserializeRow(batch, batchIndex, + byteSegmentRef, hashMapResult); + } + } + /** * Generate results for a N x M cross product. * @@ -295,11 +315,7 @@ protected void generateHashMapResultMultiValue(VectorizedRowBatch batch, overflowBatch, overflowBatch.size); } - if (smallTableValueVectorDeserializeRow != null) { - - doSmallTableValueDeserializeRow(overflowBatch, overflowBatch.size, - byteSegmentRef, hashMapResult); - } + updateBatchWithSmallTableValue(overflowBatch, hashMapResult, overflowBatch.size, byteSegmentRef); overflowBatch.size++; if (overflowBatch.size == overflowBatch.DEFAULT_SIZE) { @@ -339,12 +355,7 @@ private void generateHashMapResultLargeMultiValue(VectorizedRowBatch batch, // Fill up as much of the overflow batch as possible with small table values. 
while (byteSegmentRef != null) { - - if (smallTableValueVectorDeserializeRow != null) { - doSmallTableValueDeserializeRow(overflowBatch, overflowBatch.size, - byteSegmentRef, hashMapResult); - } - + updateBatchWithSmallTableValue(overflowBatch, hashMapResult, overflowBatch.size, byteSegmentRef); overflowBatch.size++; if (overflowBatch.size == overflowBatch.DEFAULT_SIZE) { break; diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestConfig.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestConfig.java index e4674d81efc5..a448bf61255f 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestConfig.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestConfig.java @@ -38,10 +38,8 @@ import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe; -import org.apache.hadoop.hive.ql.exec.util.collectoroperator.CollectorTestOperator; import org.apache.hadoop.hive.ql.exec.util.collectoroperator.CountCollectorTestOperator; import org.apache.hadoop.hive.ql.exec.util.collectoroperator.RowCollectorTestOperator; -import org.apache.hadoop.hive.ql.exec.util.collectoroperator.RowCollectorTestOperatorBase; import org.apache.hadoop.hive.ql.exec.util.collectoroperator.RowVectorCollectorTestOperator; import org.apache.hadoop.hive.ql.exec.util.rowobjects.RowTestObjects; import org.apache.hadoop.hive.ql.exec.util.rowobjects.RowTestObjectsMultiSet; @@ -53,6 +51,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestDescription.SmallTableGenerationParameters.ValueOption; import 
org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastTableContainer; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VerifyFastRow; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -80,10 +79,6 @@ import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite; import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hive.common.util.ReflectionUtil; @@ -776,7 +771,7 @@ private static void loadTableContainerData(MapJoinTestDescription testDesc, MapJ keyBytesWritable.set(keyOutput.getData(), 0, keyOutput.getLength()); - if (valueRow == null) { + if (valueRow == null || ValueOption.EMPTY_VALUE == testDesc.smallTableGenerationParameters.getValueOption()) { // Empty value. 
mapJoinTableContainer.putRow(keyBytesWritable, valueBytesWritable); } else { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestData.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestData.java index 81cc1b63c285..9d3a13bca4fc 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestData.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestData.java @@ -297,14 +297,25 @@ public static void generateVariationData(MapJoinTestData testData, private static RowTestObjects generateRandomSmallTableValueRow(MapJoinTestDescription testDesc, Random random) { + final ValueOption valueOption = testDesc.getSmallTableGenerationParameters().getValueOption(); final int columnCount = testDesc.smallTableValueTypeInfos.length; PrimitiveTypeInfo[] primitiveTypeInfos = new PrimitiveTypeInfo[columnCount]; for (int i = 0; i < columnCount; i++) { primitiveTypeInfos[i] = (PrimitiveTypeInfo) testDesc.smallTableValueTypeInfos[i]; } - Object[] smallTableValueRow = - VectorRandomRowSource.randomWritablePrimitiveRow( - columnCount, random, primitiveTypeInfos); + Object[] smallTableValueRow; + + // For EMPTY_VALUE, build the row from VectorizedBatchUtil.getPrimitiveWritable() instead of random values. 
+ if (valueOption == ValueOption.EMPTY_VALUE) { + smallTableValueRow = new Object[columnCount]; + for (int c = 0; c < columnCount; c++) { + smallTableValueRow[c] = + VectorizedBatchUtil.getPrimitiveWritable(primitiveTypeInfos[c].getPrimitiveCategory()); + } + } else { + smallTableValueRow = VectorRandomRowSource.randomWritablePrimitiveRow( + columnCount, random, primitiveTypeInfos); + } for (int c = 0; c < smallTableValueRow.length; c++) { smallTableValueRow[c] = ((PrimitiveObjectInspector) testDesc.smallTableValueObjectInspectors[c]).copyObject(smallTableValueRow[c]); } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestDescription.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestDescription.java index 93fdb28eb52a..aceee64e874c 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestDescription.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestDescription.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.util.DescriptionTest; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestConfig.MapJoinTestImplementation; import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.VectorMapJoinVariation; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; @@ -44,7 +45,8 @@ public static class SmallTableGenerationParameters { public static enum ValueOption { NO_RESTRICTION, ONLY_ONE, - NO_REGULAR_SMALL_KEYS + NO_REGULAR_SMALL_KEYS, + EMPTY_VALUE, // Small table entries are stored with an empty value. 
} private ValueOption valueOption; @@ -135,6 +137,8 @@ public int getNoMatchKeyOutOfAThousand() { public ObjectInspector[] outputObjectInspectors; final MapJoinPlanVariation mapJoinPlanVariation; + public MapJoinTestImplementation [] implementations; // Implementations to execute; the constructor defaults this to all of them. + public boolean shouldCheckForExpectedOutputAndFail = false; // When true, an output mismatch fails the test via Assert.fail. public MapJoinTestDescription ( HiveConf hiveConf, @@ -183,6 +187,8 @@ public MapJoinTestDescription ( this.smallTableGenerationParameters = smallTableGenerationParameters; this.mapJoinPlanVariation = mapJoinPlanVariation; + + this.implementations = MapJoinTestImplementation.values(); computeDerived(); } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/TestMapJoinOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/TestMapJoinOperator.java index a38a6c98f47a..1973168576a6 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/TestMapJoinOperator.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/TestMapJoinOperator.java @@ -43,8 +43,10 @@ import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.VectorMapJoinVariation; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.jetbrains.annotations.NotNull; // NOTE(review): only used on a private test helper - confirm this dependency is intended. 
import org.junit.Test; import org.junit.Ignore; @@ -53,6 +55,10 @@ import org.junit.Assert; +import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory.intTypeInfo; +import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory.longTypeInfo; +import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory.stringTypeInfo; + public class TestMapJoinOperator { private boolean addLongHiveConfVariation(int hiveConfVariation, HiveConf hiveConf) { @@ -1343,6 +1349,93 @@ public void 
testString2() throws Exception { } while (!hiveConfVariationsDone); } + /** + * Verifies an INNER vector map join that projects only the small table key, with empty values - string key. + * + * @throws Exception if test setup or execution of the join fails + */ + @Test + public void testSmallTableKeyOnlyProjectionWithEmptyValueString() throws Exception { + long seed = 26653L; + int rowCount = 100; + + MapJoinTestDescription testDesc = getTestDescriptionForSmallTableEmptyValue(stringTypeInfo); + + // Create the test data. + MapJoinTestData testData = new MapJoinTestData(rowCount, testDesc, seed); + + executeTest(testDesc, testData, "testSmallTableKeyOnlyProjectionWithEmptyValueString"); + } + + /** + * Verifies an INNER vector map join that projects only the small table key, with empty values - int key. + * + * @throws Exception if test setup or execution of the join fails + */ + @Test + public void testSmallTableKeyOnlyProjectionWithEmptyValueInt() throws Exception { + long seed = 26653L; + int rowCount = 100; + + MapJoinTestDescription testDesc = getTestDescriptionForSmallTableEmptyValue(intTypeInfo); + + // Create the test data. + MapJoinTestData testData = new MapJoinTestData(rowCount, testDesc, seed); + + executeTest(testDesc, testData, "testSmallTableKeyOnlyProjectionWithEmptyValueInt"); + } + + /** + * Verifies an INNER vector map join that projects only the small table key, with empty values - long key. + * + * @throws Exception if test setup or execution of the join fails + */ + @Test + public void testSmallTableKeyOnlyProjectionWithEmptyValueLong() throws Exception { + long seed = 26653L; + int rowCount = 100; + + MapJoinTestDescription testDesc = getTestDescriptionForSmallTableEmptyValue(longTypeInfo); + // Increase the probability of a big table key being present in the small table. + // 200 out of 1000 keys in small table. + testDesc.smallTableGenerationParameters.setKeyOutOfAThousand(200); + + // Create the test data. 
+ MapJoinTestData testData = new MapJoinTestData(rowCount, testDesc, seed); + + executeTest(testDesc, testData, "testSmallTableKeyOnlyProjectionWithEmptyValueLong"); + } + + private @NotNull MapJoinTestDescription getTestDescriptionForSmallTableEmptyValue(PrimitiveTypeInfo typeInfo) { + // Big table: a single column of the given type, which is also the join key. + TypeInfo[] bigTableTypeInfos = new TypeInfo[] {typeInfo}; + int[] bigTableKeyColumnNums = new int[] { 0 }; // key is column 0 + + // Small table: one value column of the same type, with key column 0 retained. + TypeInfo[] smallTableValueTypeInfos = new TypeInfo[] {typeInfo}; + int[] smallTableRetainKeyColumnNums = new int[] { 0 }; // retain key column 0 + + // Generate empty values for small table + SmallTableGenerationParameters smallTableGenParams = new SmallTableGenerationParameters(); + smallTableGenParams.setValueOption(ValueOption.EMPTY_VALUE); + + // Create an INNER MapJoin test description + MapJoinTestDescription mapJoinTestDescription = new MapJoinTestDescription( + getHiveConf(), + VectorMapJoinVariation.INNER, + bigTableTypeInfos, + bigTableKeyColumnNums, + smallTableValueTypeInfos, + smallTableRetainKeyColumnNums, + smallTableGenParams, + MapJoinPlanVariation.DYNAMIC_PARTITION_HASH_JOIN); + mapJoinTestDescription.implementations = + new MapJoinTestImplementation[] {MapJoinTestImplementation.NATIVE_VECTOR_OPTIMIZED}; // run only this implementation + mapJoinTestDescription.shouldCheckForExpectedOutputAndFail = true; // fail the test on an output mismatch + + return mapJoinTestDescription; + } + public boolean doTestString2(long seed, int hiveConfVariation, VectorMapJoinVariation vectorMapJoinVariation, MapJoinPlanVariation mapJoinPlanVariation) throws Exception { @@ -1512,8 +1605,17 @@ private RowTestObjectsMultiSet createExpectedTestRowMultiSet(MapJoinTestDescript Object[] valueRow = valueList.get(v).getRow(); final int smallTableRetainValueColumnNumsLength = 
testDesc.smallTableRetainValueColumnNums.length; + + // When EMPTY_VALUE is specified, the small table value columns are not + // actually stored in the small table HashMap. So we have to simulate that here. 
+ // The expected output then takes its value columns from the big table key objects. + final boolean isEmptyValue = + testDesc.smallTableGenerationParameters.getValueOption() == ValueOption.EMPTY_VALUE && + testDesc.smallTableRetainValueColumnNums.length > 0 && + testDesc.smallTableRetainValueColumnNums.length == testDesc.bigTableKeyColumnNums.length; + for (int o = 0; o < smallTableRetainValueColumnNumsLength; o++) { - outputObjects[outputColumnNum++] = + outputObjects[outputColumnNum++] = isEmptyValue ? bigTableKeyObjects[o] : valueRow[testDesc.smallTableRetainValueColumnNums[o]]; } @@ -1773,7 +1875,7 @@ private void doExecuteTest(MapJoinTestDescription testDesc, MapJoinTestData test " totalValueCount " + expectedTestRowMultiSet.getTotalValueCount()); // Execute all implementation variations. - for (MapJoinTestImplementation mapJoinImplementation : MapJoinTestImplementation.values()) { + for (MapJoinTestImplementation mapJoinImplementation : testDesc.implementations) { // only those selected by the test description if (testDesc.vectorMapJoinVariation == VectorMapJoinVariation.FULL_OUTER && mapJoinImplementation == MapJoinTestImplementation.ROW_MODE_HASH_MAP) { @@ -1981,6 +2083,11 @@ private void executeTestImplementation( " for implementation " + mapJoinImplementation + " variation " + testDesc.vectorMapJoinVariation + option); expectedTestRowMultiSet.displayDifferences(outputTestRowMultiSet, "expected", "actual"); + + if (testDesc.shouldCheckForExpectedOutputAndFail) { // escalate the reported mismatch to a test failure + Assert.fail(title + " failed for implementation " + mapJoinImplementation + + " variation " + testDesc.vectorMapJoinVariation + option); + } } else { System.out.println("*BENCHMARK* " + title + " verify succeeded " + " for implementation " + mapJoinImplementation +