Skip to content

Commit

Permalink
Avoid a race condition in RetryingExecutorService (rosjava#278)
Browse files Browse the repository at this point in the history
This fixes rosjava#274, which occurs when the task completes before the
`Future` is added to the `callables` map (because the corresponding
submit() is still executing). In that case, `callable` is null, which
causes latches.get(callable) to throw an NPE.

The bug can be reproduced with the added test by adding
`Thread.sleep(1000)` below the `completionService.submit` call.
  • Loading branch information
drigz authored May 18, 2018
1 parent 1dd68e3 commit f2134b2
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,13 @@ private class RetryLoop extends CancellableLoop {
@Override
public void loop() throws InterruptedException {
Future<Boolean> future = completionService.take();
final Callable<Boolean> callable = callables.remove(future);
Callable<Boolean> callable;
CountDownLatch latch;
// Grab the mutex to make sure submit() of the future that we took is finished.
synchronized (mutex) {
callable = callables.remove(future);
latch = latches.get(callable);
}
boolean retry;
try {
retry = future.get();
Expand All @@ -74,14 +80,15 @@ public void loop() throws InterruptedException {
if (DEBUG) {
log.info("Retry requested.");
}
final Callable<Boolean> finalCallable = callable;
scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
submit(callable);
submit(finalCallable);
}
}, retryDelay, retryTimeUnit);
} else {
latches.get(callable).countDown();
latch.countDown();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (C) 2012 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.ros.concurrent;

import static org.mockito.Mockito.*;


import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;

/**
* @author [email protected] (Rodrigo Queiro)
*/
public class RetryingExecutorServiceTest {

private ScheduledExecutorService executorService;

@Before
public void before() {
executorService = Executors.newScheduledThreadPool(4);
}

@Test
public void testNoRetry_calledOnce() throws Exception {
RetryingExecutorService service = new RetryingExecutorService(executorService);
Callable<Boolean> callable = mock(Callable.class);
when(callable.call()).thenReturn(false);
service.submit(callable);
service.shutdown(10, TimeUnit.SECONDS);
verify(callable, times(1)).call();
}

@Test
public void testOneRetry_calledTwice() throws Exception {
RetryingExecutorService service = new RetryingExecutorService(executorService);
service.setRetryDelay(0, TimeUnit.SECONDS);
Callable<Boolean> callable = mock(Callable.class);
when(callable.call()).thenReturn(true).thenReturn(false);
service.submit(callable);

// Call verify() with a timeout before calling shutdown, as shutdown() will prevent further
// retries.
verify(callable, timeout(10000).times(2)).call();
service.shutdown(10, TimeUnit.SECONDS);
}
}

0 comments on commit f2134b2

Please sign in to comment.