Wednesday, October 30, 2013

A walkthrough for the fork/join framework introduced in Java SE 7

Java SE 7 brought some neat features to the table for Java developers. One of them is the fork/join framework, basically a new parallel programming framework that makes it easy to implement divide and conquer solutions. To use the fork/join framework effectively, a solution to a problem should be designed with the following characteristics:
  • The problem domain, whether it is a file, a list, etc. to be processed or a computation, should be divisible into smaller subtasks.
  • It should be possible to process each chunk without requiring the results of other chunks.
To summarize, solving or processing the problem domain should require no self-feedback for the framework to be applicable. For example, if you want to process a list and processing each element requires the result of processing the previous element, you cannot simply split the list into independent chunks and process them in parallel. Likewise, if you want to apply some processing over a sound stream where each pulse requires feedback from the previous pulses, the fork/join framework will not speed up that processing.
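To make the "no self-feedback" rule concrete, here is a minimal sequential sketch (the class and method names are hypothetical, chosen only for illustration). In scale() every output depends only on its own input, so any index range could be handed to a different worker. In smooth(), a simple exponential smoothing filter, every output needs the previous output, so a chunk starting at index k cannot begin until everything before it is finished.

```java
import java.util.Arrays;

public class DependencyDemo {

    // Independent work: out[i] depends only on in[i], so any sub-range
    // [begin, end) could be processed without looking at other chunks.
    static double[] scale(double[] in, double factor) {
        double[] out = new double[in.length];
        for (int i = 0; i < in.length; i++) {
            out[i] = in[i] * factor;
        }
        return out;
    }

    // Self-feedback: out[i] needs out[i - 1], so a chunk starting at
    // index k cannot start before out[k - 1] has been computed.
    static double[] smooth(double[] in, double alpha) {
        double[] out = new double[in.length];
        double previous = 0.0;
        for (int i = 0; i < in.length; i++) {
            previous = alpha * in[i] + (1 - alpha) * previous;
            out[i] = previous;
        }
        return out;
    }

    public static void main(String[] args) {
        double[] signal = {1.0, 2.0, 3.0, 4.0};
        System.out.println(Arrays.toString(scale(signal, 2.0)));
        System.out.println(Arrays.toString(smooth(signal, 0.5)));
    }
}
```

Only the first kind of loop is a natural fit for splitting into fork/join subtasks.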
Well, before we start learning the fork/join framework, we had better know what it is and what it is not. What the fork/join framework is:
  • A parallel programming framework for Java
  • Part of Java SE 7
  • Suitable for implementing parallel processing solutions, mostly data-intensive ones with few or no shared resources between the workers that process the data chunks
  • Suitable when no synchronization is required between the workers
What the fork/join framework is not:
  • It is not magic that makes your code run fast on machines with multiple processors; you need to think about and implement your solutions in a parallel manner.
  • It is not hard and obscure like some other frameworks, MPI for example. Using the framework is far easier than anything I have used before.
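"Thinking in parallel" with fork/join mostly comes down to one recurring shape: if the piece of work is small enough, solve it directly; otherwise split it, fork one half, compute the other half in the current thread, and join. The following is a minimal sketch of that shape summing a long[]; the class name SumTask and the threshold of 1000 elements are illustrative assumptions, not values recommended by the framework.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example: sums a range of an array with the usual
// "split until small, then solve directly" fork/join pattern.
public class SumTask extends RecursiveTask<Long> {

    static final int THRESHOLD = 1000; // illustrative cut-off, tune per workload
    private final long[] data;
    private final int begin;
    private final int end;

    public SumTask(long[] data, int begin, int end) {
        this.data = data;
        this.begin = begin;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - begin <= THRESHOLD) {
            // base case: small enough to solve sequentially
            long sum = 0;
            for (int i = begin; i < end; i++) {
                sum += data[i];
            }
            return sum;
        }
        // recursive case: split in half, fork the left half,
        // compute the right half in this thread, then wait for the left
        int middle = begin + (end - begin) / 2;
        SumTask left = new SumTask(data, begin, middle);
        SumTask right = new SumTask(data, middle, end);
        left.fork();
        long rightSum = right.compute();
        long leftSum = left.join();
        return leftSum + rightSum;
    }

    public static void main(String[] args) {
        long[] numbers = new long[100_000];
        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = i + 1;
        }
        long total = new ForkJoinPool().invoke(new SumTask(numbers, 0, numbers.length));
        System.out.println(total); // 5000050000
    }
}
```

Computing one half directly instead of forking both keeps the current worker busy rather than idle while it waits.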
If you want to learn the mechanics behind the fork/join framework, you can read the original article by Doug Lea, which explains the motivation and the design. The article is available at http://gee.cs.oswego.edu/dl/papers/fj.pdf. If you want to see how to use the framework, read on.
First, let’s look at the important classes one needs to know in order to implement a divide and conquer solution using the fork/join framework; then we will start using them.
  • ForkJoinPool: the pool of worker threads to which you submit your ForkJoinTask instances for execution. The default parallelism level is the number of processors available to the runtime.
  • RecursiveTask<V>: a subclass of ForkJoinTask whose compute() method returns a value of type V. For example, processing a list of DTOs and returning the result of the processing.
  • RecursiveAction: another subclass of ForkJoinTask, whose compute() method returns no value, for example processing an array…
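The list above mentions RecursiveAction without showing one. Here is a minimal sketch, assuming the job is to square every element of an array in place (SquareAction and its threshold are hypothetical). Because compute() returns nothing, results are written straight into the array; the two halves touch disjoint index ranges, so no synchronization is needed. ForkJoinTask.invokeAll(t1, t2) forks both subtasks and returns once both have completed.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical example: squares every element of an array in place.
public class SquareAction extends RecursiveAction {

    static final int THRESHOLD = 1000; // illustrative cut-off
    private final long[] data;
    private final int begin;
    private final int end;

    public SquareAction(long[] data, int begin, int end) {
        this.data = data;
        this.begin = begin;
        this.end = end;
    }

    @Override
    protected void compute() {
        if (end - begin <= THRESHOLD) {
            // base case: square this range sequentially
            for (int i = begin; i < end; i++) {
                data[i] = data[i] * data[i];
            }
            return;
        }
        // split in half; the halves write to disjoint ranges of the array
        int middle = begin + (end - begin) / 2;
        invokeAll(new SquareAction(data, begin, middle),
                  new SquareAction(data, middle, end));
    }

    public static void main(String[] args) {
        long[] values = new long[10_000];
        for (int i = 0; i < values.length; i++) {
            values[i] = i;
        }
        ForkJoinPool pool = new ForkJoinPool(); // parallelism defaults to available processors
        System.out.println("parallelism: " + pool.getParallelism());
        pool.invoke(new SquareAction(values, 0, values.length));
        System.out.println("last value: " + values[values.length - 1]);
    }
}
```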
I looked at this new API mainly for data pipelining, in which I need to process a pretty huge list of objects and turn them into another format, so that the processing result of one library is consumable by the next one in the data flow. I am happy with the result: it was pretty easy and straightforward.
Following is a small sample showing how to process a list of Row objects and convert them into a list of Entity objects. In my case it was something similar: processing Row objects and turning them into OData OEntity objects.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

/**
 *
 * @author Masoud Kalali
 */
class RowConverter extends RecursiveTask<List<Entity>> {

    // ranges with more than 5000 rows are split and processed in parallel
    static final int SINGLE_THREAD_TOP = 5000;
    final int begin;
    final int end;
    final List<Row> rows;

    public RowConverter(int begin, int end, List<Row> rows) {
        this.begin = begin;
        this.end = end;
        this.rows = rows;
    }

    @Override
    protected List<Entity> compute() {

        if (end - begin <= SINGLE_THREAD_TOP) {
            // small enough: the actual processing happens here, sequentially
            List<Entity> preparedEntities = new ArrayList<>(end - begin);
            System.out.println("  begin: " + begin + " end: " + end);
            for (int i = begin; i < end; ++i) {
                preparedEntities.add(convertRow(rows.get(i)));
            }
            return preparedEntities;
        } else {
            // too large: divide the range in half and combine the results.
            // The split point must be relative to begin/end; a fixed index would
            // produce empty or repeated ranges (and endless recursion) for big lists.
            // Lowering SINGLE_THREAD_TOP produces more, smaller subtasks; one can also
            // derive it from the list size and the number of available processors, see
            // http://download.oracle.com/javase/7/docs/api/java/lang/Runtime.html#availableProcessors()
            int middle = begin + (end - begin) / 2;

            RowConverter curLeft = new RowConverter(begin, middle, rows);
            RowConverter curRight = new RowConverter(middle, end, rows);
            curLeft.fork();                                // left half runs asynchronously in the pool
            List<Entity> rightResult = curRight.compute(); // right half runs in the current thread
            List<Entity> leftResult = curLeft.join();      // wait for the forked left half
            leftResult.addAll(rightResult);                // left before right keeps the row order
            return leftResult;
        }
    }

    // dummy conversion method turning one DTO into another
    private Entity convertRow(Row row) {

        return new Entity(row.getId());
    }
}

// the driver class which owns the pool
public class Fjf {

    public static void main(String[] args) {

        List<Row> rawData = initDummyList(10000);
        ForkJoinPool pool = new ForkJoinPool();
        System.out.println("number of worker threads: " + pool.getParallelism());

        List<Entity> res = pool.invoke(new RowConverter(0, rawData.size(), rawData));

        // add a breakpoint here and examine the pool object.
        // check how the stealCount, which shows the number of subtasks taken on by
        // idle workers, changes when you use a smaller SINGLE_THREAD_TOP and thus
        // produce more tasks
        System.out.println("processed list: " + res.size());

    }

    /**
     * creates a dummy list of rows
     *
     * @param size number of rows in the list
     * @return the list of {@link Row} objects
     */
    private static List<Row> initDummyList(int size) {

        List<Row> rows = new ArrayList<>(size);

        for (int i = 0; i < size; i++) {
            rows.add(new Row(i));
        }
        return rows;
    }
}
 
//dummy classes which should be converted from one form to another
class Row {
 
    int id;
 
    public Row(int id) {
        this.id = id;
    }
 
    public int getId() {
        return id;
    }
}
 
class Entity {
 
    int id;
 
    public Entity(int id) {
        this.id = id;
    }
 
    public int getId() {
        return id;
    }
}
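The comments in the RowConverter example suggest deriving the split size from the list size and Runtime.availableProcessors() instead of using the fixed threshold of 5000 rows. Below is a minimal sketch of one way to do that, assuming you are free to generalize the converter. ChunkedConverter, its nested Converter interface, thresholdFor() and the factor of 4 chunks per worker are my own illustrative assumptions, not part of the framework; the Converter interface stands in for java.util.function.Function, which does not exist in Java 7.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Sketch only: a generic list converter whose sequential threshold is
// derived from the list size and the pool's parallelism (hypothetical names).
public class ChunkedConverter<T, R> extends RecursiveTask<List<R>> {

    // stand-in for a conversion function, since Java 7 has no java.util.function
    public interface Converter<I, O> {
        O convert(I input);
    }

    private final List<T> items;
    private final int begin;
    private final int end;
    private final int threshold;
    private final Converter<T, R> converter;

    ChunkedConverter(List<T> items, int begin, int end, int threshold,
                     Converter<T, R> converter) {
        this.items = items;
        this.begin = begin;
        this.end = end;
        this.threshold = threshold;
        this.converter = converter;
    }

    // aim for about chunksPerWorker leaf tasks per worker thread so that idle
    // workers have something to steal; never return less than 1
    public static int thresholdFor(int size, int parallelism, int chunksPerWorker) {
        return Math.max(1, size / (parallelism * chunksPerWorker));
    }

    @Override
    protected List<R> compute() {
        if (end - begin <= threshold) {
            List<R> result = new ArrayList<>(end - begin);
            for (int i = begin; i < end; i++) {
                result.add(converter.convert(items.get(i)));
            }
            return result;
        }
        int middle = begin + (end - begin) / 2;
        ChunkedConverter<T, R> left =
                new ChunkedConverter<>(items, begin, middle, threshold, converter);
        ChunkedConverter<T, R> right =
                new ChunkedConverter<>(items, middle, end, threshold, converter);
        left.fork();
        List<R> rightResult = right.compute();
        List<R> leftResult = left.join();
        leftResult.addAll(rightResult); // left before right keeps the input order
        return leftResult;
    }

    public static <A, B> List<B> convertAll(List<A> items, Converter<A, B> converter) {
        ForkJoinPool pool = new ForkJoinPool(); // parallelism = available processors
        int threshold = thresholdFor(items.size(), pool.getParallelism(), 4);
        try {
            return pool.invoke(
                    new ChunkedConverter<A, B>(items, 0, items.size(), threshold, converter));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> numbers = new ArrayList<>();
        for (int i = 0; i < 20000; i++) {
            numbers.add(i);
        }
        List<String> labels = convertAll(numbers, new Converter<Integer, String>() {
            @Override
            public String convert(Integer input) {
                return "row-" + input;
            }
        });
        System.out.println(labels.size() + " " + labels.get(0) + " " + labels.get(19999));
    }
}
```

Creating a few leaf tasks per worker, rather than exactly one, is a common heuristic: when chunks take uneven time, idle workers can still steal work. The best factor depends on the workload, so it is worth measuring rather than trusting the value 4 used here.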
Just copy and paste the code into your IDE (as Fjf.java, since Fjf is the public class), then run and examine it to get a deeper understanding of how the framework can be used. Post any comments or questions you may have here and I will try to help you with them.
