Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CURATOR-727: Allow watches to be executed asynchronously #514

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.curator.CuratorZookeeperClient;
Expand Down Expand Up @@ -171,6 +172,7 @@ public static class Builder {
private SchemaSet schemaSet = SchemaSet.getDefaultSchemaSet();
private int waitForShutdownTimeoutMs = 0;
private Executor runSafeService = null;
private ExecutorService asyncWatchService = null;
private ConnectionStateListenerManagerFactory connectionStateListenerManagerFactory =
ConnectionStateListenerManagerFactory.standard;
private int simulatedSessionExpirationPercent = 100;
Expand Down Expand Up @@ -506,6 +508,19 @@ public Builder runSafeService(Executor runSafeService) {
return this;
}

/**
* By default, watches are run sequentially.
* If an executor is provided here, then all watch calls will be run asynchronously via this executor.
* This executor service will be closed when the CuratorFramework is closed.
*
* @param asyncWatchService executorService to use for all watch calls
* @return this
*/
public Builder asyncWatchService(ExecutorService asyncWatchService) {
this.asyncWatchService = asyncWatchService;
return this;
}

/**
* Sets the connection state listener manager factory. For example,
* you can set {@link org.apache.curator.framework.state.ConnectionStateListenerManagerFactory#circuitBreaking(org.apache.curator.RetryPolicy)}
Expand All @@ -530,6 +545,10 @@ public Executor getRunSafeService() {
return runSafeService;
}

public ExecutorService getAsyncWatchService() {
return asyncWatchService;
}

public ACLProvider getAclProvider() {
return aclProvider;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public class CuratorFrameworkImpl implements CuratorFramework {
private final EnsembleTracker ensembleTracker;
private final SchemaSet schemaSet;
private final Executor runSafeService;
private final ExecutorService asyncWatchService;
private final ZookeeperCompatibility zookeeperCompatibility;

private volatile ExecutorService executorService;
Expand Down Expand Up @@ -205,6 +206,7 @@ public void process(WatchedEvent watchedEvent) {
builder.withEnsembleTracker() ? new EnsembleTracker(this, builder.getEnsembleProvider()) : null;

runSafeService = makeRunSafeService(builder);
asyncWatchService = builder.getAsyncWatchService();
zookeeperCompatibility = builder.getZookeeperCompatibility();
}

Expand Down Expand Up @@ -294,6 +296,7 @@ protected CuratorFrameworkImpl(CuratorFrameworkImpl parent) {
schemaSet = parent.schemaSet;
ensembleTracker = parent.ensembleTracker;
runSafeService = parent.runSafeService;
asyncWatchService = parent.asyncWatchService;
zookeeperCompatibility = parent.zookeeperCompatibility;
}

Expand Down Expand Up @@ -430,7 +433,6 @@ public void close() {
Thread.currentThread().interrupt();
}
}

if (ensembleTracker != null) {
ensembleTracker.close();
}
Expand All @@ -445,6 +447,18 @@ public void close() {
unhandledErrorListeners.clear();
connectionStateManager.close();
client.close();

if (asyncWatchService != null) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If one of those long-running processes interacts with curator, I could see it failing. It'd be better to have close() relatively early call shutdown(), after publishing the close event.

asyncWatchService.shutdown();
try {
if (!asyncWatchService.awaitTermination(maxCloseWaitMs, TimeUnit.MILLISECONDS)) {
asyncWatchService.shutdownNow();
}
} catch (InterruptedException e) {
// Interrupted while interrupting; I give up.
Thread.currentThread().interrupt();
}
}
}
}

Expand Down Expand Up @@ -630,6 +644,10 @@ FailedRemoveWatchManager getFailedRemoveWatcherManager() {
return failedRemoveWatcherManager;
}

ExecutorService getAsyncWatchService() {
return asyncWatchService;
}

RetryLoop newRetryLoop() {
return client.newRetryLoop();
}
Expand Down
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the "asyncExecutor" could in fact always be pre-initialized to a "direct" (same thread Executor like Guava MoreExecutors.newDirectExecutorService), then there'd be no branching check around if it's present or not. Solr uses this technique in SimpleFacets too. That said, there's little complexity here so I'm not going to recommend it.

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import java.io.Closeable;
import java.util.concurrent.Executor;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.utils.ThreadUtils;
import org.apache.zookeeper.WatchedEvent;
Expand Down Expand Up @@ -65,14 +66,25 @@ public void process(WatchedEvent event) {
client.getWatcherRemovalManager().noteTriggeredWatcher(this);
}

Runnable watchRunnable = null;
if (actualWatcher != null) {
actualWatcher.process(new NamespaceWatchedEvent(client, event));
watchRunnable = () -> actualWatcher.process(new NamespaceWatchedEvent(client, event));
} else if (curatorWatcher != null) {
try {
curatorWatcher.process(new NamespaceWatchedEvent(client, event));
} catch (Exception e) {
ThreadUtils.checkInterrupted(e);
client.logError("Watcher exception", e);
watchRunnable = () -> {
try {
curatorWatcher.process(new NamespaceWatchedEvent(client, event));
} catch (Exception e) {
ThreadUtils.checkInterrupted(e);
client.logError("Watcher exception", e);
}
};
}
if (watchRunnable != null) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps more elegant to add an "else" that returns. Therefore, watchRunnable could be final non-null; no condition to act on it's null-ness.

Executor watchExecutor = client.getAsyncWatchService();
if (watchExecutor != null) {
watchExecutor.execute(watchRunnable);
} else {
watchRunnable.run();
}
}
}
Expand Down
Loading