From b5ff007648e69da83502620433fcf9cb981c6d70 Mon Sep 17 00:00:00 2001 From: okelepko Date: Fri, 26 Jun 2015 23:17:34 -0700 Subject: [PATCH 1/2] Properly synchronize CancellableLoop. The write to the thread variable was guarded by a lock, but the reads were not. Also, operations in cancel and isRunning methods were not atomic. This could lead to visibility issues and data races (NPEs). --- .../org/ros/concurrent/CancellableLoop.java | 60 +++++++++++-------- 1 file changed, 36 insertions(+), 24 deletions(-) diff --git a/rosjava/src/main/java/org/ros/concurrent/CancellableLoop.java b/rosjava/src/main/java/org/ros/concurrent/CancellableLoop.java index fa6bc0ba6..bc6412346 100644 --- a/rosjava/src/main/java/org/ros/concurrent/CancellableLoop.java +++ b/rosjava/src/main/java/org/ros/concurrent/CancellableLoop.java @@ -16,49 +16,45 @@ package org.ros.concurrent; -import com.google.common.base.Preconditions; - import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; /** * An interruptable loop that can be run by an {@link ExecutorService}. - * + * * @author khughes@google.com (Keith M. Hughes) */ public abstract class CancellableLoop implements Runnable { + private static final Object NOT_STARTED = null; - private final Object mutex; - - /** - * {@code true} if the code has been run once, {@code false} otherwise. - */ - private boolean ranOnce = false; + private static final Object FINISHED = CancellableLoop.class; /** - * The {@link Thread} the code will be running in. + * State of this loop. Possible values are: + * */ - private Thread thread; - - public CancellableLoop() { - mutex = new Object(); - } + private final AtomicReference state = new AtomicReference(); @Override public void run() { - synchronized (mutex) { - Preconditions.checkState(!ranOnce, "CancellableLoops cannot be restarted."); - ranOnce = true; - thread = Thread.currentThread(); + Thread currentThread = Thread.currentThread(); + if (!state.compareAndSet(NOT_STARTED, currentThread)) { + throw new IllegalStateException("CancellableLoops cannot be restarted."); } + try { setup(); - while (!thread.isInterrupted()) { + while (!currentThread.isInterrupted()) { loop(); } } catch (InterruptedException e) { handleInterruptedException(e); } finally { - thread = null; + state.set(this); } } @@ -86,8 +82,23 @@ protected void handleInterruptedException(InterruptedException e) { * Interrupts the loop. */ public void cancel() { - if (thread != null) { - thread.interrupt(); + for (; ; ) { + Object currentState = state.get(); + if (currentState == NOT_STARTED) { + if (state.compareAndSet(NOT_STARTED, FINISHED)) { + // cancelled before starting + return; + } else { + // started before we cancelled, try again + continue; + } + // either finished, or we cancel it + } else if (currentState != FINISHED && state.compareAndSet(currentState, FINISHED)) { + // first to interrupt + Thread runningThread = (Thread) currentState; + runningThread.interrupt(); + } + return; } } @@ -95,6 +106,7 @@ public void cancel() { * @return {@code true} if the loop is running */ public boolean isRunning() { - return thread != null && !thread.isInterrupted(); + Object currentState = state.get(); + return currentState != NOT_STARTED && currentState != FINISHED; } } From 280c4a057652e8bdbb01a33081e19f49363458e8 Mon Sep 17 00:00:00 2001 From: okelepko Date: Fri, 26 Jun 2015 23:17:34 -0700 Subject: [PATCH 2/2] Properly synchronize CancellableLoop. The write to the thread variable was guarded by a lock, but the reads were not. Also, operations in cancel and isRunning methods were not atomic. This could lead to visibility issues and data races (NPEs). --- rosjava/src/main/java/org/ros/concurrent/CancellableLoop.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rosjava/src/main/java/org/ros/concurrent/CancellableLoop.java b/rosjava/src/main/java/org/ros/concurrent/CancellableLoop.java index bc6412346..b340d7666 100644 --- a/rosjava/src/main/java/org/ros/concurrent/CancellableLoop.java +++ b/rosjava/src/main/java/org/ros/concurrent/CancellableLoop.java @@ -54,7 +54,7 @@ public void run() { } catch (InterruptedException e) { handleInterruptedException(e); } finally { - state.set(this); + state.set(FINISHED); } }