Wednesday, October 30, 2013




Java Tip 144: When to use ForkJoinPool vs ExecutorService - Web Crawlers Example

Know your options for multithreaded programming on the Java platform



The Fork/Join library introduced in Java 7 extends the existing Java concurrency package with support for hardware parallelism, a key feature of multicore systems. In this Java Tip Madalin Ilie demonstrates the performance impact of replacing the Java 6 ExecutorService class with Java 7's ForkJoinPool in a web crawler application.
Web crawlers, also known as web spiders, are key to the success of search engines. These programs perpetually scan the web, gathering up millions of pages of data and sending it back to search-engine databases. The data is then indexed and processed algorithmically, resulting in faster, more accurate search results. While they are most famously used for search optimization, web crawlers also can be used for automated tasks such as link validation or finding and returning specific data (such as email addresses) in a collection of web pages.
Architecturally, most web crawlers are high-performance multithreaded programs, albeit with relatively simple functionality and requirements. Building a web crawler is therefore an interesting way to practice, and to compare, multithreaded (or concurrent) programming techniques.

The return of Java Tips!

Java Tips are short, code-driven articles that invite JavaWorld readers to share their programming skills and discoveries. Let us know if you have a tip to share with the JavaWorld community. Also check out the Java Tips Archive for more programming tips from your peers.
In this article I'll walk through two approaches to writing a web crawler: one using the Java 6 ExecutorService, and the other using Java 7's ForkJoinPool. To follow the examples, you'll need (as of this writing) Java 7 update 2 installed in your development environment, as well as the third-party library HtmlParser.

Two approaches to Java concurrency

The ExecutorService interface is part of the java.util.concurrent revolution introduced in Java 5 (and carried forward in Java 6, of course), which simplified thread handling on the Java platform. ExecutorService is an Executor that provides methods to manage the progress-tracking and termination of asynchronous tasks. Prior to the introduction of java.util.concurrent, Java developers relied on third-party libraries or wrote their own classes to manage concurrency in their programs.
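To ground that, here's a minimal sketch of the basics the crawler will rely on: submitting an asynchronous task to an ExecutorService and tracking it through the returned Future. (The class name ExecutorDemo and the toy computation are mine, purely for illustration.)

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorDemo {

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        // submit() hands the task to a pool thread and returns a Future
        Future<Integer> result = pool.submit(new Callable<Integer>() {
            @Override
            public Integer call() {
                return 6 * 7; // stands in for real asynchronous work
            }
        });
        System.out.println(result.get()); // blocks until the task completes: 42
        pool.shutdown(); // let the pool's threads exit
    }
}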
Fork/Join, introduced in Java 7, isn't intended to replace or compete with the existing concurrency utility classes; instead it updates and completes them. Fork/Join addresses the need for divide-and-conquer, or recursive task-processing, in Java programs (see Resources).
Fork/Join's logic is very simple: (1) separate (fork) each large task into smaller tasks; (2) process each task in a separate thread (separating those into even smaller tasks if necessary); (3) join the results.
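Before we get to the crawlers, here's a minimal, self-contained illustration of those three steps -- a toy array-summing task of my own, not part of the crawler code -- showing the fork/compute/join shape:

import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class SumTask extends RecursiveTask<Long> {

    private static final int THRESHOLD = 1000;
    private final long[] data;
    private final int start, end;

    public SumTask(long[] data, int start, int end) {
        this.data = data;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) { // small enough: just compute it
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += data[i];
            }
            return sum;
        }
        int mid = (start + end) / 2;
        SumTask left = new SumTask(data, start, mid);
        SumTask right = new SumTask(data, mid, end);
        left.fork();                     // (1) fork: split off the left half
        long rightSum = right.compute(); // (2) process the right half in this thread
        return left.join() + rightSum;   // (3) join: combine the results
    }

    public static void main(String[] args) {
        long[] numbers = new long[1000000];
        Arrays.fill(numbers, 1L);
        long total = new ForkJoinPool().invoke(new SumTask(numbers, 0, numbers.length));
        System.out.println(total); // prints 1000000
    }
}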
The two web crawler implementations that follow are simple programs that demonstrate the features and functionality of the Java 6 ExecutorService and the Java 7 ForkJoinPool.

Building and benchmarking the web crawler

Our web crawler's task will be to find and follow links. Its purpose could be link validation, or it could be gathering data. (You might, for instance, instruct the program to search the web for pictures of Angelina Jolie, or Brad Pitt.)
The application architecture consists of the following:
  1. An interface that exposes basic operations for interacting with links; i.e., get the number of visited links, add new links to the queue of links to be visited, and mark a link as visited
  2. An implementation for this interface that will also be the starting point of the application
  3. A thread/recursive action that will hold the business logic to check whether a link has already been visited. If not, it will gather all the links in the corresponding page, create a new thread/recursive task, and submit it to the ExecutorService or ForkJoinPool
  4. An ExecutorService or ForkJoinPool to handle waiting tasks
Note that a link is considered "visited" after all links in the corresponding page have been returned.
In addition to comparing ease of development using the concurrency tools available in Java 6 and Java 7, we'll compare application performance based on two benchmarks:
  • Search coverage: Measures the time required to visit 1,500 distinct links
  • Processing power: Measures the time required to visit 3,000 non-distinct links; this is akin to measuring throughput -- how many links the application processes per second, much as you might measure an Internet connection in kilobits per second.
While relatively simple, these benchmarks will provide at least a small window into the performance of Java concurrency in Java 6 versus Java 7 for certain application requirements.

A Java 6 web crawler built with ExecutorService

For the Java 6 web crawler implementation we'll use a fixed-thread pool of 64 threads, which we create by calling the Executors.newFixedThreadPool(int) factory method. Listing 1 shows the main class implementation.

Listing 1. Constructing a WebCrawler

package insidecoding.webcrawler;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import insidecoding.webcrawler.net.LinkFinder;
import java.util.HashSet;

/**
 *
 * @author Madalin Ilie
 */
public class WebCrawler6 implements LinkHandler {

    private final Collection<String> visitedLinks = Collections.synchronizedSet(new HashSet<String>());
//    private final Collection<String> visitedLinks = Collections.synchronizedList(new ArrayList<String>());
    private String url;
    private ExecutorService execService;

    public WebCrawler6(String startingURL, int maxThreads) {
        this.url = startingURL;
        execService = Executors.newFixedThreadPool(maxThreads);
    }

    @Override
    public void queueLink(String link) throws Exception {
        startNewThread(link);
    }

    @Override
    public int size() {
        return visitedLinks.size();
    }

    @Override
    public void addVisited(String s) {
        visitedLinks.add(s);
    }

    @Override
    public boolean visited(String s) {
        return visitedLinks.contains(s);
    }

    private void startNewThread(String link) throws Exception {
        execService.execute(new LinkFinder(link, this));
    }

    private void startCrawling() throws Exception {
        startNewThread(this.url);
    }

    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) throws Exception {
        new WebCrawler6("http://www.javaworld.com", 64).startCrawling();
    }
}
In the above WebCrawler6 constructor, we create a fixed-size thread pool of 64 threads. We then start the program by calling the startCrawling method, which creates the first LinkFinder task and submits it to the ExecutorService.
Next, we create a LinkHandler interface, which exposes helper methods to interact with URLs. Requirements are as follows: (1) mark a URL as visited using the addVisited() method; (2) get the number of the visited URLs through the size() method; (3) determine whether a URL has been already visited using the visited() method; and (4) add a new URL in the queue through the queueLink() method.

Listing 2. The LinkHandler interface

package insidecoding.webcrawler;

/**
 *
 * @author Madalin Ilie
 */
public interface LinkHandler {

    /**
     * Places the link in the queue
     * @param link
     * @throws Exception
     */
    void queueLink(String link) throws Exception;

    /**
     * Returns the number of visited links
     * @return
     */
    int size();

    /**
     * Checks if the link was already visited
     * @param link
     * @return
     */
    boolean visited(String link);

    /**
     * Marks this link as visited
     * @param link
     */
    void addVisited(String link);
}
Now, as we crawl pages, we need to start up the rest of the threads, which we do via the LinkFinder class, shown in Listing 3. Note the linkHandler.queueLink(l) line.

Listing 3. LinkFinder

package insidecoding.webcrawler.net;

import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import org.htmlparser.Parser;
import org.htmlparser.filters.NodeClassFilter;
import org.htmlparser.tags.LinkTag;
import org.htmlparser.util.NodeList;
import insidecoding.webcrawler.LinkHandler;

/**
 *
 * @author Madalin Ilie
 */
public class LinkFinder implements Runnable {

    private String url;
    private LinkHandler linkHandler;
    /**
     * Used for statistics
     */
    private static final long t0 = System.nanoTime();

    public LinkFinder(String url, LinkHandler handler) {
        this.url = url;
        this.linkHandler = handler;
    }

    @Override
    public void run() {
        getSimpleLinks(url);
    }

    private void getSimpleLinks(String url) {
        //if not already visited
        if (!linkHandler.visited(url)) {
            try {
                URL uriLink = new URL(url);
                Parser parser = new Parser(uriLink.openConnection());
                NodeList list = parser.extractAllNodesThatMatch(new NodeClassFilter(LinkTag.class));
                List<String> urls = new ArrayList<String>();

                for (int i = 0; i < list.size(); i++) {
                    LinkTag extracted = (LinkTag) list.elementAt(i);

                    if (!extracted.getLink().isEmpty()
                            && !linkHandler.visited(extracted.getLink())) {

                        urls.add(extracted.getLink());
                    }

                }
                //we visited this url
                linkHandler.addVisited(url);

                if (linkHandler.size() == 1500) {
                    System.out.println("Time to visit 1500 distinct links = " + (System.nanoTime() - t0));                   
                }

                for (String l : urls) {
                    linkHandler.queueLink(l);
                }

            } catch (Exception e) {
                //ignore all errors for now
            }
        }
    }
}
The logic of the LinkFinder is simple: (1) we start parsing a URL; (2) after we gather all the links within the corresponding page, we mark the page as visited; and (3) we send each found link to the queue by calling the queueLink() method. That method creates a new LinkFinder task and submits it to the ExecutorService. If a "free" thread is available in the pool, the task is executed immediately; otherwise it is placed in a waiting queue. After 1,500 distinct links have been visited, we print the statistics and the program continues to run.
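It's worth knowing what that waiting queue actually is. Per the Executors Javadoc, newFixedThreadPool() operates its threads off a shared unbounded queue, so excess tasks simply wait their turn rather than being rejected. Spelled out, the equivalent construction looks like this (the class name FixedPoolDemo is mine):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FixedPoolDemo {

    public static void main(String[] args) {
        // Equivalent of Executors.newFixedThreadPool(64):
        ExecutorService execService = new ThreadPoolExecutor(
                64, 64,                               // core and maximum pool size
                0L, TimeUnit.MILLISECONDS,            // keep-alive for idle threads (unused here)
                new LinkedBlockingQueue<Runnable>()); // unbounded waiting queue for excess tasks
        execService.shutdown();
    }
}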

A Java 7 web crawler with ForkJoinPool

The Fork/Join framework introduced in Java 7 is actually an implementation of the Divide and Conquer algorithm (see Resources), in which a central ForkJoinPool executes branching ForkJoinTasks. For this example we'll use a ForkJoinPool "backed" by 64 threads. I say backed because ForkJoinTasks are lighter than threads. In Fork/Join, a large number of tasks can be hosted by a smaller number of threads.
Similar to the Java 6 implementation, we start by instantiating a ForkJoinPool object backed by 64 threads in the WebCrawler7 constructor.

Listing 4. Java 7 LinkHandler implementation

package insidecoding.webcrawler7;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ForkJoinPool;
import insidecoding.webcrawler7.net.LinkFinderAction;
import java.util.HashSet;

/**
 *
 * @author Madalin Ilie
 */
public class WebCrawler7 implements LinkHandler {

    private final Collection<String> visitedLinks = Collections.synchronizedSet(new HashSet<String>());
//    private final Collection<String> visitedLinks = Collections.synchronizedList(new ArrayList<String>());
    private String url;
    private ForkJoinPool mainPool;

    public WebCrawler7(String startingURL, int maxThreads) {
        this.url = startingURL;
        mainPool = new ForkJoinPool(maxThreads);
    }

    private void startCrawling() {
        mainPool.invoke(new LinkFinderAction(this.url, this));
    }

    @Override
    public int size() {
        return visitedLinks.size();
    }

    @Override
    public void addVisited(String s) {
        visitedLinks.add(s);
    }

    @Override
    public boolean visited(String s) {
        return visitedLinks.contains(s);
    }

    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) throws Exception {
        new WebCrawler7("http://www.javaworld.com", 64).startCrawling();
    }
}
Note that the LinkHandler interface in Listing 4 is almost the same as the Java 6 version from Listing 2. It's only missing the queueLink() method. The most important members to look at are the constructor and the startCrawling() method. In the constructor, we create a new ForkJoinPool backed by 64 threads. (I've chosen 64 threads to match the fixed pool size used in the Java 6 implementation, keeping the comparison apples-to-apples; the ForkJoinPool constructor accepts any positive parallelism level.) The pool invokes a new LinkFinderAction, which will recursively invoke further ForkJoinTasks. Listing 5 shows the LinkFinderAction class:

Listing 5. LinkFinderAction

package insidecoding.webcrawler7.net;

import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveAction;
import org.htmlparser.Parser;
import org.htmlparser.filters.NodeClassFilter;
import org.htmlparser.tags.LinkTag;
import org.htmlparser.util.NodeList;
import insidecoding.webcrawler7.LinkHandler;

/**
 *
 * @author Madalin Ilie
 */
public class LinkFinderAction extends RecursiveAction {

    private String url;
    private LinkHandler cr;
    /**
     * Used for statistics
     */
    private static final long t0 = System.nanoTime();

    public LinkFinderAction(String url, LinkHandler cr) {
        this.url = url;
        this.cr = cr;
    }

    @Override
    public void compute() {
        if (!cr.visited(url)) {
            try {
                List<LinkFinderAction> actions = new ArrayList<LinkFinderAction>();
                URL uriLink = new URL(url);
                Parser parser = new Parser(uriLink.openConnection());
                NodeList list = parser.extractAllNodesThatMatch(new NodeClassFilter(LinkTag.class));

                for (int i = 0; i < list.size(); i++) {
                    LinkTag extracted = (LinkTag) list.elementAt(i);

                    if (!extracted.extractLink().isEmpty()
                            && !cr.visited(extracted.extractLink())) {

                        actions.add(new LinkFinderAction(extracted.extractLink(), cr));
                    }
                }
                cr.addVisited(url);

                if (cr.size() == 1500) {
                    System.out.println("Time for visit 1500 distinct links= " + (System.nanoTime() - t0));                   
                }

                //invoke recursively
                invokeAll(actions);
            } catch (Exception e) {
                //ignore 404, unknown protocol or other server errors
            }
        }
    }
}
The application logic so far is the same as in the Java 6 implementation. The difference is that instead of manually queuing new links through the LinkHandler class, we submit them to the ForkJoinPool through the static invokeAll() method. Note the invokeAll(actions) line. The ForkJoinPool will schedule these tasks in the best possible way using the available 64 threads. A recursive action ends when the link it was given has already been visited (see if (!cr.visited(url))).
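Conceptually, invokeAll() forks each subtask onto the pool's work-stealing queues and then joins them all. A hand-rolled sketch of the same idea looks like the snippet below; the helper name forkThenJoin is mine, and fork() only behaves this way when called from inside a running ForkJoinTask, such as our compute() method.

import java.util.List;
import java.util.concurrent.RecursiveAction;

// Conceptual equivalent of invokeAll(actions), assuming we are inside compute():
static void forkThenJoin(List<LinkFinderAction> actions) {
    for (RecursiveAction action : actions) {
        action.fork(); // push the subtask onto the pool's work-stealing queues
    }
    for (RecursiveAction action : actions) {
        action.join(); // wait for each subtask to complete
    }
}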

Comparative benchmarks for search coverage: 1,500 distinct links

Now it's time to compare benchmarks. I accounted for JVM warmup when timing the two implementations: first I ran each program 10 times and ignored the results, then I ran it 10 more times to compute an average timing. Between runs of the Java 6 and Java 7 code I also called System.gc() numerous times to manually activate the garbage collector. I invoked both applications with the JVM flags -d64 -Xmx1512m, thus setting the platform to 64-bit and the maximum heap size to 1512 MB (see Resources).
I ran the tests on a Windows 7 SP1 64-bit machine with an Intel Core i5 @ 2.67 GHz and 4.00 GB of RAM, using the 64-bit version of JDK 7 update 2.
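For reference, the shape of that methodology looks roughly like the sketch below -- an illustration of the warm-up-then-average scheme rather than the exact harness I ran; the class and method names are mine.

public class BenchmarkHarness {

    // Runs the crawler 10 times to warm up the JVM (results discarded),
    // then 10 more times to compute an average, calling System.gc() between runs.
    static long benchmark(Runnable crawl) {
        for (int i = 0; i < 10; i++) {
            crawl.run(); // warm-up run: result ignored
            System.gc();
        }
        long total = 0;
        for (int i = 0; i < 10; i++) {
            long t0 = System.nanoTime();
            crawl.run(); // measured run
            total += System.nanoTime() - t0;
            System.gc();
        }
        return total / 10; // average time in nanoseconds
    }
}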
The timing of the Java 6 code is as follows (an average of all 10 runs):
Time to visit 1,500 distinct links: 45,404,628,454 nanoseconds
Fastest time: 43,989,514,242 nanoseconds
Slowest time: 47,077,714,098 nanoseconds
And here's the timing for the Java 7 implementation:
Time to visit 1,500 distinct links: 45,269,306,013 nanoseconds
Fastest time: 42,365,714,625 nanoseconds
Slowest time: 59,042,391,887 nanoseconds
As you can see, when accounting for search coverage (tasked with following 1,500 distinct links) there's not much difference between the two implementations.

Comparative benchmarks for processing power: 3,000 non-distinct links

To test the second scenario I had to make some adjustments to both implementations. In both the WebCrawler6 and WebCrawler7 classes, I uncommented the synchronized List and commented out the synchronized Set. A Set would collapse duplicate links, but this benchmark counts every visit -- distinct or not -- so the List is the structure we need.
// private final Collection<String> visitedLinks = Collections.synchronizedSet(new HashSet<String>());

private final Collection<String> visitedLinks = Collections.synchronizedList(new ArrayList<String>());
I also changed the visited() method to always return false, because for this benchmark it doesn't matter whether a link has been visited or not.
 
    @Override
    public boolean visited(String s) {
        return false; //visitedLinks.contains(s);
    }
Finally, I changed the conditions in the LinkFinder and LinkFinderAction classes to check for 3,000 links instead of 1,500:
if (cr.size() == 3000) {
    System.out.println("Time to visit 3000 non-distinct links = " + (System.nanoTime() - t0));
}
The resulting benchmarks show that Fork/Join fared better when measuring processing power -- i.e., how many links each application processed per second.
Here's the timing of the Java 6 code, an average of the results for all 10 runs:
Time to visit 3,000 non-distinct links: 48,510,285,967 nanoseconds
Fastest time: 44,189,380,355 nanoseconds 
Slowest time: 52,132,053,413 nanoseconds
This measurement is equivalent to 61.8425 links per second (3,000 links ÷ 48.51 seconds).
And here's the timing for the program written using Java 7:
Time to visit 3,000 non-distinct links: 31,343,446,584 nanoseconds
Fastest time: 30,533,600,312 nanoseconds 
Slowest time: 33,308,851,937 nanoseconds
This is equivalent to 95.7137 links per second (3,000 links ÷ 31.34 seconds).
The code based on Java 7's ForkJoinPool was roughly 1.5 times faster than the Java 6 code -- a significant performance gain.
Figures 1 and 2 show the CPU history for each implementation. Note that CPU usage is much the same for both, even though the ForkJoinPool implementation is faster.

Figure 1. CPU usage for the Java 6 ExecutorService implementation


Figure 2. CPU usage for the Java 7 ForkJoinPool implementation


In conclusion: Fork/Join for recursive programming

While relatively simple, my benchmarks demonstrate that Fork/Join offers serious gains for solving problems that involve recursion. Because recursion is fundamental to parallel programming on multicore platforms (see Resources), Fork/Join is an essential addition to Java platform concurrency. That said, it does not replace the original java.util.concurrent package. As I've demonstrated, ExecutorService continues to be a fine solution for many concurrent programming tasks. In a programming scenario such as the one I set up, where effective recursion is key to processing power, Fork/Join is likely to be the most effective solution.

Learn more

This article has briefly introduced two approaches to Java concurrency and demonstrated each one's applicability to two common program requirements: data collection and search coverage. See the Resources section to learn more about java.util.concurrent and the uses of Fork/Join in Java 7.

About the author

Madalin Ilie is a software development lead at Endava Romania. He has more than five years' programming experience and has worked in a variety of project domains, from mobile development to heavy financial applications.
Read more about Core Java in JavaWorld's Core Java section.
