User's Guide
- Getting ParSeq
- Key Concepts
- Creating an Engine
- Creating and Running Tasks
- Handling Errors
- Using Timeouts
- Cancelling
- Tracing
- Unit Testing
- Integrating with ParSeq
See Getting ParSeq for instructions on how to integrate ParSeq with your project or to get the latest ParSeq binaries.
Before getting into the details of using ParSeq it is worth describing a few terms that will be used frequently.
A Task is a basic unit of work in the ParSeq system. It is similar to a Java Callable, but its result can be set asynchronously. Tasks are executed by an Engine (see below), not by the user directly. Task implements Promise, which is like a fully asynchronous Java Future. Tasks can be transformed and composed to produce desired results. A Plan is a collection of tasks executed as a consequence of running a root task.
An Engine is used to run tasks. Normally an application has one instance of Engine.
The Engine is used to run tasks in the ParSeq framework. To construct an instance, use:
import com.linkedin.parseq.Engine;
import com.linkedin.parseq.EngineBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
// ...
final int numCores = Runtime.getRuntime().availableProcessors();
final ExecutorService taskScheduler = Executors.newFixedThreadPool(numCores + 1);
final ScheduledExecutorService timerScheduler = Executors.newSingleThreadScheduledExecutor();
final Engine engine = new EngineBuilder()
.setTaskExecutor(taskScheduler)
.setTimerScheduler(timerScheduler)
.build();
With these settings ParSeq will have numCores + 1 threads available for executing tasks and 1 thread available for scheduling timers. These settings are a reasonable place to start, but can be customized for particular use cases.
To stop the engine, use:
engine.shutdown();
engine.awaitTermination(1, TimeUnit.SECONDS);
taskScheduler.shutdown();
timerScheduler.shutdown();
This initiates shutdown (no new tasks can be executed, but old tasks are allowed to finish) and waits up to 1 second for the engine to quiesce. It also shuts down the executors used by ParSeq, because ParSeq does not manage their lifecycle.
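Because the executors are owned by the application, their shutdown can be wrapped in a small helper. The following is a minimal sketch that uses only the JDK. The class and method names (ExecutorShutdown, shutdownGracefully) are hypothetical and are not part of ParSeq, and the fallback to shutdownNow() is an assumption about what an application might want, not a ParSeq recommendation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of ParSeq.
final class ExecutorShutdown {
    private ExecutorShutdown() {}

    // Stops accepting new work, waits for running work to finish, and
    // returns true if the executor terminated within the timeout.
    static boolean shutdownGracefully(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown();
        try {
            if (executor.awaitTermination(timeout, unit)) {
                return true;
            }
            // Timed out: interrupt whatever is still running.
            executor.shutdownNow();
            return false;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

It could be called once for taskScheduler and once for timerScheduler after the engine itself has been shut down.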
We do not recommend using the CallerRunsPolicy or the AbortPolicy with the underlying ParSeq task executor. The former may tie up unrelated threads (e.g. IO worker threads) in the execution of ParSeq tasks. The latter, as of v1.3.7, will cause a plan to be aborted if it cannot be rescheduled due to a RejectedExecutionException.
Instead we recommend using standard strategies for managing overload: back-pressure, load shedding, or degraded responses. In addition, the time spent on a plan can be bounded using timeouts.
Initially, tasks are created by integrating existing libraries with ParSeq. If a task involves non-blocking computation, it can be created using Task.action() or Task.callable(). We also provide Task.value() and Task.failure() for the most trivial cases. New tasks are created by transforming and composing existing tasks.
A few words about the ParSeq API: most of the methods that create new tasks have a version that accepts a task description. We recommend giving a short, clear description to every task. It provides great value when debugging and troubleshooting with ParSeq's tracing mechanisms.
Almost every method on the ParSeq Task interface creates a new instance of a Task that may refer to the task used to create it, e.g. it might depend on its result and cause it to run when executed by an engine.
Tasks are lazy. They are descriptions of computations that will happen when the task is executed by an engine. Once you've created a task, you can run it by submitting it to the engine:
engine.run(task);
The main mechanism for transforming tasks is the map() method.
Suppose we only need the HTTP content type of the Google home page. Given an instance of Task<Response> that makes an HTTP HEAD request:
Task<Response> head = HttpClient.head("http://www.google.com").task();
we can transform it into a task that returns the content type with the following code:
Task<String> contentType =
head.map("toContentType", response -> response.getContentType());
Note that the existing head task has not been modified. Instead, a new task was created that, when executed, will first run the head task and, after it is resolved, will apply the provided transformation. If the head task fails for any reason then the contentType task also fails and the provided transformation is not invoked. This mechanism is described in detail in the Handling Errors section.
Using ParSeq's tracing tools we would get the following diagram for the task above:
If we only need to consume the result produced by a task, we can use the andThen() method:
Task<String> printContentType = contentType.andThen("print", System.out::println);
In the example above we used a Java 8 method reference, but we could just as well use a lambda expression:
Task<String> printContentType = contentType.andThen("print", s -> System.out.println(s));
Similarly, if we need to consume a potential failure of a task, we can use the onFailure() method:
Task<String> logFailure = contentType.onFailure("print stack trace", e -> e.printStackTrace());
Sometimes it is useful to treat a potential failure of a task more explicitly. The toTry() method transforms Task<T> into Task<Try<T>>, where the Try type explicitly represents the possibility of task failure:
Task<Try<String>> contentType =
head.map("toContentType", response -> response.getContentType()).toTry();
Task<Try<String>> logContentType =
contentType.andThen("log", type -> {
if (type.isFailed()) {
type.getError().printStackTrace();
} else {
System.out.println("Content type: " + type.get());
}
});
Finally, the transform() method combines toTry() with map():
Task<Response> get = HttpClient.get("http://www.google.com").task();
Task<Optional<String>> contents = get.transform("getContents", tryGet -> {
if (tryGet.isFailed()) {
return Success.of(Optional.empty());
} else {
return Success.of(Optional.of(tryGet.get().getResponseBody()));
}
});
In the example above the contents task always completes successfully, returning the contents of the Google page wrapped in an Optional, or Optional.empty() if the HTTP GET request failed.
Many tasks are composed of other tasks that are run sequentially or in parallel.
Suppose we want to get a String consisting of the content types of a few different pages fetched in parallel.
First, let's create a helper method that returns a task responsible for fetching the content type for a URL.
private Task<String> getContentType(String url) {
return HttpClient.get(url).task()
.map("getContentType", response -> response.getContentType());
}
We can use the Task.par() method to compose tasks to run in parallel:
final Task<String> googleContentType = getContentType("http://www.google.com");
final Task<String> bingContentType = getContentType("http://www.bing.com");
final Task<String> contentTypes =
Task.par(googleContentType, bingContentType)
.map("concatenate", (google, bing) -> "Google: " + google + "\n" +
"Bing: " + bing + "\n");
Task.par() creates a new task that will run the googleContentType and bingContentType tasks in parallel. We transformed the result into a String using the map() method.
Diagram representing above example:
Result of running task above:
Google: text/html; charset=ISO-8859-1
Bing: text/html; charset=utf-8
We talk about sequential composition when we need to run tasks sequentially.
ParSeq provides the andThen() method, which can be used to run a task after successful completion of another task:
// task that processes payment
Task<PaymentStatus> processPayment = processPayment(...);
// task that ships product
Task<ShipmentInfo> shipProduct = shipProduct(...);
// this task will ship product only if payment was
// successfully processed
Task<ShipmentInfo> shipAfterPayment =
processPayment.andThen("shipProductAfterPayment", shipProduct);
In the example above, the shipProduct task will run only if the processPayment task finished successfully. Notice that shipProduct does not depend on the actual result of the task that precedes it.
In many situations the second task directly depends on the result of the first task. Let's discuss this using the following example. Suppose we would like to get information about the first image on a given web page. We will write a task that fetches the web page contents, finds the first reference to an image in it, fetches that image and returns a short description of it. We will have two asynchronous tasks: fetching the page contents and fetching the first image. The second task depends on the actual result of the first task.
Let's decompose this problem into smaller pieces. First we'll need tasks to fetch data given a URL. We need one version that treats the contents as a String, to fetch the web page contents, and one that treats the contents as binary data, for the image:
private Task<String> getAsString(String url) {
return HttpClient.get(url).task()
.map("bodyAsString", response -> response.getResponseBody());
}
private Task<byte[]> getAsBytes(String url) {
return HttpClient.get(url).task()
.map("bodyAsBytes", response -> response.getResponseBodyAsBytes());
}
With the methods above we can write a method that returns info for an image (for simplicity just its length), given its URL:
private Task<String> info(String url) {
return getAsBytes(url).map("info", body -> url + ": length = " + body.length);
}
We will also need a way to find the first image in a web page's contents. A naive implementation (serving just as an example) might look like this:
private String findFirstImage(String body) {
Pattern pat = Pattern.compile("[\\('\"]([^\\(\\)'\"]+\\.(png|gif|jpg))[\\)'\"]");
Matcher matcher = pat.matcher(body);
matcher.find();
return matcher.group(1);
}
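The helper above throws an IllegalStateException from matcher.group(1) when the page contains no image reference, because the result of matcher.find() is not checked. A more defensive variant might return an Optional instead. This is only a sketch: the names ImageFinder and findFirstImageSafely are hypothetical, and the later examples in this guide continue to use findFirstImage as defined above.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical variant of the naive helper from the text.
final class ImageFinder {
    // The naive pattern from the text, with a literal dot before the file extension.
    private static final Pattern IMAGE =
        Pattern.compile("[\\('\"]([^\\(\\)'\"]+\\.(png|gif|jpg))[\\)'\"]");

    private ImageFinder() {}

    // Returns the first image reference found in body, or Optional.empty() if there is none.
    static Optional<String> findFirstImageSafely(String body) {
        Matcher matcher = IMAGE.matcher(body);
        return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
    }
}
```

A caller could then decide what a missing image means, for example by failing the task with a descriptive exception.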
We have all the pieces, so let's combine them. The first approach uses the map() method:
getAsString(url).map("firstImageInfo", body -> info(url + findFirstImage(body)));
The problem is that the type of this expression is Task<Task<String>>. This situation (nested tasks) occurs when one task depends on the outcome of another task. In this case the task that fetches the image depends on the outcome of the task that fetches the contents of the web page. There is a method that can flatten such an expression, unsurprisingly called Task.flatten():
Task.flatten(getAsString(url).map("firstImageInfo", body -> info(url + findFirstImage(body))));
The result of the expression above has type Task<String>. This is such a common pattern that there is a shorthand which combines flatten() and map(), called flatMap():
getAsString(url).flatMap("firstImageInfo", body -> info(url + findFirstImage(body)));
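For readers who know the JDK's CompletableFuture, the same nesting problem and its solution can be seen there. The sketch below is an analogy only and does not use ParSeq: thenApply plays the role of map() and thenCompose plays the role of flatMap(). The class FlatMapAnalogy and the describe helper are hypothetical stand-ins for the info method above.

```java
import java.util.concurrent.CompletableFuture;

// JDK-only analogy; none of these names come from ParSeq.
final class FlatMapAnalogy {
    private FlatMapAnalogy() {}

    // Hypothetical stand-in for an asynchronous step that depends on an earlier result.
    static CompletableFuture<String> describe(String name) {
        return CompletableFuture.completedFuture(name + ": length = " + name.length());
    }

    // Mapping with a function that returns a future produces a nested future,
    // which then has to be flattened explicitly.
    static String nestedThenFlattened() {
        CompletableFuture<String> first = CompletableFuture.completedFuture("logo.png");
        CompletableFuture<CompletableFuture<String>> nested = first.thenApply(FlatMapAnalogy::describe);
        return nested.thenCompose(inner -> inner).join();
    }

    // Composing directly avoids the nested type, as flatMap() does for tasks.
    static String composed() {
        CompletableFuture<String> first = CompletableFuture.completedFuture("logo.png");
        return first.thenCompose(FlatMapAnalogy::describe).join();
    }
}
```

Both methods return the same value. The difference is only in whether the intermediate nested type appears in the code.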
Now we can write a method that returns info about the first image of a given web page:
private Task<String> firstImageInfo(String url) {
return getAsString(url).flatMap("firstImageInfo", body -> info(url + findFirstImage(body)));
}
Let's use it on google.com:
final Task<String> firstImageInfo = firstImageInfo("http://www.google.com");
Result:
http://www.google.com/images/google_favicon_128.png: length = 3243
Trace obtained from this task:
Finally let's combine sequential and parallel composition. We will, in parallel, get info of first image on google.com and bing.com web pages:
final Task<String> googleInfo = firstImageInfo("http://www.google.com");
final Task<String> bingInfo = firstImageInfo("http://www.bing.com");
Task<String> infos = Task.par(googleInfo, bingInfo)
.map("concatenate", (google, bing) -> "Google: " + google + "\n" +
"Bing: " + bing + "\n");
Running above task would yield the following trace:
And here is the result:
Google: http://www.google.com/images/google_favicon_128.png: length = 3243
Bing: http://www.bing.com/s/a/hpc12.png: length = 5574
The main principle in ParSeq is that task failure is always propagated to tasks that depend on it. Generally there is no need for catching or re-throwing exceptions:
Task<String> failing = Task.callable("hello", () -> {
return "Hello World".substring(100);
});
Task<Integer> length = failing.map("length", s -> s.length());
In the example above the length task would fail with the java.lang.StringIndexOutOfBoundsException that was propagated from the failing task.
Often degraded behavior is a better choice than simply propagating an exception. If there exists a reasonable fallback value, we can use the recover() method to recover from a failure:
Task<String> failing = Task.callable("hello", () -> {
return "Hello World".substring(100);
});
Task<Integer> length = failing.map("length", s -> s.length())
.recover("withDefault0", e -> 0);
This time the length task recovers from the java.lang.StringIndexOutOfBoundsException with the fallback default value 0. Notice that the recovery function accepts the exception that caused the failure as a parameter.
Sometimes we don't have a fallback value ready to be used, but we can compute it using another task, e.g. compute it asynchronously. In those cases we can use the recoverWith() method. The difference between recover() and recoverWith() is that the latter returns an instance of a Task that will be executed to obtain the fallback value. The following example shows how to use recoverWith() to fetch a user from the main DB when fetching it from the cache failed:
Task<Person> user = fetchFromCache(id)
.recoverWith(e -> fetchFromDB(id));
It is a good idea to set a timeout on asynchronous tasks. ParSeq provides the withTimeout() method to do that:
final Task<Response> google = HttpClient.get("http://google.com").task()
.withTimeout(10, TimeUnit.MILLISECONDS);
In the example above the google task will fail with a TimeoutException if fetching the contents of google.com takes more than 10 ms.
ParSeq supports task cancellation. Cancelling a task means that the result of that task is not relevant anymore. A task can be cancelled at any time. The implementation of a task can detect that it has been cancelled and react accordingly. A cancelled task completes with a CancellationException and thus behaves like a failed task, i.e. cancellation is automatically propagated to all tasks that depend on it. Even though cancelled tasks are effectively failed, methods that handle failures such as recover(), recoverWith(), onFailure() etc. are not called when a task is cancelled. The reason is that task cancellation means that the result of the task is not relevant, so normally there is no point in trying to recover from this situation. To cancel a task we can call the cancel() method.
Usually the main goal of a task is to calculate a value. You can think of a task as an asynchronous function. Once the value has been calculated there is no point in running the task again. Thus, ParSeq will run any task only once. The Engine will recognize a task that has been completed (or even started) and will not run it again.
It is possible for the value of a task to be known before it has finished running, or even before it has started. One such case occurs when we specify a timeout for a task. After the specified amount of time the task is failed by timeoutTimer even though the original task might still be running. In such cases ParSeq will automatically cancel the original task with EarlyFinishException and, because of failure propagation, all tasks that depend on it.
Recall timeout example:
The resulting task has failed after 10 ms, as represented by the red circle, while the original GET task has been automatically cancelled with EarlyFinishException, which is represented by the yellow color. See the Tracing section for details describing the generated diagrams.
See Tracing for details about ParSeq's tracing and visualization features.
ParSeq comes with an artifact with the test classifier that contains BaseEngineTest, which can be used as a base class for ParSeq-related tests. It automatically creates and shuts down an engine for every test case and provides many useful methods for running and tracing tasks.
To use it add the following dependency:
<dependency>
<groupId>com.linkedin.parseq</groupId>
<artifactId>parseq</artifactId>
<version>2.0.0</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
This section describes how to integrate existing asynchronous libraries with ParSeq. We provide two examples that might be useful as further guides:
- parseq-http-client integrates async http client and provides tasks for making asynchronous HTTP requests
- parseq-exec integrates Java's Process API and provides tasks for running native processes asynchronously
We will use the Task.async() method to create a task instance that will be completed asynchronously. It accepts either a Callable or a Function1 that returns an instance of a Promise. When the returned task is executed it will call the provided callable or function and will propagate the returned promise's completion to itself. This means that the task will complete when the promise completes. The following example shows how async http client can be integrated with ParSeq; for the full source see WrappedRequestBuilder:
public Task<Response> task(final String desc) {
return Task.async(desc, () -> {
final SettablePromise<Response> result = Promises.settable();
_delegate.execute(new AsyncCompletionHandler<Response>() {
@Override
public Response onCompleted(final Response response) throws Exception {
result.done(response);
return response;
}
@Override
public void onThrowable(Throwable t) {
result.fail(t);
}
});
return result;
});
}
In the example above we first create an instance of SettablePromise. Next, we invoke the http client's method and pass a completion handler which will propagate the result to the promise.
Unfortunately, not every library provides an asynchronous API, e.g. JDBC libraries. We should never run blocking code directly inside a ParSeq task because this can affect other asynchronous tasks.
In order to integrate a blocking API with ParSeq we can use the Task.blocking() method. It accepts two parameters:
- Callable that executes blocking code
- Executor instance on which the callable will be called
At runtime ParSeq will submit a blocking callable to provided executor and will complete the task with its result once it has completed.
Managing executors used for blocking calls is outside of ParSeq's responsibilities and is use case specific, but we recommend always limiting the number of threads used by the executor, limiting the size of the work queue, and specifying a rejection policy other than CallerRunsPolicy.
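An executor that follows these recommendations can be built directly with the JDK's ThreadPoolExecutor. The sketch below is one possible configuration, not a ParSeq-provided default: the class name BlockingExecutors, the method newBoundedExecutor, and the 60 second keep-alive are assumptions chosen for illustration. It uses AbortPolicy, which is acceptable here because this executor is dedicated to blocking calls and is separate from the ParSeq task executor.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical factory, not part of ParSeq.
final class BlockingExecutors {
    private BlockingExecutors() {}

    // Creates an executor with a fixed upper bound on threads and on queued work.
    // When both limits are reached, submissions fail with RejectedExecutionException.
    static ThreadPoolExecutor newBoundedExecutor(int maxThreads, int queueCapacity) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            maxThreads, maxThreads,
            60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(queueCapacity),
            new ThreadPoolExecutor.AbortPolicy());
        // Let idle threads exit so the pool shrinks when there is no blocking work.
        executor.allowCoreThreadTimeOut(true);
        return executor;
    }
}
```

The returned executor could be passed as the Executor argument of Task.blocking(). Suitable values for maxThreads and queueCapacity depend on the blocking resource, for example the size of a database connection pool.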