Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,13 @@ protected void initLoggingPrefix(String className) {
// that appear in the small table portion of the join output.
protected transient VectorCopyRow outerSmallTableKeyVectorCopy;

/**
* Helper to copy BigTable keys to SmallTable value output columns.
* This is initialized in {@link #initializeOp(Configuration)} and used
* in {@link VectorMapJoinGenerateResultOperator}.
*/
protected transient VectorCopyRow bigTableKeyToSmallTableValueCopy;

// This helper object deserializes LazyBinary format small table values into columns of a row
// in a vectorized row batch.
protected transient VectorDeserializeRow<LazyBinaryDeserializeRead> smallTableValueVectorDeserializeRow;
Expand Down Expand Up @@ -596,6 +603,16 @@ protected void initializeOp(Configuration hconf) throws HiveException {
outerSmallTableKeyVectorCopy.init(outerSmallTableKeyMapping);
}

// Checks if the plan is projecting values from the SmallTable
// AND if the number of projected values matches the number of join keys.
if (smallTableValueMapping.getCount() > 0 &&
smallTableValueMapping.getCount() == bigTableKeyColumnMap.length) {
Comment on lines +608 to +609
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand the reasoning/intuition behind this check. Why do we care about values and keys being of the same length?

bigTableKeyToSmallTableValueCopy = new VectorCopyRow();
bigTableKeyToSmallTableValueCopy.init(
bigTableKeyColumnMap, smallTableValueMapping.getOutputColumns(), bigTableKeyTypeInfos
);
}

/*
* Setup the overflow batch.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,7 @@ protected int generateHashMapResultSingleValue(VectorizedRowBatch batch,
batch, batchIndex);
}

if (smallTableValueVectorDeserializeRow != null) {
doSmallTableValueDeserializeRow(batch, batchIndex,
byteSegmentRef, hashMapResult);
}
updateBatchWithSmallTableValue(batch, hashMapResult, batchIndex, byteSegmentRef);

// Use the big table row as output.
batch.selected[numSel++] = batchIndex;
Expand All @@ -226,6 +223,29 @@ protected int generateHashMapResultSingleValue(VectorizedRowBatch batch,
return numSel;
}

private void updateBatchWithSmallTableValue(
VectorizedRowBatch batch, VectorMapJoinHashMapResult hashMapResult, int batchIndex, ByteSegmentRef byteSegmentRef)
throws HiveException {

// Check if the small table value is empty.
boolean isSmallTableValueEmpty = byteSegmentRef.getLength() == 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fact that we need to check the actual data in order to decide how to evaluate the join (or rather the creation of the resulting row) is somewhat suspicious and a bit brittle. Ideally, the compiler should be able to determine exactly how the operator should behave via the query plan. Can we exploit (or add) information in the query plan in order to drive the copy decision below?


// INNER join where only the small table key is projected.
// The hash map value for that key is empty, and deserializing it would produce NULLs.
// Instead, copy the big table key into the small table value position.
// This preserves the joining key in the output without introducing NULLs.
// (Applied only when bigTableKeyToSmallTableValueCopy is available.)
if (isSmallTableValueEmpty && bigTableKeyToSmallTableValueCopy != null) {
bigTableKeyToSmallTableValueCopy.copyByValue(
batch, batchIndex, // from big table key area
batch, batchIndex // to small table value area
);
} else if (smallTableValueVectorDeserializeRow != null) {
doSmallTableValueDeserializeRow(batch, batchIndex,
byteSegmentRef, hashMapResult);
}
}

/**
* Generate results for a N x M cross product.
*
Expand Down Expand Up @@ -295,11 +315,7 @@ protected void generateHashMapResultMultiValue(VectorizedRowBatch batch,
overflowBatch, overflowBatch.size);
}

if (smallTableValueVectorDeserializeRow != null) {

doSmallTableValueDeserializeRow(overflowBatch, overflowBatch.size,
byteSegmentRef, hashMapResult);
}
updateBatchWithSmallTableValue(overflowBatch, hashMapResult, overflowBatch.size, byteSegmentRef);

overflowBatch.size++;
if (overflowBatch.size == overflowBatch.DEFAULT_SIZE) {
Expand Down Expand Up @@ -339,12 +355,7 @@ private void generateHashMapResultLargeMultiValue(VectorizedRowBatch batch,

// Fill up as much of the overflow batch as possible with small table values.
while (byteSegmentRef != null) {

if (smallTableValueVectorDeserializeRow != null) {
doSmallTableValueDeserializeRow(overflowBatch, overflowBatch.size,
byteSegmentRef, hashMapResult);
}

updateBatchWithSmallTableValue(overflowBatch, hashMapResult, overflowBatch.size, byteSegmentRef);
overflowBatch.size++;
if (overflowBatch.size == overflowBatch.DEFAULT_SIZE) {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,8 @@
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
import org.apache.hadoop.hive.ql.exec.util.collectoroperator.CollectorTestOperator;
import org.apache.hadoop.hive.ql.exec.util.collectoroperator.CountCollectorTestOperator;
import org.apache.hadoop.hive.ql.exec.util.collectoroperator.RowCollectorTestOperator;
import org.apache.hadoop.hive.ql.exec.util.collectoroperator.RowCollectorTestOperatorBase;
import org.apache.hadoop.hive.ql.exec.util.collectoroperator.RowVectorCollectorTestOperator;
import org.apache.hadoop.hive.ql.exec.util.rowobjects.RowTestObjects;
import org.apache.hadoop.hive.ql.exec.util.rowobjects.RowTestObjectsMultiSet;
Expand All @@ -53,6 +51,7 @@
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestDescription.SmallTableGenerationParameters.ValueOption;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastTableContainer;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VerifyFastRow;
import org.apache.hadoop.hive.ql.metadata.HiveException;
Expand Down Expand Up @@ -80,10 +79,6 @@
import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hive.common.util.ReflectionUtil;
Expand Down Expand Up @@ -776,7 +771,7 @@ private static void loadTableContainerData(MapJoinTestDescription testDesc, MapJ

keyBytesWritable.set(keyOutput.getData(), 0, keyOutput.getLength());

if (valueRow == null) {
if (valueRow == null || ValueOption.EMPTY_VALUE == testDesc.smallTableGenerationParameters.getValueOption()) {
// Empty value.
mapJoinTableContainer.putRow(keyBytesWritable, valueBytesWritable);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,14 +297,25 @@ public static void generateVariationData(MapJoinTestData testData,

private static RowTestObjects generateRandomSmallTableValueRow(MapJoinTestDescription testDesc, Random random) {

final ValueOption valueOption = testDesc.getSmallTableGenerationParameters().getValueOption();
final int columnCount = testDesc.smallTableValueTypeInfos.length;
PrimitiveTypeInfo[] primitiveTypeInfos = new PrimitiveTypeInfo[columnCount];
for (int i = 0; i < columnCount; i++) {
primitiveTypeInfos[i] = (PrimitiveTypeInfo) testDesc.smallTableValueTypeInfos[i];
}
Object[] smallTableValueRow =
VectorRandomRowSource.randomWritablePrimitiveRow(
columnCount, random, primitiveTypeInfos);
Object[] smallTableValueRow;

// Generate empty value row if specified.
if (valueOption == ValueOption.EMPTY_VALUE) {
smallTableValueRow = new Object[columnCount];
for (int c = 0; c < columnCount; c++) {
smallTableValueRow[c] =
VectorizedBatchUtil.getPrimitiveWritable(primitiveTypeInfos[c].getPrimitiveCategory());
Comment on lines +312 to +313
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which real use-case is this code trying to simulate? Is this equivalent to having null values in the data or something else. Basically, I am trying to understand under what circumstances we have these values in the table.

From a quick look, it seems that we are using special values (e.g., new Text(ArrayUtils.EMPTY_BYTE_ARRAY)) but not really nulls. If that's the case then I don't see why we need to handle this separately from VectorRandomRowSource.randomWritablePrimitiveRow. Wouldn't it make more sense to tune the random generator to occasionally generate these "special" values if they can really appear in practice?

}
} else {
smallTableValueRow = VectorRandomRowSource.randomWritablePrimitiveRow(
columnCount, random, primitiveTypeInfos);
}
for (int c = 0; c < smallTableValueRow.length; c++) {
smallTableValueRow[c] = ((PrimitiveObjectInspector) testDesc.smallTableValueObjectInspectors[c]).copyObject(smallTableValueRow[c]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.util.DescriptionTest;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.MapJoinTestConfig.MapJoinTestImplementation;
import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.VectorMapJoinVariation;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
Expand All @@ -44,7 +45,8 @@ public static class SmallTableGenerationParameters {
public static enum ValueOption {
NO_RESTRICTION,
ONLY_ONE,
NO_REGULAR_SMALL_KEYS
NO_REGULAR_SMALL_KEYS,
EMPTY_VALUE, // Generate empty value entries.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By going over the code, I get the impression that the ValueOption enumeration is about the generation of the keys of the small table not the values. Mixing the two creates confusion and makes the code harder to understand.

}

private ValueOption valueOption;
Expand Down Expand Up @@ -135,6 +137,8 @@ public int getNoMatchKeyOutOfAThousand() {
public ObjectInspector[] outputObjectInspectors;

final MapJoinPlanVariation mapJoinPlanVariation;
public MapJoinTestImplementation [] implementations;
public boolean shouldCheckForExpectedOutputAndFail = false;

public MapJoinTestDescription (
HiveConf hiveConf,
Expand Down Expand Up @@ -183,6 +187,8 @@ public MapJoinTestDescription (
this.smallTableGenerationParameters = smallTableGenerationParameters;

this.mapJoinPlanVariation = mapJoinPlanVariation;

this.implementations = MapJoinTestImplementation.values();

computeDerived();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@
import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.VectorMapJoinVariation;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.jetbrains.annotations.NotNull;
import org.junit.Test;
import org.junit.Ignore;

Expand All @@ -53,6 +55,10 @@

import org.junit.Assert;

import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory.intTypeInfo;
import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory.longTypeInfo;
import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory.stringTypeInfo;

public class TestMapJoinOperator {

private boolean addLongHiveConfVariation(int hiveConfVariation, HiveConf hiveConf) {
Expand Down Expand Up @@ -1343,6 +1349,93 @@ public void testString2() throws Exception {
} while (!hiveConfVariationsDone);
}

/**
* Test case for INNER vector map join with only small table key projection - string type.
*
* @throws Exception Exception
*/
@Test
public void testSmallTableKeyOnlyProjectionWithEmptyValueString() throws Exception {
Copy link
Member

@zabetak zabetak Nov 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding tests in this class is useful, but it may not be the best option in every situation. These tests depend on random generation of input/output and they are good for covering general behavior of the join operators, but for edge cases and very specific bugs, having fixed input & output and join configuration would be much easier to reason about.

For showcasing the bug in this PR (if there is one), it would really help to have a dedicated test case possibly in another class and have well-defined and minimal input/output and join settings. Then we can discuss if we also need these randomized tests. The bug implies a problem in a binary join operator so we should be able to demonstrate the issue by correctly picking the schema/data for the left and right side of the join having a few rows on each side.

long seed = 26653L;
int rowCount = 100;

MapJoinTestDescription testDesc = getTestDescriptionForSmallTableEmptyValue(stringTypeInfo);

// Create the test data.
MapJoinTestData testData = new MapJoinTestData(rowCount, testDesc, seed);

executeTest(testDesc, testData, "testSmallTableKeyOnlyProjectionWithEmptyValueString");
}

/**
 * Verifies an INNER vector map join that projects only the small table key
 * when the small table values are empty, using an int join key.
 *
 * @throws Exception Exception
 */
@Test
public void testSmallTableKeyOnlyProjectionWithEmptyValueInt() throws Exception {
  final long randomSeed = 26653L;
  final int bigTableRowCount = 100;

  // Build an INNER join description whose small table entries carry empty values.
  MapJoinTestDescription description = getTestDescriptionForSmallTableEmptyValue(intTypeInfo);

  // Generate the randomized big/small table rows for this description.
  MapJoinTestData data = new MapJoinTestData(bigTableRowCount, description, randomSeed);

  executeTest(description, data, "testSmallTableKeyOnlyProjectionWithEmptyValueInt");
}

/**
 * Verifies an INNER vector map join that projects only the small table key
 * when the small table values are empty, using a long join key.
 *
 * @throws Exception Exception
 */
@Test
public void testSmallTableKeyOnlyProjectionWithEmptyValueLong() throws Exception {
  final long randomSeed = 26653L;
  final int bigTableRowCount = 100;

  MapJoinTestDescription description = getTestDescriptionForSmallTableEmptyValue(longTypeInfo);
  // Raise the match rate so more big table keys find a small table entry:
  // 200 out of 1000 keys land in the small table.
  description.smallTableGenerationParameters.setKeyOutOfAThousand(200);

  // Generate the randomized big/small table rows for this description.
  MapJoinTestData data = new MapJoinTestData(bigTableRowCount, description, randomSeed);

  executeTest(description, data, "testSmallTableKeyOnlyProjectionWithEmptyValueLong");
}

/**
 * Builds an INNER MapJoin test description in which the small table produces
 * empty (zero-length) values and retains only its key column, so the join
 * output must carry the big table key through the small table value position.
 *
 * <p>Only the NATIVE_VECTOR_OPTIMIZED implementation is exercised, and the
 * description is configured to turn expected/actual output mismatches into
 * hard test failures.
 *
 * @param typeInfo primitive type used for both the join key and the small table value
 * @return a fully configured test description (never null)
 */
private MapJoinTestDescription getTestDescriptionForSmallTableEmptyValue(PrimitiveTypeInfo typeInfo) {
  // Big table: a single column that is also the join key.
  TypeInfo[] bigTableTypeInfos = new TypeInfo[] {typeInfo};
  int[] bigTableKeyColumnNums = new int[] { 0 }; // key is column 0

  // Small table: one value column of the same type; retain the key column.
  TypeInfo[] smallTableValueTypeInfos = new TypeInfo[] {typeInfo};
  int[] smallTableRetainKeyColumnNums = new int[] { 0 }; // retain key column 0

  // Force the generator to emit empty values for every small table entry.
  SmallTableGenerationParameters smallTableGenParams = new SmallTableGenerationParameters();
  smallTableGenParams.setValueOption(ValueOption.EMPTY_VALUE);

  // Create an INNER MapJoin test description.
  MapJoinTestDescription mapJoinTestDescription = new MapJoinTestDescription(
      getHiveConf(),
      VectorMapJoinVariation.INNER,
      bigTableTypeInfos,
      bigTableKeyColumnNums,
      smallTableValueTypeInfos,
      smallTableRetainKeyColumnNums,
      smallTableGenParams,
      MapJoinPlanVariation.DYNAMIC_PARTITION_HASH_JOIN);
  // Restrict to the implementation affected by the empty-value handling.
  mapJoinTestDescription.implementations =
      new MapJoinTestImplementation[] {MapJoinTestImplementation.NATIVE_VECTOR_OPTIMIZED};
  mapJoinTestDescription.shouldCheckForExpectedOutputAndFail = true;

  return mapJoinTestDescription;
}

public boolean doTestString2(long seed, int hiveConfVariation,
VectorMapJoinVariation vectorMapJoinVariation,
MapJoinPlanVariation mapJoinPlanVariation) throws Exception {
Expand Down Expand Up @@ -1512,8 +1605,17 @@ private RowTestObjectsMultiSet createExpectedTestRowMultiSet(MapJoinTestDescript
Object[] valueRow = valueList.get(v).getRow();
final int smallTableRetainValueColumnNumsLength =
testDesc.smallTableRetainValueColumnNums.length;

// When EMPTY_VALUE is specified, the small table value columns are not
// actually stored in the small table HashMap. So we have to simulate that here.
// The expected output should have values from the big table key columns.
final boolean isEmptyValue =
testDesc.smallTableGenerationParameters.getValueOption() == ValueOption.EMPTY_VALUE &&
testDesc.smallTableRetainValueColumnNums.length > 0 &&
testDesc.smallTableRetainValueColumnNums.length == testDesc.bigTableKeyColumnNums.length;
Comment on lines +1612 to +1615
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This simulation is problematic because it makes the solution and the test code somewhat identical. We're implementing the copy logic in two places (prod & test), so the tests will trivially pass as it is right now and immediately fail if the implementation changes in the future.


for (int o = 0; o < smallTableRetainValueColumnNumsLength; o++) {
outputObjects[outputColumnNum++] =
outputObjects[outputColumnNum++] = isEmptyValue ? bigTableKeyObjects[o] :
valueRow[testDesc.smallTableRetainValueColumnNums[o]];
}

Expand Down Expand Up @@ -1773,7 +1875,7 @@ private void doExecuteTest(MapJoinTestDescription testDesc, MapJoinTestData test
" totalValueCount " + expectedTestRowMultiSet.getTotalValueCount());

// Execute all implementation variations.
for (MapJoinTestImplementation mapJoinImplementation : MapJoinTestImplementation.values()) {
for (MapJoinTestImplementation mapJoinImplementation : testDesc.implementations) {

if (testDesc.vectorMapJoinVariation == VectorMapJoinVariation.FULL_OUTER &&
mapJoinImplementation == MapJoinTestImplementation.ROW_MODE_HASH_MAP) {
Expand Down Expand Up @@ -1981,6 +2083,11 @@ private void executeTestImplementation(
" for implementation " + mapJoinImplementation +
" variation " + testDesc.vectorMapJoinVariation + option);
expectedTestRowMultiSet.displayDifferences(outputTestRowMultiSet, "expected", "actual");

if (testDesc.shouldCheckForExpectedOutputAndFail) {
Assert.fail(title + " failed for implementation " + mapJoinImplementation +
" variation " + testDesc.vectorMapJoinVariation + option);
}
} else {
System.out.println("*BENCHMARK* " + title + " verify succeeded " +
" for implementation " + mapJoinImplementation +
Expand Down