Wednesday 20 August 2014

How to work with Fork/Join? Discuss with Example

Fork/Join, as the name suggests, is designed for work that can be broken into smaller pieces recursively. It was added in JDK 1.7 to support parallelism. It is an implementation of the ExecutorService interface that helps you take advantage of multiple processors. The goal is to use all of the available processing power to enhance the performance of your application.

The Fork/Join framework is designed to make divide-and-conquer algorithms easy to parallelize. Algorithms of this type are a perfect fit for problems that can be divided into two or more sub-problems of the same kind. They use recursion to break the problem down into smaller tasks until those become simple enough to be solved directly. The solutions to the sub-problems are then combined to give a solution to the original problem.
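Before any parallelism, the divide-and-conquer shape itself can be seen in plain sequential Java. This is an illustrative sketch (not from the framework), summing an array by splitting it in half recursively -- the exact shape Fork/Join later parallelizes:

```java
// Sequential divide-and-conquer sum: split the range in half, solve each
// half recursively, combine the two answers. No parallelism yet.
public class DivideAndConquerSum {
    static long sum(int[] a, int low, int high) {
        if (high - low <= 1)                 // small enough: solve directly
            return high > low ? a[low] : 0;
        int mid = low + (high - low) / 2;    // split into two sub-problems
        return sum(a, low, mid)              // solve left half
             + sum(a, mid, high);            // solve right half, combine
    }

    public static void main(String[] args) {
        int[] data = {1, 2, 3, 4, 5};
        System.out.println(sum(data, 0, data.length)); // 15
    }
}
```

Fork/Join turns the two recursive calls of exactly this shape into tasks that can run on different processors.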


The center of the fork/join framework is the ForkJoinPool class, an extension of the AbstractExecutorService class. ForkJoinPool implements the core work-stealing algorithm and can execute ForkJoinTask tasks. It is similar to the MapReduce approach used to parallelize tasks. The difference is that Fork/Join tasks subdivide themselves into smaller tasks only if necessary (if the work is too large), whereas MapReduce algorithms divide up all the work into portions as the first step of their execution.


Basic Algorithm:


if(the job is small enough)
{
   compute directly
}
else
{
   split the work in two pieces (fork)
   invoke the pieces and join the results (join)
}

A ForkJoinTask is an abstract base class for tasks that run within a ForkJoinPool. A ForkJoinTask is a thread-like entity that is much lighter weight than a normal thread. Huge numbers of tasks and subtasks may be hosted by a small number of actual threads in a ForkJoinPool, at the price of some usage limitations.

There are two specialized subclasses of the ForkJoinTask :

1. RecursiveAction : It is to be used when you don’t need the task to return a result; for example, when the task works on positions of an array in place, it doesn’t return anything because it worked on the array. The method you should implement in order to do the job is compute():void; notice the void return.

2. RecursiveTask : It is to be used when your tasks return a result. For example, when computing the sum of the elements in an array, each task must return the number it computed so the results can be joined to obtain the overall solution. The method you should implement in order to do the job is compute():V, where V is the return type; for example, in calculating the sum of integer elements in an array, V may be java.lang.Integer.
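Since the main example below uses RecursiveTask, here is a minimal RecursiveAction sketch for contrast. The class, the threshold, and the doubling operation are all illustrative choices, not part of the original article; note that compute() returns void and join() is used only to wait, not to collect a value:

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical RecursiveAction example: doubles every element of an
// array in place, so there is no result to return.
class DoubleValues extends RecursiveAction {
    static final int SEQUENTIAL_THRESHOLD = 5000;
    final int[] array;
    final int low, high;

    DoubleValues(int[] array, int low, int high) {
        this.array = array;
        this.low = low;
        this.high = high;
    }

    @Override
    protected void compute() {               // void: works on the array itself
        if (high - low <= SEQUENTIAL_THRESHOLD) {
            for (int i = low; i < high; i++)
                array[i] *= 2;
        } else {
            int mid = low + (high - low) / 2;
            DoubleValues left  = new DoubleValues(array, low, mid);
            DoubleValues right = new DoubleValues(array, mid, high);
            left.fork();        // run the left half in parallel
            right.compute();    // do the right half ourselves
            left.join();        // wait for the left half; nothing to collect
        }
    }

    public static void main(String[] args) {
        int[] data = new int[20000];
        Arrays.fill(data, 3);
        new ForkJoinPool().invoke(new DoubleValues(data, 0, data.length));
        System.out.println(data[0] + " " + data[data.length - 1]); // 6 6
    }
}
```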

A Useful Example

The key for non-dumb examples, which is hinted at nicely by the name RecursiveTask, is that your compute method can create other RecursiveTask objects and have the pool run them in parallel. First you create another object. Then you call its fork method. That actually starts parallel computation -- fork itself returns quickly, but more computation is now going on. When you need the answer, you call the join method on the object you called fork on. The join method will get you the answer from the compute() that fork started. If it is not ready yet, then join will block (i.e., not return) until it is ready. So the point is to call fork "early" and call join "late", doing other useful work in-between.
Those are the "rules" of how fork, join, and compute work, but in practice a lot of the parallel algorithms you write in this framework have a very similar form, best seen with an example. What this example does is sum all the elements of an array, using parallelism to potentially process different 5000-element segments in parallel. (The types long / Long are just like int / Integer except they are 64 bits instead of 32. They can be a good choice if your data can be large -- a sum could easily exceed 2^32, but exceeding 2^64 is less likely.)
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class Globals {
    static ForkJoinPool fjPool = new ForkJoinPool();
}

class Sum extends RecursiveTask<Long> {
    static final int SEQUENTIAL_THRESHOLD = 5000;

    int low;
    int high;
    int[] array;

    Sum(int[] arr, int lo, int hi) {
        array = arr;
        low   = lo;
        high  = hi;
    }

    @Override
    protected Long compute() {
        if (high - low <= SEQUENTIAL_THRESHOLD) {
            long sum = 0;
            for (int i = low; i < high; ++i)
                sum += array[i];
            return sum;
        } else {
            int mid = low + (high - low) / 2;
            Sum left  = new Sum(array, low, mid);
            Sum right = new Sum(array, mid, high);
            left.fork();                      // compute the left half in parallel
            long rightAns = right.compute();  // compute the right half ourselves
            long leftAns  = left.join();      // wait for the left half's answer
            return leftAns + rightAns;
        }
    }

     static long sumArray(int[] array) {
         return Globals.fjPool.invoke(new Sum(array,0,array.length));
     }
}
How does this code work? A Sum object is given an array and a range of that array. The compute method sums the elements in that range. If the range has fewer than SEQUENTIAL_THRESHOLD elements, it uses a simple for-loop like you learned in introductory programming. Otherwise, it creates two Sum objects for problems of half the size. It uses fork to compute the left half in parallel with computing the right half, which this object does itself by calling right.compute(). To get the answer for the left, it calls left.join().
Why do we have a SEQUENTIAL_THRESHOLD? It would be correct instead to keep recursing until high == low + 1 and then return array[low]. But this creates many more Sum objects and calls to fork, so it ends up being much less efficient despite having the same asymptotic complexity.
Why do we create more Sum objects than we are likely to have processors? Because it is the framework's job to make a reasonable number of parallel tasks execute efficiently and to schedule them in a good way. By having lots of fairly small parallel tasks it can do a better job, especially if the number of processors available to your program changes during execution (e.g., because the operating system is also running other programs) or the tasks end up taking different amounts of time.
So setting SEQUENTIAL_THRESHOLD to a good-in-practice value is a trade-off. The documentation for the fork/join framework suggests creating parallel subtasks only down to the point where a task performs somewhere over 100 and less than 10,000 basic computation steps. The exact number is not crucial provided you avoid extremes.
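To see the trade-off concretely, here is a small sketch (not from the original article) that counts how many tasks the halving scheme of the Sum example creates for a given array length and threshold -- one task per compute call:

```java
// Counts the tasks created by recursively halving a range until each
// piece is at or below the sequential threshold (mirrors Sum.compute).
public class TaskCounter {
    static long countTasks(int low, int high, int threshold) {
        if (high - low <= threshold)
            return 1;                          // a leaf: computed directly
        int mid = low + (high - low) / 2;
        return 1 + countTasks(low, mid, threshold)    // this task plus
                 + countTasks(mid, high, threshold);  // its two subtasks
    }

    public static void main(String[] args) {
        // 100,000 elements: threshold 5000 gives 63 tasks,
        // threshold 1 gives 199,999 tasks -- vastly more overhead.
        System.out.println(countTasks(0, 100_000, 5000)); // 63
        System.out.println(countTasks(0, 100_000, 1));    // 199999
    }
}
```

With the threshold of 5000, only 63 task objects cover 100,000 elements; recursing all the way down to single elements creates almost 200,000, which is why the threshold matters so much in practice.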

Gotchas

There are a few "gotchas" when using the library that you might need to be aware of:
  1. It might seem more natural to call fork twice for the two subproblems and then call join twice. You would expect this to be only a little less efficient than calling compute for one of them, since you are creating more parallel tasks than is helpful. But it turns out to be a lot less efficient, for reasons that are specific to the current implementation of the library and related to the overhead of creating tasks that do very little work themselves.
  2. Remember that calling join blocks until the answer is ready. So if you look at the code:
        left.fork();
        long rightAns = right.compute();
        long leftAns  = left.join();
        return leftAns + rightAns;
    
    you'll see that the order is crucial. If we had written:
        left.fork();
        long leftAns  = left.join();
        long rightAns = right.compute();
        return leftAns + rightAns;
    
    our entire array-summing algorithm would have no parallelism since each step would completely compute the left before starting to compute the right. Similarly, this version is non-parallel because it computes the right before starting to compute the left:
        long rightAns = right.compute();
        left.fork();
        long leftAns  = left.join();
        return leftAns + rightAns;
    
  3. You should not use the invoke method of a ForkJoinPool from within a RecursiveTask or RecursiveAction. Instead you should always call compute or fork directly, even if the object is a different subclass of RecursiveTask or RecursiveAction. You may be conceptually doing a "different" parallel computation, but it is still part of the same parallel task. Only sequential code should call invoke to begin parallelism.
  4. When debugging an uncaught exception, it is common to examine the "stack trace" in the debugger: the methods on the call stack when the exception occurred. With a fork-join computation, this is not as simple since the call to compute occurs in a different thread than the conceptual caller (the code that called fork). The library and debugger try to give helpful information including stack information for the thread running compute and the thread that called fork, but it can be hard to read and it includes a number of calls related to the library implementation that you should ignore. You may find it easier to debug by catching the exception inside the call to compute and just printing that stack trace.
  5. In terms of performance, there are many reasons a fork-join computation might run slower than you expect, even slower than a sequential version of the algorithm. See the next section.
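As a sketch of gotcha 1, the library also provides ForkJoinTask.invokeAll, which forks one task and runs the other in the calling thread -- equivalent in spirit to the fork/compute/join idiom and another way to avoid the fork-twice pitfall. This variant of the Sum example is illustrative, not from the original article:

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Variant of the Sum example using ForkJoinTask.invokeAll instead of the
// explicit fork/compute/join sequence. invokeAll(left, right) forks one
// subtask, runs the other in the current thread, and waits for both.
class SumInvokeAll extends RecursiveTask<Long> {
    static final int SEQUENTIAL_THRESHOLD = 5000;
    final int[] array;
    final int low, high;

    SumInvokeAll(int[] array, int low, int high) {
        this.array = array;
        this.low = low;
        this.high = high;
    }

    @Override
    protected Long compute() {
        if (high - low <= SEQUENTIAL_THRESHOLD) {
            long sum = 0;
            for (int i = low; i < high; i++)
                sum += array[i];
            return sum;
        }
        int mid = low + (high - low) / 2;
        SumInvokeAll left  = new SumInvokeAll(array, low, mid);
        SumInvokeAll right = new SumInvokeAll(array, mid, high);
        invokeAll(left, right);          // both subtasks are done on return
        return left.join() + right.join();  // join just retrieves the answers
    }

    public static void main(String[] args) {
        int[] data = new int[100000];
        Arrays.fill(data, 2);
        long total = new ForkJoinPool().invoke(new SumInvokeAll(data, 0, data.length));
        System.out.println(total); // 200000
    }
}
```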
