Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename Task.blocking to Task.callableInExecutor #292

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ private Strategy(long sleepMs) {

@Override
public Task<Map<Integer, Try<String>>> taskForBatch(Set<Integer> keys) {
return Task.blocking(() -> {
return Task.runInExecutor(() -> {
try {
// make this batching task long-running
Thread.sleep(_sleepMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@

import com.linkedin.parseq.Context;
import com.linkedin.parseq.Engine;
import com.linkedin.parseq.MultiException;
import com.linkedin.parseq.Task;
import com.linkedin.parseq.promise.Promise;
import com.linkedin.parseq.promise.PromiseListener;
import com.linkedin.parseq.promise.Promises;
import com.linkedin.parseq.promise.SettablePromise;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -282,7 +280,7 @@ public Task<Void> delete(String path, int version) {
*/
@Override
public Task<List<OpResult>> multi(List<Op> ops, Executor executor) {
return Task.blocking(() -> _zkClient.multi(ops), executor);
return Task.runInExecutor(() -> _zkClient.multi(ops), executor);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package com.linkedin.parseq;

import com.linkedin.parseq.EngineBuilder;
import com.linkedin.parseq.internal.ArgumentUtil;
import com.linkedin.parseq.promise.Promise;
import com.linkedin.parseq.promise.Promises;
Expand All @@ -37,7 +36,7 @@
* To use this class with an engine, register an executor with engine using
* {@link #register(EngineBuilder, java.util.concurrent.Executor)}
*
* @deprecated As of 2.0.0, replaced by {@link Task#blocking(String, Callable, Executor) Task.blocking}.
* @deprecated As of 2.0.0, replaced by {@link Task#runInExecutor(String, Callable, Executor) Task.blocking}.
* @author Walter Fender ([email protected])
*/
@Deprecated
Expand All @@ -51,15 +50,15 @@ public static void register(EngineBuilder builder, Executor executor) {
}

/**
* @deprecated As of 2.0.0, replaced by {@link Task#blocking(String, Callable, Executor) Task.blocking}.
* @deprecated As of 2.0.0, replaced by {@link Task#runInExecutor(String, Callable, Executor) Task.blocking}.
*/
@Deprecated
public AsyncCallableTask(final Callable<R> syncJob) {
this(null, syncJob);
}

/**
* @deprecated As of 2.0.0, replaced by {@link Task#blocking(String, Callable, Executor) Task.blocking}.
* @deprecated As of 2.0.0, replaced by {@link Task#runInExecutor(String, Callable, Executor) Task.blocking}.
*/
@Deprecated
public AsyncCallableTask(final String name, final Callable<R> syncJob) {
Expand Down
27 changes: 19 additions & 8 deletions subprojects/parseq/src/main/java/com/linkedin/parseq/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -1079,7 +1078,7 @@ public static <R> Task<R> flatten(final Task<Task<R>> task) {
* Creates a new task that have a value of type {@code Void}. Because the
* returned task returns no value, it is typically used to produce side effects.
* It is not appropriate for long running or blocking actions. If action is
* long running or blocking use {@link #blocking(String, Callable, Executor) blocking} method.
* long running or blocking use {@link #runInExecutor(String, Callable, Executor) blocking} method.
*
* <blockquote><pre>
* // this task will print "Hello" on standard output
Expand Down Expand Up @@ -1167,7 +1166,7 @@ public static <T> Task<T> failure(final Throwable failure) {
* from the supplied callable. This task is useful when doing basic
* computation that does not require asynchrony. It is not appropriate for
* long running or blocking callables. If callable is long running or blocking
* use {@link #blocking(String, Callable, Executor) blocking} method.
* use {@link #runInExecutor(String, Callable, Executor) blocking} method.
*
* <blockquote><pre>
* // this task will complete with {@code String} representing current time
Expand Down Expand Up @@ -1313,7 +1312,7 @@ public static <T> Task<T> fromTry(final Try<? extends T> tried) {
*
* This method is not appropriate for long running or blocking callables.
* If callable is long running or blocking use
* {@link #blocking(String, Callable, Executor) blocking} method.
* {@link #runInExecutor(String, Callable, Executor) blocking} method.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update blocking to runInExecutor. same for other places

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think here the word blocking does not need to be replaced.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

text will point to #blocking method but link for new method in parsed doc. It will be confusing since we can't remove blocking method

* <p>
*
* @param <T> the type of the return value for this task
Expand Down Expand Up @@ -1403,7 +1402,7 @@ public static <T> Task<T> async(final Function1<Context, Promise<? extends T>> f
* @return a new task that will submit the callable to given executor and complete
* with result returned by that callable
*/
public static <T> Task<T> blocking(final String name, final Callable<? extends T> callable, final Executor executor) {
public static <T> Task<T> runInExecutor(final String name, final Callable<? extends T> callable, final Executor executor) {
aman1309 marked this conversation as resolved.
Show resolved Hide resolved
ArgumentUtil.requireNotNull(callable, "callable");
ArgumentUtil.requireNotNull(callable, "executor");
Task<T> blockingTask = async(name, () -> {
Expand All @@ -1422,13 +1421,25 @@ public static <T> Task<T> blocking(final String name, final Callable<? extends T
}

/**
* Equivalent to {@code blocking("blocking", callable, executor)}.
* @see #blocking(String, Callable, Executor)
* Equivalent to {@code runInExecutor("runInExecutor", callable, executor)}.
* @see #runInExecutor(String, Callable, Executor)
*/
public static <T> Task<T> runInExecutor(final Callable<? extends T> callable, final Executor executor) {
return runInExecutor("runInExecutor: " + _taskDescriptor.getDescription(callable.getClass().getName()), callable, executor);
}

@Deprecated

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on the replacement to use.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

udpated

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add java doc to redirect to new method

public static <T> Task<T> blocking(final Callable<? extends T> callable, final Executor executor) {
return blocking("blocking: " + _taskDescriptor.getDescription(callable.getClass().getName()), callable, executor);
return runInExecutor("runInExecutor: " + _taskDescriptor.getDescription(callable.getClass().getName()), callable, executor);
}


@Deprecated

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on the replacement to use.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

udpated

public static <T> Task<T> blocking(final String name, final Callable<? extends T> callable, final Executor executor) {
return runInExecutor(name, callable, executor);
}


/**
* Creates a new task that will run given tasks in parallel. Returned task
* will be resolved with results of all tasks as soon as all of them has
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void testAsyncWithContext() {
public void testBlocking() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update test name

TestingExecutorService es = new TestingExecutorService(Executors.newSingleThreadExecutor());
try {
Task<String> task = Task.blocking(() -> "from blocking", es);
Task<String> task = Task.runInExecutor(() -> "from blocking", es);
runAndWait("TestTaskFactoryMethods.testBlocking", task);
assertEquals(task.get(), "from blocking");
assertEquals(es.getCount(), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public void testFusionTaskType() {
public void testBlockingTaskType() {
TestingExecutorService es = new TestingExecutorService(Executors.newSingleThreadExecutor());
try {
Task<String> task = Task.blocking(() -> "blocking task", es);
Task<String> task = Task.runInExecutor(() -> "blocking task", es);
runAndWait("blockingTaskType", task);
assertEquals(task.getShallowTrace().getTaskType(), TaskType.BLOCKING.getName());
} finally {
Expand Down