diff --git a/server/src/test/java/org/apache/kafka/server/purgatory/DelayedFutureTest.java b/server/src/test/java/org/apache/kafka/server/purgatory/DelayedFutureTest.java index 80efa9930c49f..d3faa193b4dce 100644 --- a/server/src/test/java/org/apache/kafka/server/purgatory/DelayedFutureTest.java +++ b/server/src/test/java/org/apache/kafka/server/purgatory/DelayedFutureTest.java @@ -26,7 +26,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Function; +import java.util.function.Consumer; import java.util.function.Predicate; import java.util.function.Supplier; @@ -49,13 +49,11 @@ void testDelayedFuture() throws Exception { .map(Thread::getName) .anyMatch(name -> name.contains("DelayedExecutor-" + purgatoryName)); - Function>, Void> updateResult = futures -> { - result.set(futures.stream() - .filter(Predicate.not(CompletableFuture::isCompletedExceptionally)) - .mapToInt(future -> assertDoesNotThrow(() -> future.get())) - .sum()); - return null; - }; + Consumer>> updateResult = futures -> + result.set(futures.stream() + .filter(Predicate.not(CompletableFuture::isCompletedExceptionally)) + .mapToInt(future -> assertDoesNotThrow(() -> future.get())) + .sum()); assertFalse(hasExecutorThread.get(), "Unnecessary thread created"); @@ -64,7 +62,7 @@ void testDelayedFuture() throws Exception { CompletableFuture.completedFuture(10), CompletableFuture.completedFuture(11) ); - DelayedFuture r1 = purgatory.tryCompleteElseWatch(100000L, futures1, () -> updateResult.apply(futures1)); + DelayedFuture r1 = purgatory.tryCompleteElseWatch(100000L, futures1, () -> updateResult.accept(futures1)); assertTrue(r1.isCompleted(), "r1 not completed"); assertEquals(21, result.get()); assertFalse(hasExecutorThread.get(), "Unnecessary thread created"); @@ -72,7 +70,7 @@ void testDelayedFuture() throws Exception { // Two delayed futures: callback should wait for both to complete result.set(-1); List> futures2 = List.of(new CompletableFuture<>(), new CompletableFuture<>()); - DelayedFuture r2 = purgatory.tryCompleteElseWatch(100000L, futures2, () -> updateResult.apply(futures2)); + DelayedFuture r2 = purgatory.tryCompleteElseWatch(100000L, futures2, () -> updateResult.accept(futures2)); assertFalse(r2.isCompleted(), "r2 should be incomplete"); futures2.get(0).complete(20); assertFalse(r2.isCompleted()); @@ -88,7 +86,7 @@ void testDelayedFuture() throws Exception { new CompletableFuture<>(), CompletableFuture.completedFuture(31) ); - DelayedFuture r3 = purgatory.tryCompleteElseWatch(100000L, futures3, () -> updateResult.apply(futures3)); + DelayedFuture r3 = purgatory.tryCompleteElseWatch(100000L, futures3, () -> updateResult.accept(futures3)); assertFalse(r3.isCompleted(), "r3 should be incomplete"); assertEquals(-1, result.get()); futures3.get(0).complete(30); @@ -100,7 +98,7 @@ void testDelayedFuture() throws Exception { long start = Time.SYSTEM.hiResClockMs(); long expirationMs = 2000L; List> futures4 = List.of(new CompletableFuture<>(), new CompletableFuture<>()); - DelayedFuture r4 = purgatory.tryCompleteElseWatch(expirationMs, futures4, () -> updateResult.apply(futures4)); + DelayedFuture r4 = purgatory.tryCompleteElseWatch(expirationMs, futures4, () -> updateResult.accept(futures4)); futures4.get(0).complete(40); TestUtils.waitForCondition(() -> futures4.get(1).isDone(), "r4 futures not expired"); assertTrue(r4.isCompleted(), "r4 not completed after timeout"); @@ -109,7 +107,7 @@ void testDelayedFuture() throws Exception { assertEquals(40, futures4.get(0).get()); Exception exception = assertThrows(ExecutionException.class, () -> futures4.get(1).get()); assertEquals(TimeoutException.class, exception.getCause().getClass()); - assertEquals(40, result.get()); + TestUtils.waitForCondition(() -> result.get() == 40, "callback not invoked"); } finally { purgatory.shutdown(); }