Learnitweb

Fork/Join framework in Java

1. Introduction

In this tutorial, we’ll discuss the fork/join framework, which is an implementation of the ExecutorService interface.

Modern computers have multiple cores, and a good concurrent program should use the computing power of all of them. The fork/join framework makes this possible by following the divide-and-conquer approach. It is best suited to problems that can be broken down recursively into smaller problems, each of which can be solved independently. Like any other ExecutorService implementation, the fork/join framework assigns tasks to worker threads in a thread pool.

The framework “forks” by recursively breaking a task into smaller independent subtasks. Once the subtasks have been executed, it “joins” their results into a single result. If a task returns void, the program simply waits until every subtask has been executed.

The main class of the fork/join framework is ForkJoinPool, a subclass of AbstractExecutorService. ForkJoinPool implements the core work-stealing algorithm and executes ForkJoinTask instances.

2. Work stealing

The work-stealing algorithm redistributes and balances tasks among the worker threads in the pool. When a task in a worker's queue is divided into two subtasks, an idle worker can steal one of them instead of waiting.

The subtasks are executed by the worker threads of the thread pool. Each worker thread keeps the subtasks it will work on in its own double-ended queue (deque) and takes its next task from the head of that deque. When a worker thread's deque is empty, it has no work of its own left. Instead of sitting idle, it picks another worker thread from the pool at random and tries to steal work from it. It takes subtasks from the tail of that thread's deque, so the oldest waiting subtasks are stolen first (first-in, first-out).
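The head/tail access pattern described above can be sketched with a plain ArrayDeque. This is a single-threaded illustration only, not how ForkJoinPool is implemented internally; the class name WorkStealingDequeSketch and the task labels are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, single-threaded illustration of deque access order.
class WorkStealingDequeSketch {

    // Returns a log of which side removed which task.
    static List<String> simulate() {
        Deque<String> ownerDeque = new ArrayDeque<>();
        // The owner forks three subtasks; push() places each one at the head.
        ownerDeque.push("task-1");
        ownerDeque.push("task-2");
        ownerDeque.push("task-3");

        List<String> log = new ArrayList<>();
        // The owner takes its next task from the head of its own deque.
        log.add("owner runs " + ownerDeque.pop());
        // An idle worker steals from the tail, where the oldest task waits.
        log.add("thief steals " + ownerDeque.pollLast());
        // The owner continues with whatever is left at the head.
        log.add("owner runs " + ownerDeque.pop());
        return log;
    }

    public static void main(String[] args) {
        simulate().forEach(System.out::println);
    }
}
```

Because the owner and the thief work at opposite ends of the deque, they rarely compete for the same subtask.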

3. ForkJoinPool class

ForkJoinPool extends java.util.concurrent.AbstractExecutorService and is responsible for running and monitoring submitted ForkJoinTask instances. All threads in the pool try to find and execute tasks submitted to the pool. When there is no work, the threads block and wait.

A statically constructed common pool is available through ForkJoinPool.commonPool(). Its run state is unaffected by calls to shutdown() or shutdownNow(). A ForkJoinTask can be submitted to a specific pool, but for most applications the common pool is enough. Its threads are reclaimed when they are not in use, which keeps resource usage low.
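As a minimal sketch of the common pool's behaviour, the snippet below calls shutdown() on it and then submits a task anyway. The class CommonPoolSketch and the trivial Answer task are hypothetical names used only for this illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class CommonPoolSketch {

    // Hypothetical trivial task used only for this illustration.
    static class Answer extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            return 42;
        }
    }

    static int runOnCommonPool() {
        ForkJoinPool pool = ForkJoinPool.commonPool();
        pool.shutdown();                  // no effect on the common pool's run state
        return pool.invoke(new Answer()); // the pool still accepts and runs the task
    }

    public static void main(String[] args) {
        System.out.println("result: " + runOnCommonPool());
        System.out.println("shut down: " + ForkJoinPool.commonPool().isShutdown());
    }
}
```

The same calls on a pool created with a constructor would reject the task, because a separately created pool does shut down.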

If a separate pool is required, a ForkJoinPool can be created using the following constructors:

  • ForkJoinPool()
  • ForkJoinPool(int parallelism)
  • ForkJoinPool(int parallelism, ForkJoinPool.ForkJoinWorkerThreadFactory factory, Thread.UncaughtExceptionHandler handler, boolean asyncMode)

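When a ForkJoinPool is created without any parameters, its parallelism equals the number of available processors, as returned by Runtime.getRuntime().availableProcessors(). The ForkJoinPool class also provides methods for inspecting the state and performance of the pool, such as getParallelism(), getPoolSize() and getStealCount().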
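The following sketch creates pools in different ways and reads a few of the monitoring getters. The class name PoolConfigSketch, the helper describe and the chosen parallelism values are assumptions made for illustration.

```java
import java.util.concurrent.ForkJoinPool;

class PoolConfigSketch {

    // Summarizes a few of the monitoring getters of a pool.
    static String describe(ForkJoinPool pool) {
        return "parallelism=" + pool.getParallelism()
                + ", asyncMode=" + pool.getAsyncMode()
                + ", poolSize=" + pool.getPoolSize()
                + ", activeThreads=" + pool.getActiveThreadCount()
                + ", steals=" + pool.getStealCount();
    }

    public static void main(String[] args) {
        // Parallelism defaults to the number of available processors.
        ForkJoinPool defaultPool = new ForkJoinPool();
        // Explicit parallelism.
        ForkJoinPool fixedPool = new ForkJoinPool(4);
        // Full constructor: default thread factory, no custom handler, asyncMode on.
        ForkJoinPool asyncPool = new ForkJoinPool(
                2,
                ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                null,
                true);

        System.out.println(describe(defaultPool));
        System.out.println(describe(fixedPool));
        System.out.println(describe(asyncPool));

        // Pools created with a constructor should be shut down when no longer needed.
        defaultPool.shutdown();
        fixedPool.shutdown();
        asyncPool.shutdown();
    }
}
```

The values of poolSize, activeThreads and steals depend on what the pool has executed so far, so they vary from run to run.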

The following are the important methods called by non-fork/join clients:

  • public void execute(ForkJoinTask<?> task): Arranges for (asynchronous) execution of the given task.
  • public <T> T invoke(ForkJoinTask<T> task): Performs the given task, returning its result upon completion.
  • public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task): Submits a ForkJoinTask for execution.
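The difference between the three methods is easiest to see side by side. In this sketch the class SubmissionSketch and the Square task are hypothetical; only execute, invoke, submit, get and join are part of the real API.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

class SubmissionSketch {

    // Hypothetical task: squares a number.
    static class Square extends RecursiveTask<Integer> {
        private final int n;

        Square(int n) {
            this.n = n;
        }

        @Override
        protected Integer compute() {
            return n * n;
        }
    }

    static int[] runAll() throws InterruptedException, ExecutionException {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            // invoke: blocks the caller and returns the result directly.
            int invoked = pool.invoke(new Square(3));

            // submit: returns immediately; the returned task acts as a Future.
            ForkJoinTask<Integer> submitted = pool.submit(new Square(4));
            int fromSubmit = submitted.get();

            // execute: returns nothing, so keep the task reference to read the result.
            Square executed = new Square(5);
            pool.execute(executed);
            int fromExecute = executed.join();

            return new int[] { invoked, fromSubmit, fromExecute };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        for (int value : runAll()) {
            System.out.println(value);
        }
    }
}
```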

4. ForkJoinTask class

ForkJoinTask is an abstract base class for tasks that run within a ForkJoinPool. Its known subclasses are:

  • CountedCompleter: computations in which completed actions trigger other actions.
  • RecursiveAction: computations that do not return results.
  • RecursiveTask: computations that return results.

A ForkJoinTask is a lightweight form of Future. ForkJoinTask computations should avoid synchronized methods and other blocking operations.

One important method is V join(), which returns the result of the computation when it is done.

The following are the important methods called from within fork/join computations:

  • public final ForkJoinTask<V> fork(): Arranges to asynchronously execute this task.
  • public final V invoke(): Commences performing this task, awaits its completion if necessary, and returns its result.
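As a sketch of fork() and join() used inside a computation, the RecursiveAction below doubles the elements of an array in place, so there is no result to combine. The names DoubleInPlaceSketch and DoubleAction and the threshold of 4 are assumptions chosen for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class DoubleInPlaceSketch {

    // Hypothetical RecursiveAction: doubles every element of a slice in place.
    static class DoubleAction extends RecursiveAction {
        private static final int THRESHOLD = 4; // illustrative cut-off
        private final int[] data;
        private final int low;
        private final int high;

        DoubleAction(int[] data, int low, int high) {
            this.data = data;
            this.low = low;
            this.high = high;
        }

        @Override
        protected void compute() {
            if (high - low <= THRESHOLD) {
                for (int i = low; i < high; i++) {
                    data[i] *= 2;
                }
                return;
            }
            int mid = low + (high - low) / 2;
            DoubleAction left = new DoubleAction(data, low, mid);
            DoubleAction right = new DoubleAction(data, mid, high);
            left.fork();     // schedule the left half asynchronously
            right.compute(); // process the right half in the current thread
            left.join();     // wait for the left half; there is no result to read
        }
    }

    // Returns a doubled copy so the caller's array is left untouched.
    static int[] doubled(int[] input) {
        int[] copy = input.clone();
        ForkJoinPool.commonPool().invoke(new DoubleAction(copy, 0, copy.length));
        return copy;
    }

    public static void main(String[] args) {
        int[] result = doubled(new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 });
        System.out.println(java.util.Arrays.toString(result));
    }
}
```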

5. RecursiveTask

To submit tasks to a ForkJoinPool, you have to create a subclass of RecursiveTask<R>, where R is the type of the result produced by the parallelized task (and each of its subtasks), or a subclass of RecursiveAction if the task returns no result. To define a RecursiveTask, you implement its single abstract method, compute:

protected abstract R compute();

The following is the pseudocode of a RecursiveTask:

if (no longer possible to divide the task) {
	compute task sequentially
} else {
	split task in two subtasks
	call this method recursively
	wait for the completion of all subtasks
	combine the results of each subtask
} 

This pseudocode is simply the parallel version of the well-known divide-and-conquer algorithm.

6. Best practices for using the fork/join framework

  • Invoking the join method on a task blocks the caller until the result produced by that task is ready. For this reason, call join only after the computation of both subtasks has been started. Otherwise, each subtask has to wait for the other one to complete before starting, and the work runs sequentially.
  • Only sequential code should use invoke to begin a parallel computation. In other words, the invoke method of a ForkJoinPool shouldn’t be called from within a RecursiveTask; call compute or fork directly instead.
  • Call compute on one of the two subtasks (left or right) rather than calling fork on both. This lets you reuse the current thread for one of the subtasks and avoids the overhead of allocating a further task on the pool unnecessarily.
  • Forking a large number of subtasks is usually a good choice. If the subtasks are fine-grained and take roughly the same amount of time, this keeps all the cores of your CPU equally busy. Each subtask should still do enough work to outweigh the overhead of creating and scheduling it.
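To make the last point concrete, the sketch below counts how many leaf subtasks a summing task creates for a given threshold. The class GranularitySketch, the threshold parameter and the AtomicInteger counter are assumptions added for illustration; they are not part of the framework.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicInteger;

class GranularitySketch {

    // Hypothetical summing task that records how many leaves were computed sequentially.
    static class Sum extends RecursiveTask<Long> {
        private final int[] array;
        private final int low;
        private final int high;
        private final int threshold;
        private final AtomicInteger leaves;

        Sum(int[] array, int low, int high, int threshold, AtomicInteger leaves) {
            this.array = array;
            this.low = low;
            this.high = high;
            this.threshold = threshold;
            this.leaves = leaves;
        }

        @Override
        protected Long compute() {
            if (high - low <= threshold) {
                leaves.incrementAndGet(); // one more sequential leaf
                long sum = 0;
                for (int i = low; i < high; i++) {
                    sum += array[i];
                }
                return sum;
            }
            int mid = low + (high - low) / 2;
            Sum left = new Sum(array, low, mid, threshold, leaves);
            Sum right = new Sum(array, mid, high, threshold, leaves);
            left.fork();                        // start the left half first
            long rightResult = right.compute(); // reuse this thread for the right half
            return left.join() + rightResult;   // join only after both halves have started
        }
    }

    // Number of leaf subtasks created for an array of the given size.
    static int countLeaves(int size, int threshold) {
        AtomicInteger leaves = new AtomicInteger();
        ForkJoinPool.commonPool().invoke(new Sum(new int[size], 0, size, threshold, leaves));
        return leaves.get();
    }

    public static void main(String[] args) {
        System.out.println("threshold 10:  " + countLeaves(1000, 10) + " leaves");
        System.out.println("threshold 100: " + countLeaves(1000, 100) + " leaves");
    }
}
```

For a 1000-element array, a threshold of 10 yields 128 leaf subtasks and a threshold of 100 yields 16. A lower threshold gives the pool more pieces to balance across cores, at the cost of more task objects to create and schedule.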

7. Example

In this example, we’ll compute the sum of the elements of an array.

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class MainThread {

	public static void main(final String[] arguments) {

		int threads = Runtime.getRuntime().availableProcessors();
		System.out.println("available processors: " + threads);

		// create array
		int[] array = new int[1000];

		// populate array with elements
		for (int i = 0; i < array.length; i++) {
			array[i] = i;
		}

		ForkJoinPool forkJoinPool = new ForkJoinPool(threads);
		Long result = forkJoinPool.invoke(new Sum(array, 0, array.length));
		System.out.println("result :" + result);
	}

	static class Sum extends RecursiveTask<Long> {
		private final int low;
		private final int high;
		private final int[] array;

		Sum(int[] array, int low, int high) {
			this.array = array;
			this.low = low;
			this.high = high;
		}

		@Override
		protected Long compute() {
			// if the range has at most 10 elements, sum it sequentially
			if (high - low <= 10) {
				long sum = 0;

				for (int i = low; i < high; ++i)
					sum += array[i];
				return sum;
			} else {
				// if the range has more than 10 elements,
				// split it into two halves and sum each half
				int mid = low + (high - low) / 2;
				Sum left = new Sum(array, low, mid);
				Sum right = new Sum(array, mid, high);
				left.fork(); // run the left half asynchronously
				long rightResult = right.compute(); // sum the right half in this thread
				long leftResult = left.join(); // wait for the left half
				return leftResult + rightResult;
			}
		}
	}
}

Output

available processors: 6
result :499500