Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
Expand All @@ -40,8 +41,9 @@ public abstract class BackgroundService {
protected static final Logger LOG =
LoggerFactory.getLogger(BackgroundService.class);

// Executor to launch child tasks
private ScheduledThreadPoolExecutor exec;
private ThreadPoolExecutor exec;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exec is only responsible for executing tasks, so ThreadPoolExecutor is currently more suitable.

// Scheduler for PeriodicalTask
private ScheduledThreadPoolExecutor scheduler;
private ThreadGroup threadGroup;
private final String serviceName;
private long interval;
Expand Down Expand Up @@ -86,10 +88,16 @@ public synchronized void setPoolSize(int size) {
throw new IllegalArgumentException("Pool size must be positive.");
}

// In ScheduledThreadPoolExecutor, maximumPoolSize is Integer.MAX_VALUE
// the corePoolSize will always less maximumPoolSize.
// So we can directly set the corePoolSize
exec.setCorePoolSize(size);
// In ThreadPoolExecutor, maximumPoolSize must always be >= corePoolSize.
// Update in the correct order based on whether we are increasing or decreasing.
final int currentCorePoolSize = exec.getCorePoolSize();
if (size > currentCorePoolSize) {
exec.setMaximumPoolSize(size);
exec.setCorePoolSize(size);
} else if (size < currentCorePoolSize) {
exec.setCorePoolSize(size);
exec.setMaximumPoolSize(size);
}
}

public synchronized void setServiceTimeoutInNanos(long newTimeout) {
Expand All @@ -113,12 +121,13 @@ public void runPeriodicalTaskNow() throws Exception {

// start service
public synchronized void start() {
if (exec == null || exec.isShutdown() || exec.isTerminated()) {
if (exec == null || exec.isShutdown() || exec.isTerminated()
|| scheduler == null || scheduler.isShutdown() || scheduler.isTerminated()) {
initExecutorAndThreadGroup();
}
LOG.info("Starting service {} with interval {} {}", serviceName,
interval, unit.name().toLowerCase());
exec.scheduleWithFixedDelay(service, 0, interval, unit);
scheduler.scheduleWithFixedDelay(service, 0, interval, unit);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xichen01 , thanks for continue this improvement task.

Can we keep using the exec here? What's the extra benefit of introducing scheduler?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PeriodicalTask#run is now a blocking method. If exec only has one thread, when the PeriodicalTask#run blocks, Then the task submitted to the queue will remain without a thread to execute it.

Therefore, we introduce a separate thread scheduler responsible for submitting tasks, while exec is only responsible for executing the tasks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

”exec“ is a ScheduledExecutorService thread pool with no upper thread limit. I also just found it during the discussion of your last proposal patch's review.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but the queue of ScheduledExecutorService will always not be full, so only corePoolSize thread will work.

}

protected synchronized void setInterval(long newInterval, TimeUnit newUnit) {
Expand All @@ -141,15 +150,6 @@ protected void execTaskCompletion() { }
public class PeriodicalTask implements Runnable {
@Override
public void run() {
// wait for previous set of tasks to complete
try {
future.join();
} catch (RuntimeException e) {
LOG.error("Background service execution failed.", e);
} finally {
execTaskCompletion();
}

if (LOG.isDebugEnabled()) {
LOG.debug("Running background service : {}", serviceName);
}
Expand Down Expand Up @@ -187,20 +187,33 @@ public void run() {
}, exec).exceptionally(e -> null), (Void1, Void) -> null);
}
}
// wait for all tasks to complete
try {
future.join();
} catch (RuntimeException e) {
LOG.error("Background service execution failed.", e);
} finally {
execTaskCompletion();
}
}
}

// shutdown and make sure all threads are properly released.
public synchronized void shutdown() {
LOG.info("Shutting down service {}", this.serviceName);
scheduler.shutdown();
exec.shutdown();
try {
if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
if (!exec.awaitTermination(60, TimeUnit.SECONDS)) {
exec.shutdownNow();
}
} catch (InterruptedException e) {
// Re-interrupt the thread while catching InterruptedException
Thread.currentThread().interrupt();
scheduler.shutdownNow();
exec.shutdownNow();
}
if (threadGroup.activeCount() == 0 && !threadGroup.isDestroyed()) {
Expand All @@ -215,7 +228,14 @@ private void initExecutorAndThreadGroup() {
.setDaemon(true)
.setNameFormat(threadNamePrefix + serviceName + "#%d")
.build();
exec = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(threadPoolSize, threadFactory);
exec = (ThreadPoolExecutor) Executors.newFixedThreadPool(threadPoolSize, threadFactory);
// Single-thread scheduler for PeriodicalTask
ThreadFactory schedulerThreadFactory = new ThreadFactoryBuilder()
.setThreadFactory(r -> new Thread(threadGroup, r))
.setDaemon(true)
.setNameFormat(threadNamePrefix + serviceName + "-scheduler")
.build();
scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, schedulerThreadFactory);
}

protected String getServiceName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -134,11 +136,11 @@ public void testBackgroundServiceRunWaitsForTasks() throws InterruptedException,
// Verify that even index tasks have not run even after serviceTimeout.
assertEquals(expValuesWithEvenLocks, IntStream.range(0, nTasks).boxed().map(map::get).collect(Collectors.toList()));
// Background service is still waiting for all tasks to complete.
assertEquals(0, runCount.get());
assertEquals(-1, runCount.get());
// Release the locks on even index tasks.
lockList.forEach(Lock::unlock);
// Wait for current run of BackgroundService to complete.
GenericTestUtils.waitFor(() -> runCount.get() == 1, interval / 5, serviceTimeout);
GenericTestUtils.waitFor(() -> runCount.get() >= 0, interval / 5, serviceTimeout);
// Verify that all tasks have completed.
assertEquals(expFinalValues, IntStream.range(0, nTasks).boxed().map(map::get).collect(Collectors.toList()));
assertTrue(queue.isEmpty());
Expand All @@ -159,9 +161,61 @@ public void testSingleWorkerThread() throws InterruptedException, TimeoutExcepti
1, serviceTimeout);
backgroundService.start();
// Wait till current run of BackgroundService completes.
GenericTestUtils.waitFor(() -> runCount.get() == 1, interval / 5, serviceTimeout);
GenericTestUtils.waitFor(() -> runCount.get() >= 0, interval / 5, serviceTimeout);
// Verify that all tasks have completed.
assertEquals(expFinalValues, IntStream.range(0, nTasks).boxed().map(map::get).collect(Collectors.toList()));
assertTrue(queue.isEmpty());
}

@Test
public void testRunWaitsForTaskCompletion() throws TimeoutException, InterruptedException {
int interval = 100;
int serviceTimeout = 5000;
// Lock to control when the task can complete.
Lock taskLock = new ReentrantLock();
List<Long> runStartTimes = Collections.synchronizedList(new ArrayList<>());
List<Long> taskEndTimes = Collections.synchronizedList(new ArrayList<>());

backgroundService = new BackgroundService("testFixedDelay", interval,
TimeUnit.MILLISECONDS, 1, serviceTimeout) {
@Override
public BackgroundTaskQueue getTasks() {
BackgroundTaskQueue taskQueue = new BackgroundTaskQueue();
runStartTimes.add(System.currentTimeMillis());
taskQueue.add(() -> {
taskLock.lock();
try {
taskEndTimes.add(System.currentTimeMillis());
} finally {
taskLock.unlock();
}
return BackgroundTaskResult.EmptyTaskResult.newResult();
});
return taskQueue;
}

@Override
public void execTaskCompletion() {
runCount.incrementAndGet();
}
};

// Hold the lock so the first task blocks, the task cannot complete before release lock
taskLock.lock();
try {
backgroundService.start();
// Wait for the first run to start.
GenericTestUtils.waitFor(() -> !runStartTimes.isEmpty(), 100, serviceTimeout);
try {
// Task cannot complete before release lock
GenericTestUtils.waitFor(() -> runCount.get() >= 0, 100, 2000);
fail("BackgroundService should not complete task");
} catch (TimeoutException e) {
}
} finally {
taskLock.unlock();
}
// Wait for first run to complete after unlock
GenericTestUtils.waitFor(() -> runCount.get() >= 0, 10, serviceTimeout);
}
}