diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java index 144d1725fdb5..8301e9e15079 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java @@ -24,6 +24,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; @@ -40,8 +41,9 @@ public abstract class BackgroundService { protected static final Logger LOG = LoggerFactory.getLogger(BackgroundService.class); - // Executor to launch child tasks - private ScheduledThreadPoolExecutor exec; + private ThreadPoolExecutor exec; + // Scheduler that runs PeriodicalTask at a fixed delay; the tasks it launches run on exec + private ScheduledThreadPoolExecutor scheduler; private ThreadGroup threadGroup; private final String serviceName; private long interval; @@ -86,10 +88,16 @@ public synchronized void setPoolSize(int size) { throw new IllegalArgumentException("Pool size must be positive."); } - // In ScheduledThreadPoolExecutor, maximumPoolSize is Integer.MAX_VALUE - // the corePoolSize will always less maximumPoolSize. - // So we can directly set the corePoolSize - exec.setCorePoolSize(size); + // In ThreadPoolExecutor, maximumPoolSize must always be >= corePoolSize. + // Update in the correct order based on whether we are increasing or decreasing. 
+ final int currentCorePoolSize = exec.getCorePoolSize(); + if (size > currentCorePoolSize) { + exec.setMaximumPoolSize(size); + exec.setCorePoolSize(size); + } else if (size < currentCorePoolSize) { + exec.setCorePoolSize(size); + exec.setMaximumPoolSize(size); + } } public synchronized void setServiceTimeoutInNanos(long newTimeout) { @@ -113,12 +121,13 @@ public void runPeriodicalTaskNow() throws Exception { // start service public synchronized void start() { - if (exec == null || exec.isShutdown() || exec.isTerminated()) { + if (exec == null || exec.isShutdown() || exec.isTerminated() + || scheduler == null || scheduler.isShutdown() || scheduler.isTerminated()) { initExecutorAndThreadGroup(); } LOG.info("Starting service {} with interval {} {}", serviceName, interval, unit.name().toLowerCase()); - exec.scheduleWithFixedDelay(service, 0, interval, unit); + scheduler.scheduleWithFixedDelay(service, 0, interval, unit); } protected synchronized void setInterval(long newInterval, TimeUnit newUnit) { @@ -141,15 +150,6 @@ protected void execTaskCompletion() { } public class PeriodicalTask implements Runnable { @Override public void run() { - // wait for previous set of tasks to complete - try { - future.join(); - } catch (RuntimeException e) { - LOG.error("Background service execution failed.", e); - } finally { - execTaskCompletion(); - } - if (LOG.isDebugEnabled()) { LOG.debug("Running background service : {}", serviceName); } @@ -187,20 +187,33 @@ public void run() { }, exec).exceptionally(e -> null), (Void1, Void) -> null); } } + // Wait for all tasks launched by this run to complete before the run returns + try { + future.join(); + } catch (RuntimeException e) { + LOG.error("Background service execution failed.", e); + } finally { + execTaskCompletion(); + } } } // shutdown and make sure all threads are properly released. 
+ public synchronized void shutdown() { LOG.info("Shutting down service {}", this.serviceName); + scheduler.shutdown(); exec.shutdown(); try { + if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + } if (!exec.awaitTermination(60, TimeUnit.SECONDS)) { exec.shutdownNow(); } } catch (InterruptedException e) { // Re-interrupt the thread while catching InterruptedException Thread.currentThread().interrupt(); + scheduler.shutdownNow(); exec.shutdownNow(); } if (threadGroup.activeCount() == 0 && !threadGroup.isDestroyed()) { @@ -215,7 +228,14 @@ private void initExecutorAndThreadGroup() { .setDaemon(true) .setNameFormat(threadNamePrefix + serviceName + "#%d") .build(); - exec = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(threadPoolSize, threadFactory); + exec = (ThreadPoolExecutor) Executors.newFixedThreadPool(threadPoolSize, threadFactory); + // Single-thread scheduler for PeriodicalTask + ThreadFactory schedulerThreadFactory = new ThreadFactoryBuilder() + .setThreadFactory(r -> new Thread(threadGroup, r)) + .setDaemon(true) + .setNameFormat(threadNamePrefix + serviceName + "-scheduler") + .build(); + scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, schedulerThreadFactory); } protected String getServiceName() { diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/util/TestBackgroundService.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/util/TestBackgroundService.java index 2c5ed1bfea5a..865e9e8fbb80 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/util/TestBackgroundService.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/util/TestBackgroundService.java @@ -19,9 +19,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import 
java.util.HashMap; import java.util.List; import java.util.Map; @@ -134,11 +136,11 @@ public void testBackgroundServiceRunWaitsForTasks() throws InterruptedException, // Verify that even index tasks have not run even after serviceTimeout. assertEquals(expValuesWithEvenLocks, IntStream.range(0, nTasks).boxed().map(map::get).collect(Collectors.toList())); // Background service is still waiting for all tasks to complete. - assertEquals(0, runCount.get()); + assertEquals(-1, runCount.get()); // Release the locks on even index tasks. lockList.forEach(Lock::unlock); // Wait for current run of BackgroundService to complete. - GenericTestUtils.waitFor(() -> runCount.get() == 1, interval / 5, serviceTimeout); + GenericTestUtils.waitFor(() -> runCount.get() >= 0, interval / 5, serviceTimeout); // Verify that all tasks have completed. assertEquals(expFinalValues, IntStream.range(0, nTasks).boxed().map(map::get).collect(Collectors.toList())); assertTrue(queue.isEmpty()); @@ -159,9 +161,61 @@ public void testSingleWorkerThread() throws InterruptedException, TimeoutExcepti 1, serviceTimeout); backgroundService.start(); // Wait till current run of BackgroundService completes. - GenericTestUtils.waitFor(() -> runCount.get() == 1, interval / 5, serviceTimeout); + GenericTestUtils.waitFor(() -> runCount.get() >= 0, interval / 5, serviceTimeout); // Verify that all tasks have completed. assertEquals(expFinalValues, IntStream.range(0, nTasks).boxed().map(map::get).collect(Collectors.toList())); assertTrue(queue.isEmpty()); } + + @Test + public void testRunWaitsForTaskCompletion() throws TimeoutException, InterruptedException { + int interval = 100; + int serviceTimeout = 5000; + // Lock to control when the task can complete. 
+ Lock taskLock = new ReentrantLock(); + List runStartTimes = Collections.synchronizedList(new ArrayList<>()); + List taskEndTimes = Collections.synchronizedList(new ArrayList<>()); + + backgroundService = new BackgroundService("testFixedDelay", interval, + TimeUnit.MILLISECONDS, 1, serviceTimeout) { + @Override + public BackgroundTaskQueue getTasks() { + BackgroundTaskQueue taskQueue = new BackgroundTaskQueue(); + runStartTimes.add(System.currentTimeMillis()); + taskQueue.add(() -> { + taskLock.lock(); + try { + taskEndTimes.add(System.currentTimeMillis()); + } finally { + taskLock.unlock(); + } + return BackgroundTaskResult.EmptyTaskResult.newResult(); + }); + return taskQueue; + } + + @Override + public void execTaskCompletion() { + runCount.incrementAndGet(); + } + }; + + // Hold the lock so the first task blocks; it cannot complete until the lock is released. + taskLock.lock(); + try { + backgroundService.start(); + // Wait for the first run to start. + GenericTestUtils.waitFor(() -> !runStartTimes.isEmpty(), 100, serviceTimeout); + try { + // The task cannot complete before the lock is released, so this wait must time out. + GenericTestUtils.waitFor(() -> runCount.get() >= 0, 100, 2000); + fail("BackgroundService should not complete task"); + } catch (TimeoutException e) { // Expected: the task is still blocked on taskLock, so execTaskCompletion has not run. + } + } finally { + taskLock.unlock(); + } + // Wait for first run to complete after unlock + GenericTestUtils.waitFor(() -> runCount.get() >= 0, 10, serviceTimeout); + } }