Java Fork/Join Framework


A Fork/Join mechanism consists of two parts:

Fork

Fork is the process of breaking a big task into smaller subtasks that can be executed concurrently. The task keeps being split until the pieces reach a defined threshold size. The subtasks can then run in parallel on different CPUs, or concurrently on different threads sharing the same CPU.
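The halving rule can be traced without any threads. The sequential sketch below is only an illustration: the class `SplitDemo`, its methods, and the threshold of 100 (chosen to match the RecursiveTask example later in this article) are assumptions. It splits the index range [0, 1000) in half until a range holds fewer than 100 elements and records each resulting leaf range.

```java
import java.util.ArrayList;
import java.util.List;

class SplitDemo {
    // Hypothetical threshold, matching the RecursiveTask example in this article.
    static final int THRESHOLD = 100;

    // Recursively halves [start, end) until a range is smaller than THRESHOLD,
    // then records it as a leaf. This only simulates the "fork" decisions;
    // nothing runs in parallel here.
    static void split(int start, int end, List<int[]> leaves) {
        if (end - start < THRESHOLD) {
            leaves.add(new int[] {start, end});
            return;
        }
        int middle = (start + end) / 2;
        split(start, middle, leaves);
        split(middle, end, leaves);
    }

    static List<int[]> leavesFor(int length) {
        List<int[]> leaves = new ArrayList<>();
        split(0, length, leaves);
        return leaves;
    }

    public static void main(String[] args) {
        List<int[]> leaves = leavesFor(1000);
        for (int[] range : leaves) {
            System.out.println("[" + range[0] + ", " + range[1] + ")");
        }
        System.out.println(leaves.size() + " leaf ranges"); // 16 leaf ranges
    }
}
```

Every leaf holds 62 or 63 elements, and together the 16 leaves cover the whole range exactly once, which is what lets the subtasks work independently.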


Join

Once the fork step is done, each task waits for its subtasks to finish. When the subtasks have finished executing, the task joins (combines) their results into a single result.
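A minimal sketch of the join step, assuming a hypothetical `MaxTask` that finds the largest element of a non-empty int array. The task forks its left half, computes its right half in the current thread, joins the left half, and combines the two partial results into one with `Math.max`. It runs in `ForkJoinPool.commonPool()` (Java 8 or later) purely for brevity.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class MaxTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 100; // hypothetical cut-off
    private final int[] data;
    private final int start, end; // half-open range [start, end), assumed non-empty

    MaxTask(int[] data, int start, int end) {
        this.data = data;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        if (end - start < THRESHOLD) {
            int max = data[start];
            for (int i = start + 1; i < end; i++) {
                max = Math.max(max, data[i]);
            }
            return max;
        }
        int middle = (start + end) / 2;
        MaxTask left = new MaxTask(data, start, middle);
        MaxTask right = new MaxTask(data, middle, end);
        left.fork();                        // fork: the left half may run on another worker
        int rightMax = right.compute();     // compute the right half in this thread
        int leftMax = left.join();          // join: wait for the left half's result
        return Math.max(leftMax, rightMax); // combine two partial results into one
    }

    static int max(int[] data) {
        return ForkJoinPool.commonPool().invoke(new MaxTask(data, 0, data.length));
    }

    public static void main(String[] args) {
        int[] values = new int[1000];
        for (int i = 0; i < values.length; i++) {
            values[i] = (i * 37) % 1000; // a permutation of 0..999
        }
        System.out.println("max = " + max(values)); // max = 999
    }
}
```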


Fork/Join Classes

The following four classes, all in the java.util.concurrent package, form the core of the Fork/Join framework in Java:

  • ForkJoinTask
  • ForkJoinPool
  • RecursiveTask
  • RecursiveAction
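How the four classes fit together can be sketched in a few lines: RecursiveTask<V> and RecursiveAction are both abstract subclasses of ForkJoinTask, differing mainly in whether compute() returns a value, and a single ForkJoinPool can run either kind. The class `ClassesDemo` and the two tiny anonymous tasks below are hypothetical and exist only to show the two compute() signatures.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicBoolean;

class ClassesDemo {
    static int runBoth(AtomicBoolean flag) {
        // RecursiveTask<V>: compute() returns a V.
        ForkJoinTask<Integer> valueTask = new RecursiveTask<Integer>() {
            @Override
            protected Integer compute() {
                return 6 * 7;
            }
        };
        // RecursiveAction (a ForkJoinTask<Void>): compute() returns nothing
        // and does its work through side effects.
        ForkJoinTask<Void> actionTask = new RecursiveAction() {
            @Override
            protected void compute() {
                flag.set(true);
            }
        };
        ForkJoinPool pool = new ForkJoinPool(); // one pool runs both kinds of task
        try {
            pool.invoke(actionTask);       // returns null for a RecursiveAction
            return pool.invoke(valueTask); // returns the computed Integer
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        AtomicBoolean flag = new AtomicBoolean(false);
        System.out.println(runBoth(flag) + " " + flag.get()); // 42 true
    }
}
```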

ForkJoinTask

ForkJoinTask is an abstract class that defines a task that can be managed by a ForkJoinPool. ForkJoinTasks are executed by the threads of a ForkJoinPool, which allows a large number of tasks to be handled by a small number of threads.
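The idea that many tasks share a few threads can be observed with a sketch like the following. It assumes a pool of parallelism 2 and a hypothetical `LeafCounter` action that splits the range [0, 1024) down to single elements (1024 leaf tasks) and records which thread ran each leaf. The exact number of threads is not guaranteed, since a pool may add compensating threads while tasks are blocked in a join, so the sketch only prints it.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;

class LeafCounter extends RecursiveAction {
    private final int start, end;
    private final AtomicInteger leaves;     // counts executed leaf tasks
    private final Set<String> threadNames;  // names of threads that ran leaves

    LeafCounter(int start, int end, AtomicInteger leaves, Set<String> threadNames) {
        this.start = start;
        this.end = end;
        this.leaves = leaves;
        this.threadNames = threadNames;
    }

    @Override
    protected void compute() {
        if (end - start == 1) { // a single element: this is a leaf task
            leaves.incrementAndGet();
            threadNames.add(Thread.currentThread().getName());
            return;
        }
        int middle = (start + end) / 2;
        invokeAll(new LeafCounter(start, middle, leaves, threadNames),
                  new LeafCounter(middle, end, leaves, threadNames));
    }

    // Returns {number of leaf tasks, number of distinct threads that ran them}; n must be >= 1.
    static int[] run(int n, int parallelism) {
        AtomicInteger leaves = new AtomicInteger();
        Set<String> names = ConcurrentHashMap.newKeySet();
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            pool.invoke(new LeafCounter(0, n, leaves, names));
        } finally {
            pool.shutdown();
        }
        return new int[] {leaves.get(), names.size()};
    }

    public static void main(String[] args) {
        int[] r = run(1024, 2);
        System.out.println(r[0] + " leaf tasks ran on " + r[1] + " thread(s)");
    }
}
```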

Some important methods of ForkJoinTask are:

public final ForkJoinTask<V> fork( )
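This method submits the invoking task for asynchronous execution and returns this (the task itself). In Java 7 it could be invoked only from within ForkJoinPool computations, and using it in other contexts resulted in exceptions or errors; since Java 8, calling it outside a pool schedules the task in ForkJoinPool.commonPool().
public final V join( )
This method waits until the invoking task is done and then returns its result. If the task completed abnormally, join() throws a RuntimeException or Error rather than a checked exception.
public final V invoke( )
This method starts the invoking task, waits for it to finish if necessary, and returns the computed result.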
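The three methods are easiest to see together in one small task. The sketch below uses the classic naive Fibonacci recursion (a hypothetical example chosen for brevity, deliberately without a threshold, so it creates far more tasks than a real program should): fork() schedules one subtask, invoke() runs the other in the current thread and waits for it, and join() collects the forked result. The names `FibonacciDemo`, `Fib`, and `fib` are illustrative only.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class FibonacciDemo {
    // Hypothetical task: naive recursive Fibonacci, for illustration only.
    static class Fib extends RecursiveTask<Integer> {
        private final int n;

        Fib(int n) {
            this.n = n;
        }

        @Override
        protected Integer compute() {
            if (n <= 1) {
                return n; // base case, computed directly
            }
            Fib left = new Fib(n - 1);
            left.fork();                         // schedule left asynchronously
            int right = new Fib(n - 2).invoke(); // run right in this thread and wait for it
            return right + left.join();          // wait for the forked task, then combine
        }
    }

    static int fib(int n) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new Fib(n)); // invoke() on the pool runs the root task
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("fib(20) = " + fib(20)); // fib(20) = 6765
    }
}
```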

ForkJoinPool

A ForkJoinTask is executed within a ForkJoinPool, which manages the execution of its tasks.

ForkJoinPool defines several constructors. Two commonly used ones are:

ForkJoinPool( )
It creates a default pool whose parallelism level equals the number of processors available to the JVM, as reported by Runtime.getRuntime().availableProcessors().
ForkJoinPool(int parallelism)
It creates a ForkJoinPool with the indicated parallelism level, which must be greater than zero. The parallelism level is the target number of threads that actively execute tasks at the same time, so it determines how many tasks can be executed simultaneously.
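A short sketch comparing the two constructors; `PoolDemo` and its helper methods are hypothetical. getParallelism() reports the targeted parallelism level of a pool, and each pool is shut down once it is no longer needed.

```java
import java.util.concurrent.ForkJoinPool;

class PoolDemo {
    // Parallelism of a pool created with the no-argument constructor.
    static int defaultParallelism() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    // Parallelism of a pool created with an explicit level (must be > 0).
    static int parallelismOf(int level) {
        ForkJoinPool pool = new ForkJoinPool(level);
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("processors:   " + Runtime.getRuntime().availableProcessors());
        System.out.println("default pool: " + defaultParallelism());
        System.out.println("small pool:   " + parallelismOf(2)); // small pool:   2
    }
}
```

The default pool reports the same value as Runtime.getRuntime().availableProcessors(), while a level of 0 or less is rejected with an IllegalArgumentException.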

RecursiveTask

A RecursiveTask splits its work into subtasks, merges the results of these subtasks into a combined result, and returns it. Here is a RecursiveTask example that adds up the integers 0 to 999 stored in an array:

RecursiveTaskExample.java

                            package com.tutorial.javabasic;

                            import java.util.concurrent.*;

                            // The type argument makes compute(), join() and invoke() return Integer.
                            class AddBigArray extends RecursiveTask<Integer> {

                                final int threshold = 100;
                                int[] data;
                                int start, end;

                                AddBigArray(int[] arr, int startIndex, int endIndex) {
                                    data = arr;
                                    start = startIndex;
                                    end = endIndex;
                                }

                                // This is the method in which parallel computation will occur.
                                protected Integer compute() {
                                    int sum = 0;
                                    // If number of elements is below the threshold,
                                    // then process sequentially.
                                    if ((end - start) < threshold) {
                                        // Add each number sequentially
                                        for (int i = start; i < end; i++) {
                                            sum += data[i];
                                        }
                                    } else {
                                        // Else, continue to break the data into smaller pieces.
                                        // Find the midpoint.
                                        int middle = (start + end) / 2;
                                        // Invoke new tasks, using the subdivided data.
                                        AddBigArray subTask1 = new AddBigArray(data, start, middle);
                                        AddBigArray subTask2 = new AddBigArray(data, middle, end);
                                        // Start each subtask by forking.
                                        subTask1.fork();
                                        subTask2.fork();
                                        // Wait for the subtasks to return, and aggregate the results.
                                        sum = subTask1.join() + subTask2.join();
                                    }

                                    return sum;
                                }
                            }

                            class RecursiveTaskExample {

                                public static void main(String args[]) {
                                    // Create a fork/join pool.
                                    ForkJoinPool forkJoinPool = new ForkJoinPool();
                                    int[] arr = new int[1000];
                                    for (int i = 0; i < arr.length; i++) {
                                        arr[i] = i;
                                    }
                                    AddBigArray task = new AddBigArray(arr, 0, arr.length);
                                    // Start the main ForkJoinTask.
                                    int sum = forkJoinPool.invoke(task);
                                    System.out.println("Total sum of array elements = " + sum);
                                }
                            }
                        

Output:

                            Total sum of array elements = 499500
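The else branch in the example above forks both halves and then joins them. ForkJoinTask also provides the static method invokeAll(t1, t2), which forks the given tasks and returns once both are done; after that, join() simply returns the already computed results. The sketch below is a self-contained variant of the same summation written that way. The class name `SumTask` is hypothetical, and it returns a Long as a precaution against overflow on larger arrays.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 100; // same cut-off as AddBigArray
    private final int[] data;
    private final int start, end;

    SumTask(int[] data, int start, int end) {
        this.data = data;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start < THRESHOLD) {
            long sum = 0; // long accumulator avoids int overflow
            for (int i = start; i < end; i++) {
                sum += data[i];
            }
            return sum;
        }
        int middle = (start + end) / 2;
        SumTask left = new SumTask(data, start, middle);
        SumTask right = new SumTask(data, middle, end);
        invokeAll(left, right);            // run both halves and wait until both are done
        return left.join() + right.join(); // results are ready; join() just returns them
    }

    static long sum(int[] data) {
        return ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length));
    }

    public static void main(String[] args) {
        int[] arr = new int[1000];
        for (int i = 0; i < arr.length; i++) {
            arr[i] = i;
        }
        // prints "Total sum of array elements = 499500"
        System.out.println("Total sum of array elements = " + sum(arr));
    }
}
```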
                          

RecursiveAction

A RecursiveAction does not return a value. It breaks its work up into smaller subtasks that can be executed by independent threads or CPUs; it does some work and then exits. Here is a RecursiveAction example that adds up all the numbers in an array:

RecursiveActionExample.java

                            package com.tutorial.javabasic;

                            import java.util.concurrent.ForkJoinPool;
                            import java.util.concurrent.RecursiveAction;
                            import java.util.concurrent.atomic.AtomicLong;

                            class AddBigArray extends RecursiveAction {

                                private final int threshold = 1000;
                                private int[] data;
                                private int start, end;
                                // Shared by all subtasks, which run concurrently, so it must be
                                // updated atomically; a plain "static int" would lose updates.
                                static final AtomicLong result = new AtomicLong();

                                AddBigArray(int[] arr, int startIndex, int endIndex) {
                                    data = arr;
                                    start = startIndex;
                                    end = endIndex;
                                }

                                // This is the method in which parallel computation will occur.
                                protected void compute() {
                                    // If number of elements is below the threshold,
                                    // then process sequentially.
                                    if ((end - start) < threshold) {
                                        // Add this range into a local sum, then publish it once.
                                        long sum = 0;
                                        for (int i = start; i < end; i++) {
                                            sum += data[i];
                                        }
                                        result.addAndGet(sum);
                                    } else {
                                        // Else, continue to break the data into smaller pieces.
                                        // Find the midpoint.
                                        int middle = (start + end) / 2;
                                        // Run both subtasks and wait for them to complete.
                                        invokeAll(new AddBigArray(data, start, middle),
                                                new AddBigArray(data, middle, end));
                                    }
                                }
                            }

                            class RecursiveActionExample {

                                public static void main(String args[]) {
                                    // Create a fork/join pool.
                                    ForkJoinPool forkJoinPool = new ForkJoinPool();
                                    int[] arr = new int[10000];
                                    for (int i = 0; i < arr.length; i++) {
                                        arr[i] = i;
                                    }
                                    AddBigArray task = new AddBigArray(arr, 0, arr.length);

                                    // Start the main ForkJoinTask.
                                    forkJoinPool.invoke(task);
                                    System.out.println("Total sum of array elements = " + AddBigArray.result.get());
                                }
                            }

Output:

                            Total sum of array elements = 49995000
                          

  • AddBigArray is a class that extends RecursiveAction.
  • threshold is the range size below which a task adds its elements sequentially instead of splitting further.
  • The main computation happens inside the compute() method. When a range holds fewer elements than threshold, its elements are added sequentially in a loop. Otherwise the range is split in half and invokeAll() runs the two subtasks, each processing half of the elements, and waits until both have completed. Because leaf tasks run concurrently, their partial sums must be added to the shared result in a thread-safe way.
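Because a RecursiveAction returns nothing, it fits most naturally when each leaf writes only to its own part of the data, so no shared counter is needed at all. A minimal sketch under that assumption: a hypothetical `SquareAction` that replaces every element of a long array with its square, in place, with each leaf touching only its own index range. The threshold of 1000 simply mirrors the example above.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class SquareAction extends RecursiveAction {
    private static final int THRESHOLD = 1000; // same cut-off as the example above
    private final long[] data;
    private final int start, end;

    SquareAction(long[] data, int start, int end) {
        this.data = data;
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        if (end - start < THRESHOLD) {
            for (int i = start; i < end; i++) {
                data[i] = data[i] * data[i]; // each leaf writes only inside [start, end)
            }
            return;
        }
        int middle = (start + end) / 2;
        invokeAll(new SquareAction(data, start, middle),
                  new SquareAction(data, middle, end));
    }

    static void squareAll(long[] data) {
        // invoke() returns after all subtasks finish, so their writes are visible here.
        ForkJoinPool.commonPool().invoke(new SquareAction(data, 0, data.length));
    }

    public static void main(String[] args) {
        long[] values = new long[10000];
        for (int i = 0; i < values.length; i++) {
            values[i] = i;
        }
        squareAll(values);
        System.out.println(Arrays.toString(Arrays.copyOf(values, 5))); // [0, 1, 4, 9, 16]
    }
}
```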