1. Introduction
In this tutorial, we’ll discuss the fork/join framework, which is an implementation of the ExecutorService interface.
Modern computer systems have multiple cores, and a good concurrent program should use the computing power of all of them. The fork/join framework makes this possible. It follows the divide-and-conquer approach and is best suited to problems that can be broken down recursively into smaller subproblems, each of which can be solved independently. Like any other ExecutorService implementation, the fork/join framework assigns tasks to worker threads in a thread pool.
A task first “forks”, recursively splitting itself into smaller independent subtasks. Once the subtasks have executed, their results are “joined” into a single result. For a task that returns void, the program simply waits until every subtask has finished.
The core class of the fork/join framework is ForkJoinPool, a subclass of AbstractExecutorService. ForkJoinPool implements the work-stealing algorithm and executes ForkJoinTask instances.
2. Work stealing
The work-stealing algorithm redistributes and balances tasks among the worker threads in the pool. When a task in a worker’s queue is divided into two subtasks, one of them can be stolen by another worker that is idle.
The subtasks are executed by the worker threads of a thread pool. Each worker thread stores the subtasks it will work on in a double-ended queue (deque) and takes its next task from the head of that deque. When a worker thread’s deque is empty, it has no local work left. It then picks another worker thread from the pool at random and tries to steal work from it, taking subtasks from the tail of that thread’s deque, so the oldest queued subtask is stolen first (first-in, first-out order).
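The deque discipline described above can be modelled with a plain java.util.ArrayDeque. This is only a single-threaded sketch of the ordering, not the internal data structure ForkJoinPool actually uses; the class name DequeSketch and the task labels are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative model only: the real pool uses its own concurrent work queues.
class DequeSketch {

    // Returns { task taken by the owner, task taken by a thief }.
    static String[] demo() {
        Deque<String> deque = new ArrayDeque<>();

        // The owning worker pushes subtasks onto the head as it forks them.
        deque.push("task-1"); // oldest, ends up at the tail
        deque.push("task-2");
        deque.push("task-3"); // newest, sits at the head

        String takenByOwner = deque.pop();      // owner works from the head
        String takenByThief = deque.pollLast(); // a thief steals from the tail

        return new String[] { takenByOwner, takenByThief };
    }

    public static void main(String[] args) {
        String[] taken = demo();
        System.out.println("owner took " + taken[0] + ", thief took " + taken[1]);
    }
}
```

Because the owner and the thief work at opposite ends, they rarely contend for the same task, and the thief tends to pick up the oldest (usually largest) piece of work.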
3. ForkJoinPool class
The ForkJoinPool class extends java.util.concurrent.AbstractExecutorService and is responsible for running and monitoring submitted ForkJoinTask instances. All threads in the pool try to find and execute tasks submitted to the pool. If no work is available, the threads block until some arrives.
A statically constructed common pool is available, and its run state is unaffected by calls to shutdown() or shutdownNow(). A ForkJoinTask can be submitted to a specific pool, but for most applications the common pool (ForkJoinPool.commonPool()) is sufficient. Its threads are reclaimed when not in use, which keeps resource usage low.
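As a small sketch of the behaviour described above, the following obtains the common pool and checks that a shutdown() request leaves it running. The class name CommonPoolDemo and its helper method are hypothetical names chosen for this example.

```java
import java.util.concurrent.ForkJoinPool;

class CommonPoolDemo {

    // Returns true if the common pool is still running after shutdown().
    static boolean shutdownIgnored() {
        ForkJoinPool common = ForkJoinPool.commonPool();
        common.shutdown(); // has no effect on the common pool's run state
        return !common.isShutdown();
    }

    public static void main(String[] args) {
        ForkJoinPool common = ForkJoinPool.commonPool();
        System.out.println("common pool parallelism: " + common.getParallelism());
        System.out.println("still running after shutdown(): " + shutdownIgnored());
    }
}
```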
If a separate pool is required, a ForkJoinPool can be created using one of the following constructors:
ForkJoinPool(int parallelism)
ForkJoinPool(int parallelism, ForkJoinPool.ForkJoinWorkerThreadFactory factory, Thread.UncaughtExceptionHandler handler, boolean asyncMode)
When a ForkJoinPool is created without any arguments, its parallelism equals Runtime.getRuntime().availableProcessors(). The ForkJoinPool class also provides methods for inspecting the state and performance of the thread pool.
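A minimal sketch of creating a dedicated pool and reading some of its monitoring values might look like this. The class name PoolInfoDemo and the parallelism value are assumptions for illustration; the printed counters depend on timing and on the machine, so no output is shown.

```java
import java.util.concurrent.ForkJoinPool;

class PoolInfoDemo {

    // Creates a pool with the requested parallelism and reports what it stored.
    static int configuredParallelism(int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown(); // a dedicated pool should be shut down when done
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(); // defaults to the processor count
        try {
            System.out.println("parallelism:    " + pool.getParallelism());
            System.out.println("pool size:      " + pool.getPoolSize());
            System.out.println("active threads: " + pool.getActiveThreadCount());
            System.out.println("queued tasks:   " + pool.getQueuedTaskCount());
            System.out.println("steal count:    " + pool.getStealCount());
        } finally {
            pool.shutdown();
        }
    }
}
```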
The following are the important methods called from non-fork/join clients:
- public void execute(ForkJoinTask<?> task): Arranges for (asynchronous) execution of the given task.
- public <T> T invoke(ForkJoinTask<T> task): Performs the given task, returning its result upon completion.
- public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task): Submits a ForkJoinTask for execution.
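The three entry points listed above can be compared side by side. The sketch below uses a deliberately trivial task; SubmitDemo and Square are hypothetical names, and a real task would normally split its work.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

class SubmitDemo {

    // Trivial illustrative task: squares a number without forking.
    static class Square extends RecursiveTask<Integer> {
        private final int n;

        Square(int n) {
            this.n = n;
        }

        @Override
        protected Integer compute() {
            return n * n;
        }
    }

    // Returns the results obtained via invoke, submit and execute, in that order.
    static int[] run() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            // invoke: blocks the caller and returns the result directly.
            int viaInvoke = pool.invoke(new Square(3));

            // submit: returns the task at once; join() waits for the result.
            ForkJoinTask<Integer> submitted = pool.submit(new Square(4));
            int viaSubmit = submitted.join();

            // execute: returns nothing, so keep a reference to read the result.
            Square executed = new Square(5);
            pool.execute(executed);
            int viaExecute = executed.join();

            return new int[] { viaInvoke, viaSubmit, viaExecute };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] results = run();
        System.out.println(results[0] + " " + results[1] + " " + results[2]);
    }
}
```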
4. ForkJoinTask class
The ForkJoinTask class is an abstract base class for tasks that run within a ForkJoinPool. The following are its known subclasses:
- CountedCompleter: computations in which completed actions trigger other actions.
- RecursiveAction: computations that do not return results.
- RecursiveTask: computations that return results.
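As an example of a subclass that returns no result, here is a sketch of a RecursiveAction that doubles every element of an array in place. The class name DoubleAction, the helper doubled and the threshold of 4 are assumptions made for this illustration.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class DoubleAction extends RecursiveAction {
    private static final int THRESHOLD = 4; // assumed cutoff for sequential work

    private final int[] data;
    private final int lo;
    private final int hi;

    DoubleAction(int[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected void compute() {
        if (hi - lo <= THRESHOLD) {
            // Small enough: modify the slice directly.
            for (int i = lo; i < hi; i++) {
                data[i] *= 2;
            }
            return;
        }
        int mid = lo + (hi - lo) / 2;
        DoubleAction left = new DoubleAction(data, lo, mid);
        DoubleAction right = new DoubleAction(data, mid, hi);
        left.fork();     // run the left half asynchronously
        right.compute(); // process the right half in this thread
        left.join();     // nothing to combine, just wait for completion
    }

    // Convenience helper: returns a doubled copy, leaving the input untouched.
    static int[] doubled(int[] input) {
        int[] copy = input.clone();
        ForkJoinPool.commonPool().invoke(new DoubleAction(copy, 0, copy.length));
        return copy;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(doubled(new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 })));
    }
}
```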
A ForkJoinTask is a lightweight form of Future. ForkJoinTask computations should avoid blocking operations and synchronized methods. One important method is V join(), which returns the result of the computation once it is done.
The following are the important methods called from within fork/join computations:
- public final ForkJoinTask<V> fork(): Arranges to asynchronously execute this task.
- public final V invoke(): Commences performing this task, awaits its completion if necessary, and returns its result.
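To see fork(), join() and the task-level invoke() working together, consider the sketch below, which follows the familiar recursive Fibonacci pattern. FibTask is a hypothetical class name, and the task is deliberately naive (no sequential cutoff), so it is only suitable for small inputs.

```java
import java.util.concurrent.RecursiveTask;

class FibTask extends RecursiveTask<Integer> {
    private final int n;

    FibTask(int n) {
        this.n = n;
    }

    @Override
    protected Integer compute() {
        if (n <= 1) {
            return n;
        }
        FibTask first = new FibTask(n - 1);
        first.fork();                       // schedule fib(n - 1) asynchronously
        FibTask second = new FibTask(n - 2);
        return second.compute() + first.join(); // compute fib(n - 2) here, then join
    }

    public static void main(String[] args) {
        FibTask task = new FibTask(10);
        // invoke() on the task itself starts it and waits for the result.
        int result = task.invoke();
        System.out.println("fib(10) = " + result + ", done = " + task.isDone());
    }
}
```

When invoke() is called from a thread that is not a pool worker, as in main here, forked subtasks run in the common pool.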
5. RecursiveTask
To submit tasks to a ForkJoinPool, create a subclass of RecursiveTask<R>, where R is the type of the result produced by the parallelized task (and each of its subtasks), or of RecursiveAction if the task returns no result. To define a RecursiveTask, implement its single abstract method, compute:
protected abstract R compute();
The following pseudocode outlines a typical compute implementation of a RecursiveTask:
if (no longer possible to divide the task) {
    compute task sequentially
} else {
    split task in two subtasks
    call this method recursively
    wait for the completion of all subtasks
    combine the results of each subtask
}
This pseudocode is simply a parallel version of the well-known divide-and-conquer algorithm.
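One possible concrete reading of the pseudocode above is a task that finds the largest element of a non-empty array. MaxTask, its helper maxOf and the threshold of 2 are assumptions for this sketch, and the comments map each step back to the pseudocode.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class MaxTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 2; // assumed cutoff, tune for real workloads

    private final int[] data;
    private final int lo;
    private final int hi;

    // Assumes a non-empty range: lo < hi.
    MaxTask(int[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Integer compute() {
        if (hi - lo <= THRESHOLD) {
            // No longer worth dividing: compute sequentially.
            int max = data[lo];
            for (int i = lo + 1; i < hi; i++) {
                max = Math.max(max, data[i]);
            }
            return max;
        }
        // Split the task in two subtasks.
        int mid = lo + (hi - lo) / 2;
        MaxTask left = new MaxTask(data, lo, mid);
        MaxTask right = new MaxTask(data, mid, hi);
        left.fork();                        // recurse asynchronously on the left
        int rightMax = right.compute();     // recurse directly on the right
        int leftMax = left.join();          // wait for the forked subtask
        return Math.max(leftMax, rightMax); // combine the results
    }

    static int maxOf(int[] data) {
        return ForkJoinPool.commonPool().invoke(new MaxTask(data, 0, data.length));
    }

    public static void main(String[] args) {
        System.out.println(maxOf(new int[] { 3, 9, 1, 7, 5, 8, 2, 6 }));
    }
}
```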
6. Best practices for using the fork/join framework
- Invoking the join method on a task blocks the caller until the result produced by that task is ready. For this reason, it should be called only after the computation of both subtasks has been started. Otherwise, each subtask would have to wait for the other to complete before starting.
- Only sequential code should use invoke to begin a parallel computation. In other words, the invoke method of a ForkJoinPool shouldn’t be called from within a RecursiveTask.
- Call compute on one of the two subtasks (left or right) rather than calling fork on both. This lets you reuse the current thread for one of the subtasks and avoids the overhead of allocating an unnecessary additional task on the pool.
- Forking a large number of subtasks is usually a good choice. If the subtasks are fine-grained and take roughly the same amount of time, this keeps all the cores of your CPU equally busy.
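The practices listed above can be seen together in one short sketch that counts the even numbers in an array. CountEvens, its helper count and the threshold of 16 are hypothetical choices for this example; the comments point out where each guideline applies.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class CountEvens extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 16; // assumed size of a fine-grained chunk

    private final int[] data;
    private final int lo;
    private final int hi;

    CountEvens(int[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Integer compute() {
        if (hi - lo <= THRESHOLD) {
            int count = 0;
            for (int i = lo; i < hi; i++) {
                if (data[i] % 2 == 0) {
                    count++;
                }
            }
            return count;
        }
        int mid = lo + (hi - lo) / 2;
        CountEvens left = new CountEvens(data, lo, mid);
        CountEvens right = new CountEvens(data, mid, hi);

        left.fork();                      // fork only one of the two subtasks
        int rightCount = right.compute(); // reuse the current thread for the other
        int leftCount = left.join();      // join only after both halves have started
        return leftCount + rightCount;
    }

    // Sequential entry point: the only place where pool.invoke is used.
    static int count(int[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new CountEvens(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] data = new int[100];
        for (int i = 0; i < data.length; i++) {
            data[i] = i;
        }
        System.out.println("even numbers: " + count(data));
    }
}
```

Calling left.join() before right.compute() would still give the right answer, but the current thread would sit idle waiting for the left half instead of working on the right one.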
7. Example
In this example, we’ll try to find the sum of elements of an array.
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class MainThread {

    public static void main(final String[] arguments) throws InterruptedException, ExecutionException {
        int threads = Runtime.getRuntime().availableProcessors();
        System.out.println("available processors: " + threads);

        // create array
        int[] array = new int[1000];

        // populate array with elements
        for (int i = 0; i < array.length; i++) {
            array[i] = i;
        }

        ForkJoinPool forkJoinPool = new ForkJoinPool(threads);
        Long result = forkJoinPool.invoke(new Sum(array, 0, array.length));
        System.out.println("result :" + result);
    }

    static class Sum extends RecursiveTask<Long> {
        int low;
        int high;
        int[] array;

        Sum(int[] array, int low, int high) {
            this.array = array;
            this.low = low;
            this.high = high;
        }

        protected Long compute() {
            // if the range has at most 10 elements, calculate the sum directly
            if (high - low <= 10) {
                long sum = 0;
                for (int i = low; i < high; ++i)
                    sum += array[i];
                return sum;
            } else {
                // if the range has more than 10 elements,
                // divide it in half and sum each half as a subtask
                int mid = low + (high - low) / 2;
                Sum left = new Sum(array, low, mid);
                Sum right = new Sum(array, mid, high);
                left.fork();
                long rightResult = right.compute();
                long leftResult = left.join();
                return leftResult + rightResult;
            }
        }
    }
}
Output
available processors: 6
result :499500