HIVE-28735: Propagated the exception properly from hash table-loader threads to main thread #5845
base: master
Changes from all commits
0da6a43
c80fe21
1789052
5bf9cea
a0d2fca
7e356e0
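
For context on the failure mode this PR addresses, here is a standalone sketch, not Hive code; class and variable names are illustrative. A task handed to ExecutorService.submit() reports its failure only through the returned Future, so when the caller ignores that Future and just waits via shutdown()/awaitTermination(), as the loader did before this change, the worker's exception is silently dropped and the main thread continues as if loading had succeeded.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Standalone demo of the failure mode being fixed: the worker's exception is
// captured in an unread Future and never reaches the main thread.
public class LostExceptionDemo {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(2);

    Runnable failingLoader = () -> {
      throw new RuntimeException("hash table load failed");
    };
    pool.submit(failingLoader);   // the returned Future is ignored, so the exception is too

    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
    // Reaches this point without any error being reported.
    System.out.println("awaitTermination returned normally; the failure was lost");
  }
}

The patch below instead starts each drain task with CompletableFuture.runAsync(..., loadExecService) and has the main thread wait on CompletableFuture.allOf(...).get(...), which rethrows any worker failure as an ExecutionException.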
VectorMapJoinFastHashTableLoader.java
@@ -18,13 +18,18 @@
 package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;

 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.LongAccumulator;

 import com.google.common.base.Preconditions;
@@ -83,6 +88,10 @@ public VectorMapJoinFastHashTableLoader() {
   }

+  ExecutorService getLoadExecService() {
+    return loadExecService;
+  }
+
   public VectorMapJoinFastHashTableLoader(TezContext context, Configuration hconf, MapJoinOperator joinOp) {
     this.tezContext = context;
     this.hconf = hconf;
@@ -105,7 +114,7 @@ public void init(ExecMapperContext context, MapredContext mrContext,
     this.htLoadCounter = tezContext.getTezProcessorContext().getCounters().findCounter(counterGroup, counterName);
   }

-  private void initHTLoadingService(long estKeyCount) {
+  void initHTLoadingService(long estKeyCount) {
     if (estKeyCount < VectorMapJoinFastHashTable.FIRST_SIZE_UP) {
       // Avoid many small HTs that will rehash multiple times causing GCs
       this.numLoadThreads = 1;
@@ -148,22 +157,26 @@ public void resetBeforeOffer(HashTableElementBatch elementBatch) {
     }
   }

-  private void submitQueueDrainThreads(VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer)
-      throws InterruptedException, IOException, SerDeException {
+  List<CompletableFuture<Void>> submitQueueDrainThreads(VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer)
+      throws InterruptedException, IOException, SerDeException {
+    List<CompletableFuture<Void>> loaderTasks = new ArrayList<>();
     for (int partitionId = 0; partitionId < numLoadThreads; partitionId++) {
       int finalPartitionId = partitionId;
-      this.loadExecService.submit(() -> {
+      CompletableFuture<Void> asyncTask = CompletableFuture.runAsync(() -> {
         try {
           LOG.info("Partition id {} with Queue size {}", finalPartitionId, loadBatchQueues[finalPartitionId].size());
           drainAndLoadForPartition(finalPartitionId, vectorMapJoinFastTableContainer);
         } catch (IOException | InterruptedException | SerDeException | HiveException e) {
-          throw new RuntimeException("Failed to start HT Load threads", e);
+          LOG.error("Caught error while starting HT threads: " + e.getMessage(), e);
+          throw new RuntimeException("Failed to start HT Load thread", e);
         }
-      });
+      }, loadExecService);
+      loaderTasks.add(asyncTask);
     }
+    return loaderTasks;
   }

-  private void drainAndLoadForPartition(int partitionId, VectorMapJoinFastTableContainer tableContainer)
+  void drainAndLoadForPartition(int partitionId, VectorMapJoinFastTableContainer tableContainer)
       throws InterruptedException, IOException, HiveException, SerDeException {
     LOG.info("Starting draining thread {}", partitionId);
     long totalProcessedEntries = 0;
@@ -271,7 +284,7 @@ public void load(MapJoinTableContainer[] mapJoinTables,

     tableContainer.setSerde(null, null); // No SerDes here.
     // Submit parallel loading Threads
-    submitQueueDrainThreads(tableContainer);
+    List<CompletableFuture<Void>> loaderTasks = submitQueueDrainThreads(tableContainer);

     long receivedEntries = 0;
     long startTime = System.currentTimeMillis();
@@ -307,9 +320,19 @@ public void load(MapJoinTableContainer[] mapJoinTables,
     LOG.info("Finished loading the queue for input: {} waiting {} minutes for TPool shutdown", inputName, 2);
     addQueueDoneSentinel();
     loadExecService.shutdown();

Review comment: remove loadExecService.shutdown()

-    if (!loadExecService.awaitTermination(2, TimeUnit.MINUTES)) {
+    try {
+      CompletableFuture.allOf(loaderTasks.toArray(new CompletableFuture[0]))
+          .get(2, TimeUnit.MINUTES);
+    } catch (TimeoutException e) {
+      throw new HiveException("Failed to complete the hash table loader. Loading timed out.");
+    } catch (ExecutionException e) {
+      throw new HiveException("One of the loader threads failed", e.getCause());
+    } catch (InterruptedException e) {

Review comment: you already have handling for
Review comment: please check the patch
Reply: Ack, I'll update in the next revision

+      Thread.currentThread().interrupt();
+    } finally {
+      if (loadExecService != null && !loadExecService.isTerminated()) {
+        loadExecService.shutdownNow();
+      }
+    }
     batchPool.clear();
     LOG.info("Total received entries: {} Threads {} HT entries: {}", receivedEntries, numLoadThreads, totalEntries.get());
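
The drain loop that addQueueDoneSentinel() unblocks is not part of this diff. As a rough illustration of the pattern it relies on, the sketch below shows a generic poison-pill drain loop; DONE_SENTINEL, drainPartition and loadBatch are hypothetical names, not the actual Hive identifiers, and the real drainAndLoadForPartition() differs in detail (per-partition queues, batching, counters).

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustrative poison-pill drain loop; not the Hive implementation.
class SentinelDrainSketch {
  // A unique object used as the "queue is done" marker.
  static final Object DONE_SENTINEL = new Object();

  static void drainPartition(BlockingQueue<Object> queue) throws InterruptedException {
    while (true) {
      Object batch = queue.take();   // blocks until the producer offers something
      if (batch == DONE_SENTINEL) {
        break;                       // producer finished; the loader thread exits
      }
      loadBatch(batch);              // hypothetical per-batch hash table insert
    }
  }

  static void loadBatch(Object batch) {
    // placeholder for inserting the batch's rows into the hash table
  }

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<Object> queue = new ArrayBlockingQueue<>(16);
    queue.put(new Object());
    queue.put(DONE_SENTINEL);        // conceptually what addQueueDoneSentinel() does
    drainPartition(queue);
  }
}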
TestVectorMapJoinFastHashTableLoader.java (new file)
@@ -0,0 +1,119 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.tez.TezContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.runtime.api.ProcessorContext;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.*;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;

class TestVectorMapJoinFastHashTableLoader {

  private VectorMapJoinFastHashTableLoader vectorMapJoinFastHashTableLoader;
  private Configuration hconf;

  @Mock
  private VectorMapJoinFastTableContainer mockTableContainer;

  @Mock
  private TezContext mockTezContext;

  @Mock
  private MapJoinOperator mockJoinOp;

  @Mock
  private ProcessorContext mockProcessorContext;

  @Mock
  private TezCounters mockTezCounters;

  @Mock
  private TezCounter mockTezCounter;

  @Mock
  private MapJoinDesc joinDesc;
Paramvir109 marked this conversation as resolved.

  @BeforeEach
  void setUp() {
    MockitoAnnotations.openMocks(this);
    hconf = new Configuration(true);
    when(mockTezContext.getTezProcessorContext()).thenReturn(mockProcessorContext);
    when(mockProcessorContext.getCounters()).thenReturn(mockTezCounters);
    when(mockTezCounters.findCounter(anyString(), anyString())).thenReturn(mockTezCounter);

    when(mockJoinOp.getConf()).thenReturn(joinDesc);
    when(mockJoinOp.getCacheKey()).thenReturn("testCacheKey");

    vectorMapJoinFastHashTableLoader = spy(new VectorMapJoinFastHashTableLoader(
        mockTezContext, hconf, mockJoinOp));
  }
  @Test
  public void testSubmitQueueDrainThreads_FutureGetThrowsExecutionException() throws
      IOException, InterruptedException, SerDeException, HiveException {
    doThrow(new InterruptedException("Simulated interruption")).when(vectorMapJoinFastHashTableLoader)
        .drainAndLoadForPartition(anyInt(), any(VectorMapJoinFastTableContainer.class));
    vectorMapJoinFastHashTableLoader.initHTLoadingService(1048577);
    List<CompletableFuture<Void>> loaderTasks = vectorMapJoinFastHashTableLoader.submitQueueDrainThreads(mockTableContainer);
    assertEquals(2, loaderTasks.size());
    vectorMapJoinFastHashTableLoader.getLoadExecService().shutdown();

Review comment: i don't think that is a good way to repro the issue:
Instead, execute direct

Reply: @deniskuzZ, when we call the submitQueueDrainThreads method it will internally call drainAndLoadForPartition, and this is what I want to test. If any error occurs in drainAndLoadForPartition, it will get caught and a RuntimeException will be thrown, which will be propagated as an ExecutionException.

Review comment: here would be wrapped in

Reply: The above code block is checking for ExecutionException only. The above assertion is just an extra assertion which tells the underlying cause of the ExecutionException. I can remove it too.
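
To spell out the wrapping being discussed, here is a standalone sketch, not the Hive test: a checked exception thrown inside the runAsync body cannot escape a Runnable, so it is rethrown as a RuntimeException, and when the future is awaited it resurfaces as an ExecutionException whose cause chain is ExecutionException -> RuntimeException -> original exception, which is exactly what the assertions below unwrap.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Standalone illustration of the cause chain asserted in the test.
public class CauseChainDemo {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    CompletableFuture<Void> task = CompletableFuture.runAsync(() -> {
      try {
        throw new InterruptedException("Simulated interruption");
      } catch (InterruptedException e) {
        // checked exceptions cannot escape a Runnable, so wrap them
        throw new RuntimeException("Failed to start HT Load thread", e);
      }
    }, pool);

    try {
      CompletableFuture.allOf(task).get();
    } catch (ExecutionException e) {
      System.out.println(e.getCause());            // RuntimeException: Failed to start HT Load thread
      System.out.println(e.getCause().getCause()); // InterruptedException: Simulated interruption
    } finally {
      pool.shutdownNow();
    }
  }
}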

    ExecutionException thrown = assertThrows(ExecutionException.class, () -> {
      CompletableFuture.allOf(loaderTasks.toArray(new CompletableFuture[0]))
          .get(2, TimeUnit.MINUTES);
    });
    Throwable cause = thrown.getCause();
    assertInstanceOf(RuntimeException.class, cause);
    assertInstanceOf(InterruptedException.class, cause.getCause());
  }
  @AfterEach
  void tearDown() {
    ExecutorService executorService = vectorMapJoinFastHashTableLoader.getLoadExecService();
    if (!executorService.isShutdown()) {
      executorService.shutdownNow();
      try {
        if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
          System.err.println("Executor did not terminate in time");
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }
}

Review comment: remove this

Reply: I need this so I can actually shut down the service in the tearDown method after running the unit test. Since loadExecService is a private member of the class, I thought to make a package-private getter for it so that I can access it in the unit test. The other approach would be to change the access modifier of loadExecService to package-private.
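
Either option keeps the executor out of the public API; a minimal sketch of the two alternatives discussed above (skeleton classes for illustration only, not the actual loader):

import java.util.concurrent.ExecutorService;

// Option 1 (what the patch does): keep the field private and expose a
// package-private getter that tests in the same package can call.
class LoaderWithGetter {
  private ExecutorService loadExecService;

  ExecutorService getLoadExecService() {   // no modifier = package-private
    return loadExecService;
  }
}

// Option 2 (the alternative mentioned in the reply): make the field itself
// package-private so the test can reference it directly.
class LoaderWithPackagePrivateField {
  ExecutorService loadExecService;         // no modifier = package-private
}

The getter keeps the field itself encapsulated while remaining invisible outside the package, which is why the author prefers it here.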