diff --git a/subprojects/parseq-batching/src/test/java/com/linkedin/parseq/batching/TestTaskSimpleBatchingStrategyBlocking.java b/subprojects/parseq-batching/src/test/java/com/linkedin/parseq/batching/TestTaskSimpleBatchingStrategyBlocking.java index 4dd28e28..81a85982 100644 --- a/subprojects/parseq-batching/src/test/java/com/linkedin/parseq/batching/TestTaskSimpleBatchingStrategyBlocking.java +++ b/subprojects/parseq-batching/src/test/java/com/linkedin/parseq/batching/TestTaskSimpleBatchingStrategyBlocking.java @@ -36,7 +36,7 @@ private Strategy(long sleepMs) { @Override public Task>> taskForBatch(Set keys) { - return Task.blocking(() -> { + return Task.callableInExecutor(() -> { try { // make this batching task long-running Thread.sleep(_sleepMs); diff --git a/subprojects/parseq-zk-client/src/main/java/com/linkedin/parseq/zk/client/ZKClientImpl.java b/subprojects/parseq-zk-client/src/main/java/com/linkedin/parseq/zk/client/ZKClientImpl.java index 9403b0f3..cfe719da 100644 --- a/subprojects/parseq-zk-client/src/main/java/com/linkedin/parseq/zk/client/ZKClientImpl.java +++ b/subprojects/parseq-zk-client/src/main/java/com/linkedin/parseq/zk/client/ZKClientImpl.java @@ -18,14 +18,12 @@ import com.linkedin.parseq.Context; import com.linkedin.parseq.Engine; -import com.linkedin.parseq.MultiException; import com.linkedin.parseq.Task; import com.linkedin.parseq.promise.Promise; import com.linkedin.parseq.promise.PromiseListener; import com.linkedin.parseq.promise.Promises; import com.linkedin.parseq.promise.SettablePromise; import java.io.IOException; -import java.util.ArrayList; import java.util.EnumMap; import java.util.List; import java.util.Map; @@ -282,7 +280,7 @@ public Task delete(String path, int version) { */ @Override public Task> multi(List ops, Executor executor) { - return Task.blocking(() -> _zkClient.multi(ops), executor); + return Task.callableInExecutor(() -> _zkClient.multi(ops), executor); } /** diff --git a/subprojects/parseq/src/main/java/com/linkedin/parseq/AsyncCallableTask.java b/subprojects/parseq/src/main/java/com/linkedin/parseq/AsyncCallableTask.java index 8b466168..06553258 100644 --- a/subprojects/parseq/src/main/java/com/linkedin/parseq/AsyncCallableTask.java +++ b/subprojects/parseq/src/main/java/com/linkedin/parseq/AsyncCallableTask.java @@ -15,7 +15,6 @@ */ package com.linkedin.parseq; -import com.linkedin.parseq.EngineBuilder; import com.linkedin.parseq.internal.ArgumentUtil; import com.linkedin.parseq.promise.Promise; import com.linkedin.parseq.promise.Promises; @@ -37,7 +36,7 @@ * To use this class with an engine, register an executor with engine using * {@link #register(EngineBuilder, java.util.concurrent.Executor)} * - * @deprecated As of 2.0.0, replaced by {@link Task#blocking(String, Callable, Executor) Task.blocking}. + * @deprecated As of 2.0.0, replaced by {@link Task#callableInExecutor(String, Callable, Executor)}. * @author Walter Fender (wfender@linkedin.com) */ @Deprecated @@ -51,7 +50,7 @@ public static void register(EngineBuilder builder, Executor executor) { } /** - * @deprecated As of 2.0.0, replaced by {@link Task#blocking(String, Callable, Executor) Task.blocking}. + * @deprecated As of 2.0.0, replaced by {@link Task#callableInExecutor(String, Callable, Executor)}. */ @Deprecated public AsyncCallableTask(final Callable syncJob) { @@ -59,7 +58,7 @@ public AsyncCallableTask(final Callable syncJob) { } /** - * @deprecated As of 2.0.0, replaced by {@link Task#blocking(String, Callable, Executor) Task.blocking}. + * @deprecated As of 2.0.0, replaced by {@link Task#callableInExecutor(String, Callable, Executor)}. */ @Deprecated public AsyncCallableTask(final String name, final Callable syncJob) { diff --git a/subprojects/parseq/src/main/java/com/linkedin/parseq/Task.java b/subprojects/parseq/src/main/java/com/linkedin/parseq/Task.java index 7e275703..9ae027d5 100644 --- a/subprojects/parseq/src/main/java/com/linkedin/parseq/Task.java +++ b/subprojects/parseq/src/main/java/com/linkedin/parseq/Task.java @@ -20,7 +20,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -1079,7 +1078,7 @@ public static Task flatten(final Task> task) { * Creates a new task that have a value of type {@code Void}. Because the * returned task returns no value, it is typically used to produce side effects. * It is not appropriate for long running or blocking actions. If action is - * long running or blocking use {@link #blocking(String, Callable, Executor) blocking} method. + * long running or blocking use {@link #callableInExecutor(String, Callable, Executor)} method. * *
    * // this task will print "Hello" on standard output
@@ -1167,7 +1166,7 @@ public static  Task failure(final Throwable failure) {
    * from the supplied callable. This task is useful when doing basic
    * computation that does not require asynchrony. It is not appropriate for
    * long running or blocking callables. If callable is long running or blocking
-   * use {@link #blocking(String, Callable, Executor) blocking} method.
+   * use {@link #callableInExecutor(String, Callable, Executor)} method.
    *
    * 
    * // this task will complete with {@code String} representing current time
@@ -1313,7 +1312,7 @@ public static  Task fromTry(final Try tried) {
    *
    * This method is not appropriate for long running or blocking callables.
    * If callable is long running or blocking use
-   * {@link #blocking(String, Callable, Executor) blocking} method.
+   * {@link #callableInExecutor(String, Callable, Executor)} method.
    * 

* * @param the type of the return value for this task @@ -1403,10 +1402,10 @@ public static Task async(final Function1> f * @return a new task that will submit the callable to given executor and complete * with result returned by that callable */ - public static Task blocking(final String name, final Callable callable, final Executor executor) { + public static Task callableInExecutor(final String name, final Callable callable, final Executor executor) { ArgumentUtil.requireNotNull(callable, "callable"); ArgumentUtil.requireNotNull(callable, "executor"); - Task blockingTask = async(name, () -> { + Task asyncCallableTask = async(name, () -> { final SettablePromise promise = Promises.settable(); executor.execute(() -> { try { @@ -1417,18 +1416,36 @@ public static Task blocking(final String name, final Callable Task callableInExecutor(final Callable callable, final Executor executor) { + return callableInExecutor("callableInExecutor: " + _taskDescriptor.getDescription(callable.getClass().getName()), callable, executor); } /** - * Equivalent to {@code blocking("blocking", callable, executor)}. - * @see #blocking(String, Callable, Executor) + * @deprecated please use {@link Task#callableInExecutor(Callable, Executor)} */ + @Deprecated public static Task blocking(final Callable callable, final Executor executor) { - return blocking("blocking: " + _taskDescriptor.getDescription(callable.getClass().getName()), callable, executor); + return callableInExecutor("callableInExecutor: " + _taskDescriptor.getDescription(callable.getClass().getName()), callable, executor); + } + + + /** + * @deprecated please use {@link Task#callableInExecutor(String, Callable, Executor)} + */ + @Deprecated + public static Task blocking(final String name, final Callable callable, final Executor executor) { + return callableInExecutor(name, callable, executor); } + /** * Creates a new task that will run given tasks in parallel. Returned task * will be resolved with results of all tasks as soon as all of them has diff --git a/subprojects/parseq/src/main/java/com/linkedin/parseq/TaskType.java b/subprojects/parseq/src/main/java/com/linkedin/parseq/TaskType.java index 4e385e9a..406ef325 100644 --- a/subprojects/parseq/src/main/java/com/linkedin/parseq/TaskType.java +++ b/subprojects/parseq/src/main/java/com/linkedin/parseq/TaskType.java @@ -6,7 +6,7 @@ */ public enum TaskType { FUSION ("fusion"), - BLOCKING ("blocking"), + CALLABLE_IN_EXECUTOR("callbleInExecutor"), SHAREABLE ("shareable"), FLATTEN ("flatten"), WITH_SIDE_EFFECT ("withSideEffect"), diff --git a/subprojects/parseq/src/test/java/com/linkedin/parseq/TestTaskFactoryMethods.java b/subprojects/parseq/src/test/java/com/linkedin/parseq/TestTaskFactoryMethods.java index f20f40db..ec3dc0eb 100644 --- a/subprojects/parseq/src/test/java/com/linkedin/parseq/TestTaskFactoryMethods.java +++ b/subprojects/parseq/src/test/java/com/linkedin/parseq/TestTaskFactoryMethods.java @@ -91,10 +91,10 @@ public void testAsyncWithContext() { } @Test - public void testBlocking() { + public void testcallableInExecutor() { TestingExecutorService es = new TestingExecutorService(Executors.newSingleThreadExecutor()); try { - Task task = Task.blocking(() -> "from blocking", es); + Task task = Task.callableInExecutor(() -> "from blocking", es); runAndWait("TestTaskFactoryMethods.testBlocking", task); assertEquals(task.get(), "from blocking"); assertEquals(es.getCount(), 1); diff --git a/subprojects/parseq/src/test/java/com/linkedin/parseq/TestTaskType.java b/subprojects/parseq/src/test/java/com/linkedin/parseq/TestTaskType.java index fb37458e..ebad3cc9 100644 --- a/subprojects/parseq/src/test/java/com/linkedin/parseq/TestTaskType.java +++ b/subprojects/parseq/src/test/java/com/linkedin/parseq/TestTaskType.java @@ -28,9 +28,9 @@ public void testFusionTaskType() { public void testBlockingTaskType() { TestingExecutorService es = new TestingExecutorService(Executors.newSingleThreadExecutor()); try { - Task task = Task.blocking(() -> "blocking task", es); + Task task = Task.callableInExecutor(() -> "blocking task", es); runAndWait("blockingTaskType", task); - assertEquals(task.getShallowTrace().getTaskType(), TaskType.BLOCKING.getName()); + assertEquals(task.getShallowTrace().getTaskType(), TaskType.CALLABLE_IN_EXECUTOR.getName()); } finally { es.shutdown(); }