-
Notifications
You must be signed in to change notification settings - Fork 4.8k
HIVE-28735: Propagated the exception properly from hash table-loader threads to main thread #5845
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
…threads to main thread
…threads to main thread
...ava/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
Outdated
Show resolved
Hide resolved
...ava/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
Outdated
Show resolved
Hide resolved
vectorMapJoinFastHashTableLoader.initHTLoadingService(1048577); | ||
List<CompletableFuture<Void>> loaderTasks = vectorMapJoinFastHashTableLoader.submitQueueDrainThreads(mockTableContainer); | ||
assertEquals(2, loaderTasks.size()); | ||
vectorMapJoinFastHashTableLoader.getLoadExecService().shutdown(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i don't think that is a good way to repro the issue:
- why prepare mock
drainAndLoadForPartition
and not even trigger it?
Instead, execute direct getLoadExecService().shutdown()
and CompletableFuture.allOf
.
- your test should focus on
ExecutionException
and not theInterruptedException
(based on PR description)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@deniskuzZ , when we call the submitQueueDrainThreads method - it will internally call drainAndLoadForPartition and this is what I want to test. If any error occurs in drainAndLoadForPartition, it will get caught and RuntimeException will be thrown which will be propagated as ExecutionException.
In my repro sceanrio the code flow was reaching till here but not being propagated properly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here would be wrapped in ExecutionException
.
Your test is checking for InterruptedException
which was already covered in https://github.com/apache/hive/pull/5845/files#diff-573e1d79d59124631df039ba5fedfd037457f3e3c3b8fb5e349b4086191bc197L330
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ExecutionException thrown = assertThrows(ExecutionException.class, () -> {
CompletableFuture.allOf(loaderTasks.toArray(new CompletableFuture[0]))
.get(2, TimeUnit.MINUTES);
});
The above code block is checking for ExecutionException only
assertInstanceOf(InterruptedException.class, cause.getCause());
The above assertion is just extra assertion which tells the underlying cause of ExecutionException. I can remove it too
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should test the production code drainAndLoadForPartition
, configure drainAndLoadForPartition
to throw the exception, and not test the TEST
...org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/TestVectorMapJoinFastHashTableLoader.java
Show resolved
Hide resolved
|
throw new HiveException("Failed to complete the hash table loader. Loading timed out."); | ||
} catch (ExecutionException e) { | ||
throw new HiveException("One of the loader threads failed", e.getCause()); | ||
} catch (InterruptedException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you already have handling for InterruptedException
and loadExecService
shutdown here
no need to duplicate
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please check the patch
Subject: [PATCH] cleanup
---
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java (revision 7e356e06eee218e077586a52811a42449196152d)
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java (date 1749805780973)
@@ -88,10 +88,6 @@
}
- ExecutorService getLoadExecService() {
- return loadExecService;
- }
-
public VectorMapJoinFastHashTableLoader(TezContext context, Configuration hconf, MapJoinOperator joinOp) {
this.tezContext = context;
this.hconf = hconf;
@@ -114,7 +110,7 @@
this.htLoadCounter = tezContext.getTezProcessorContext().getCounters().findCounter(counterGroup, counterName);
}
- void initHTLoadingService(long estKeyCount) {
+ void initHTLoadingService(long estKeyCount) {
if (estKeyCount < VectorMapJoinFastHashTable.FIRST_SIZE_UP) {
// Avoid many small HTs that will rehash multiple times causing GCs
this.numLoadThreads = 1;
@@ -157,8 +153,8 @@
}
}
- List<CompletableFuture<Void>> submitQueueDrainThreads(VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer)
- throws InterruptedException, IOException, SerDeException {
+ List<CompletableFuture<Void>> submitQueueDrainThreads(VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer)
+ throws InterruptedException {
List<CompletableFuture<Void>> loaderTasks = new ArrayList<>();
for (int partitionId = 0; partitionId < numLoadThreads; partitionId++) {
int finalPartitionId = partitionId;
@@ -319,20 +315,11 @@
LOG.info("Finished loading the queue for input: {} waiting {} minutes for TPool shutdown", inputName, 2);
addQueueDoneSentinel();
- loadExecService.shutdown();
try {
CompletableFuture.allOf(loaderTasks.toArray(new CompletableFuture[0]))
- .get(2, TimeUnit.MINUTES);
- } catch (TimeoutException e) {
- throw new HiveException("Failed to complete the hash table loader. Loading timed out.");
- } catch (ExecutionException e) {
- throw new HiveException("One of the loader threads failed", e.getCause());
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } finally {
- if (loadExecService != null && !loadExecService.isTerminated()) {
- loadExecService.shutdownNow();
- }
+ .get(2, TimeUnit.MINUTES);
+ } catch (ExecutionException | TimeoutException e) {
+ throw new HiveException("Failed to complete the hash table loader.", e.getCause());
}
batchPool.clear();
LOG.info("Total received entries: {} Threads {} HT entries: {}", receivedEntries, numLoadThreads, totalEntries.get());
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack, I'll update in the next revision
@@ -83,6 +88,10 @@ public VectorMapJoinFastHashTableLoader() { | |||
|
|||
} | |||
|
|||
ExecutorService getLoadExecService() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I need this so I can actually shutdown the service in tearDown method after running unit test. Since loadExecService is a private member of the class, I thought to make a package-private getter for the same so that I can access it in unit test.
Other approach will be to change access modifier of loadExecService to package-private
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
drainAndLoadForPartition
in the finally should shutdown the service
What changes were proposed in this pull request?
Need to properly propagate the exception which can occur in drainAndLoadForPartition method while parallel loading of fast Hash tables by the threads.
In the current scenario the thread is silently dying causing incorrect(lesser number)of HT entries to be reported which is giving incorrect results
Why are the changes needed?
It fixes bug caused by https://issues.apache.org/jira/browse/HIVE-25149 that hides actual exceptions and gives incorrect results. Example query is added in the jira : https://issues.apache.org/jira/browse/HIVE-28735
Is the change a dependency upgrade?
No
Does this PR introduce any user-facing change?
No
How was this patch tested?
Tested using the sample query.
Hive 3
Hive 4
Before the fix : Wrong results on each run
After the fix : Getting same OOM exception as in hive 3
Also added a unit test which verifies the same