Skip to content

Commit 691a130

Browse files
committed
Revise the concurrent pattern
Signed-off-by: bowenlan-amzn <[email protected]>
1 parent 26efcc3 commit 691a130

File tree

1 file changed

+28
-27
lines changed

1 file changed

+28
-27
lines changed

server/src/main/java/org/opensearch/action/search/QueryPhaseResultConsumer.java

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ class PendingMerges implements Releasable {
302302
private volatile long maxAggsCurrentBufferSize = 0;
303303

304304
private final ArrayDeque<MergeTask> queue = new ArrayDeque<>();
305-
private final AtomicReference<MergeTask> runningTask = new AtomicReference<>();
305+
private final AtomicReference<MergeTask> runningTask = new AtomicReference<>(); // ensure only one task is running
306306
private final AtomicReference<Exception> failure = new AtomicReference<>();
307307

308308
private final SearchPhaseController.TopDocsStats topDocsStats;
@@ -355,7 +355,7 @@ private synchronized long addEstimateAndMaybeBreak(long estimatedSize) {
355355
return circuitBreakerBytes;
356356
}
357357

358-
private void resetCircuitBreaker() {
358+
private synchronized void resetCircuitBreaker() {
359359
if (circuitBreakerBytes > 0) {
360360
circuitBreaker.addWithoutBreaking(-circuitBreakerBytes);
361361
circuitBreakerBytes = 0;
@@ -388,12 +388,12 @@ private long estimateRamBytesUsedForReduce(long size) {
388388
void consume(QuerySearchResult result, Runnable callback) {
389389
checkCancellation();
390390

391-
if (processResult(result, callback)) {
391+
if (consumeResult(result, callback)) {
392392
callback.run();
393393
}
394394
}
395395

396-
private synchronized boolean processResult(QuerySearchResult result, Runnable callback) {
396+
private synchronized boolean consumeResult(QuerySearchResult result, Runnable callback) {
397397
if (hasFailure()) {
398398
result.consumeAll(); // release memory
399399
return true;
@@ -455,8 +455,7 @@ protected void doRun() {
455455
try {
456456
final QuerySearchResult[] toConsume = task.consumeBuffer();
457457
if (toConsume == null) {
458-
onAfterMerge(task, thisMergeResult, estimatedTotalSize);
459-
executor.execute(() -> tryExecuteNext());
458+
onAfterMerge(task, null, 0);
460459
return;
461460
}
462461
long estimateRamBytesUsedForReduce = estimateRamBytesUsedForReduce(estimatedTotalSize);
@@ -469,7 +468,6 @@ protected void doRun() {
469468
return;
470469
}
471470
onAfterMerge(task, newMerge, estimatedTotalSize);
472-
executor.execute(() -> tryExecuteNext());
473471
}
474472

475473
@Override
@@ -480,29 +478,32 @@ public void onFailure(Exception exc) {
480478
}
481479

482480
private void onAfterMerge(MergeTask task, MergeResult newResult, long estimatedSize) {
483-
synchronized (this) {
484-
runningTask.compareAndSet(task, null);
485-
if (hasFailure()) {
486-
return;
487-
}
488-
mergeResult = newResult;
489-
if (hasAggs) {
490-
// Update the circuit breaker to remove the size of the source aggregations
491-
// and replace the estimation with the serialized size of the newly reduced result.
492-
long newSize = mergeResult.estimatedSize - estimatedSize;
493-
addWithoutBreaking(newSize);
494-
logger.trace(
495-
"aggs partial reduction [{}->{}] max [{}]",
496-
estimatedSize,
497-
mergeResult.estimatedSize,
498-
maxAggsCurrentBufferSize
499-
);
481+
if (newResult != null) {
482+
synchronized (this) {
483+
if (hasFailure()) {
484+
return;
485+
}
486+
runningTask.compareAndSet(task, null);
487+
mergeResult = newResult;
488+
if (hasAggs) {
489+
// Update the circuit breaker to remove the size of the source aggregations
490+
// and replace the estimation with the serialized size of the newly reduced result.
491+
long newSize = mergeResult.estimatedSize - estimatedSize;
492+
addWithoutBreaking(newSize);
493+
logger.trace(
494+
"aggs partial reduction [{}->{}] max [{}]",
495+
estimatedSize,
496+
mergeResult.estimatedSize,
497+
maxAggsCurrentBufferSize
498+
);
499+
}
500500
}
501-
task.consumeListener();
502501
}
502+
task.consumeListener();
503+
executor.execute(this::tryExecuteNext);
503504
}
504505

505-
// Idempotent failure handling logic
506+
// Idempotent and thread-safe failure handling
506507
private synchronized void onMergeFailure(Exception exc) {
507508
if (hasFailure()) {
508509
assert circuitBreakerBytes == 0;
@@ -515,7 +516,7 @@ private synchronized void onMergeFailure(Exception exc) {
515516
cancelTaskOnFailure.accept(exc);
516517
}
517518

518-
private void clearMergeTaskQueue() {
519+
private synchronized void clearMergeTaskQueue() {
519520
MergeTask task = runningTask.get();
520521
runningTask.compareAndSet(task, null);
521522
List<MergeTask> toCancels = new ArrayList<>();

0 commit comments

Comments
 (0)