Java development 2.0: Big data analysis with Hadoop MapReduce

How mountains of data become gold mines of information
Andrew Glover, Author and developer, Beacon50
Summary:  Apache Hadoop is currently the premier tool used for analyzing distributed data, and like most Java™ 2.0 technologies, it's built to scale. Get started with Hadoop's MapReduce programming model and learn how to use it to analyze data for both big and small business information needs.
Date:  18 Jan 2011
Level:  Intermediate

When Google launched its image search feature in 2001, it had 250 million indexed images. Less than a decade later, the search giant has indexed over 10 billion images. Thirty-five hours of content are uploaded to YouTube every minute. Twitter is said to handle, on average, 55 million tweets per day. Earlier this year, its search feature was logging 600 million queries daily. That is what we mean when we talk about big data.

About this series

The Java development landscape has changed radically since Java technology first emerged. Thanks to mature open source frameworks and reliable for-rent deployment infrastructures, it's now possible to assemble, test, run, and maintain Java applications quickly and inexpensively. In this series, Andrew Glover explores the spectrum of technologies and tools that make this new Java development paradigm possible.
Data on such a massive scale was once limited to large corporations, universities, and governments — entities capable of buying hugely expensive supercomputers and the staff to keep them running. Today, with the lowering of storage costs and the commoditization of processing power, smaller companies, and some individuals, have begun storing and mining that same data, fostering a wave of application innovation.
One of the enabling technologies of the big data revolution is MapReduce, a programming model and implementation developed by Google for processing massive-scale, distributed data sets. In this article, I introduce Apache's open source MapReduce implementation, Hadoop, which some have called the killer app of cloud computing.
Apache's Hadoop framework is essentially a mechanism for analyzing huge datasets, which do not necessarily need to be housed in a datastore. Hadoop abstracts MapReduce's massive data-analysis engine, making it more accessible to developers. Hadoop scales out to myriad nodes and can handle all of the activity and coordination related to data sorting.
Hadoop's plethora of features and configurations make it an amazingly useful and powerful framework. Yahoo! and countless other organizations have found it an efficient mechanism for analyzing mountains of bits and bytes. Hadoop is also fairly easy to get working on a single node; all you need is some data to analyze and familiarity with Java code, including generics. Hadoop also works with Ruby, Python, and C++.

More about MapReduce

If you're a reader of this series, then you've already seen MapReduce in action a couple of times. In "REST up with CouchDB and Groovy's RESTClient" I demonstrated how CouchDB leverages MapReduce for views, then I used it again in "MongoDB: A NoSQL datastore with (all the right) RDBMS moves," where it was the mechanism for processing MongoDB documents.
As a conceptual framework for processing huge data sets, MapReduce is highly optimized for distributed problem-solving using a large number of computers. The framework consists of two functions, as its name implies. The map function is designed to take a large data input and divide it into smaller pieces, which it then hands off to other processes that can do something with it. The reduce function digests the individual answers collected by map and renders them to a final output.
In Hadoop, you define map and reduce implementations by extending Hadoop's own base classes. The implementations are tied together by a configuration that specifies them, along with input and output formats. Hadoop is well-suited for processing huge files containing structured data. One particularly handy aspect of Hadoop is that it handles the raw parsing of an input file, so that you can deal with one line at a time. Defining a map function is thus really just a matter of determining what you want to grab from an incoming line of text.
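To make that flow concrete before getting into Hadoop's API, here is a toy, Hadoop-free sketch of my own (the class MapReduceSketch and its sample lines are made up for illustration): each input line is mapped to a (key, 1) pair, and the pairs are then reduced by summing the 1s that share a key.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// A Hadoop-free illustration of the map/reduce flow: "map" each line to a
// (key, 1) pair, then "reduce" by summing the 1s that share a key.
public class MapReduceSketch {

 public static void main(String[] args) {
  List<String> lines = Arrays.asList("a,one", "b,two", "a,three");

  Map<String, Integer> counts = new HashMap<String, Integer>();
  for (String line : lines) {
   // map step: derive a key from the line and emit (key, 1)
   String key = line.split(",")[0];
   // reduce step: sum the 1s emitted for this key
   Integer soFar = counts.get(key);
   counts.put(key, soFar == null ? 1 : soFar + 1);
  }
  System.out.println(counts); // prints something like {b=1, a=2}
 }
}

Hadoop performs the same two steps, but it splits the input across many map tasks running in parallel and ensures that all of the values emitted for a given key arrive together at a single reduce call.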
The United States government produces a ton of data, much of it of interest to average citizens. Various government agencies freely distribute data related to US economic health and changing social demographics. The U.S. Geological Survey (USGS) publishes international earthquake data.
Multiple small earthquakes happen every day in regions around the world. The majority of them occur deep within the earth's crust, so no one feels them, but listening stations record them nonetheless. The USGS publishes its earthquake data in the form of a weekly CSV (or comma-separated values) file.
An average weekly file isn't terribly big — only about 100KB or so. Still, it will serve as a basis for learning about Hadoop. Keep in mind, though, that Hadoop is capable of handling much larger data sets.
The CSV file I recently downloaded from the USGS website consists of about 920 lines, as shown in Listing 1:

Listing 1. A line count of a USGS earthquake data file
$> wc -l eqs7day-M1.txt 
  920 eqs7day-M1.txt

The CSV file's contents look something like what you see in Listing 2 (the first two lines, that is):

Listing 2. The first two lines of the CSV file
$> head -n 2 eqs7day-M1.txt 
Src,Eqid,Version,Datetime,Lat,Lon,Magnitude,Depth,NST,Region
ci,14896484,2,"Sunday, December 12, 2010 23:23:20 UTC",33.3040,-116.4130,1.0,11.70,22,
  "Southern California"

That's what I would call an information-rich file, especially when you consider that it totals 920 lines. I only want to know how many earthquakes occurred on each day of the week reported by this file, however. Then I want to know which general area had the most earthquakes over those seven days.
My first thought is that I could use simple grep commands to search for the number of quakes per day. Looking at the file, I see that its data starts on December 12. So I do a grep -c of that string, with the results shown in Listing 3:

Listing 3. How many earthquakes on December 12?
$> grep -c 'December 12' eqs7day-M1.txt 
98

Install Hadoop

If you haven't previously installed Hadoop, do so now. First, download the latest binary, unzip it, and then set Hadoop's bin directory on your path. Doing this enables you to execute the hadoop command directly. As you'll see, using Hadoop requires you to execute its hadoop command rather than invoking the java command. You can pass options to the hadoop command, such as where it can find your Java binary files (representing your map and reduce implementations, for instance). In my case, I create a jar file and tell Hadoop which jobs I'd like run inside my jar. I also add any additional binaries required to run my application to Hadoop's classpath.
Now I know that on December 12 there were 98 entries, or 98 recorded earthquakes. I could just go down the line and do a grep for December 11, 10, and so on. But that sounds tedious to me. Worse, in order to pull it off, I'd need to know which days were in the file. I don't really care about that, and in some cases I might not have access to that information. Really I just want to know the numbers for each given day in any seven-day span, and I can get that information easily with Hadoop.
Hadoop needs just a few pieces of information to answer my first and second questions: namely, which inputs to process and how to handle map and reduce. I'll also have to provide a job that ties everything together. But before I start working on that code, I'm going to take a few minutes to make sure everything is in order with my CSV data.
Aside from the first line of the earthquake CSV file, which is the header, each line is a series of data values separated by commas. I'm primarily interested in three pieces of data: the date, location, and magnitude of each earthquake. To obtain these values, I'm going to use a nifty open source library called opencsv, which helps with parsing CSV files.
Being a test-first kind of guy, I'll start by writing a quick JUnit test to verify that I can obtain the information I want from a sample line obtained from the CSV file, shown in Listing 4:

Listing 4. Parsing a CSV line
public class CSVProcessingTest {

 private final String LINE = "ci,14897012,2,\"Monday, December 13, 2010 " +
            "14:10:32 UTC\",33.0290,-115." +
            "5388,1.9,15.70,41,\"Southern California\"";

 @Test
 public void testReadingOneLine() throws Exception {
  String[] lines = new CSVParser().parseLine(LINE);

  assertEquals("should be Monday, December 13, 2010 14:10:32 UTC",
    "Monday, December 13, 2010 14:10:32 UTC", lines[3]);

  assertEquals("should be Southern California",
    "Southern California", lines[9]);

  assertEquals("should be 1.9", "1.9", lines[6]);
 }
}

As you can see in Listing 4, opencsv makes working with comma-separated values pretty easy. The parser simply returns an array of Strings, so it's possible to obtain positional values (just recall that array and collection access in the Java language is zero-based).
When working with MapReduce, the map function's job is to pick some value to work off of, along with some key. That is, map primarily works with and returns two elements: a key and a value. Going back to my earlier requirements, I want to first find out how many earthquakes occur each day. Accordingly, when I analyze the earthquake file, I'm going to emit two values: my key will be the date, and the value will be a counter. My reduce function will then sum up the counters (which are just integers with the value of 1), thus providing me with the number of times a date occurs in a target earthquake file.
Because I'm interested in a 24-hour period, I'll have to strip out the time aspect of each date in the file. In Listing 5, I write a quick test that validates how I'll convert the specific date format in an incoming file into a more generic 24-hour-period date:

Listing 5. Date format conversions
@Test
public void testParsingDate() throws Exception {
 String datest = "Monday, December 13, 2010 14:10:32 UTC";
 SimpleDateFormat formatter = new SimpleDateFormat("EEEEE, MMMMM dd, yyyy HH:mm:ss Z");
 Date dt = formatter.parse(datest);

 formatter.applyPattern("dd-MM-yyyy");
 String dtstr = formatter.format(dt);
 assertEquals("should be 13-12-2010", "13-12-2010", dtstr);
}

In Listing 5, I've used the SimpleDateFormat Java object to convert a date String in the CSV file's format, Monday, December 13, 2010 14:10:32 UTC, into the more generic 13-12-2010.
Now that I've worked out how I'll deal with the CSV file and its date format, I'm ready to begin implementing my map and reduce functions in Hadoop. This process requires understanding Java generics, because Hadoop prefers explicit type safety.
When defining a map implementation with Hadoop, I simply extend Hadoop's Mapper class. I can then use generics to specify the explicit type for both the outgoing key and the value. The type clause also delineates the incoming key and value, which in the case of reading a file are the byte count and the line of text, respectively.
My EarthQuakesPerDateMapper class extends Hadoop's Mapper object. It explicitly delineates its output key as a Text object and its value as an IntWritable, which is a Hadoop-specific class that is essentially an integer. Also note that the first two types in the class clause are LongWritable and Text, which are the byte count and line of text, respectively.
Because of the type clause in the class definition, my parameter types coming into the map method are set along with the outputs of this method inside the context.write clause. If I try to specify something else, I'll either get a compiler issue or Hadoop will error-out with a message describing a type mismatch.

Listing 6. A mapping implementation
public class EarthQuakesPerDateMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
 @Override
 protected void map(LongWritable key, Text value, Context context) throws IOException,
   InterruptedException {

  if (key.get() > 0) {
   try {
     CSVParser parser = new CSVParser();
     String[] lines = parser.parseLine(value.toString());

     SimpleDateFormat formatter = 
       new SimpleDateFormat("EEEEE, MMMMM dd, yyyy HH:mm:ss Z");
     Date dt = formatter.parse(lines[3]);
     formatter.applyPattern("dd-MM-yyyy");

     String dtstr = formatter.format(dt);
     context.write(new Text(dtstr), new IntWritable(1));
   } catch (ParseException e) {}
  }
 }
}

My map implementation in Listing 6 is simple: Hadoop basically invokes this class for each line of text it finds in an input file. To avoid trying to handle the header of the CSV, I first check to see if the byte count (the key object) is not zero. Then, I do what you've already seen in Listings 4 and 5: I grab the incoming date, convert it, and then set it as the outgoing key. I also provide a count: 1. That is, I've coded a counter for each date, and when a reduce implementation is invoked, it'll get a key and a collection of values. In this case, the keys will be dates and the values will be collections of counters, as shown in Listing 7:

Listing 7. A logical view of a map output and reduce inputs 
"13-12-2010":[1,1,1,1,1,1,1,1]
"14-12-2010":[1,1,1,1,1,1]
"15-12-2010":[1,1,1,1,1,1,1,1,1]

Note that the line context.write(new Text(dtstr), new IntWritable(1)) (in Listing 6) built the logical collection shown in Listing 7. As you probably have figured out, context is a Hadoop data structure that holds various pieces of information. This context is passed along to a reduce implementation, which will take those 1 values and sum them. Consequently, a reduce implementation logically creates data structures like the one in Listing 8:

Listing 8. A view of a reduce output 
"13-12-2010":8
"14-12-2010":6
"15-12-2010":9

My reduce implementation is shown in Listing 9. As with Hadoop's Mapper, the Reducer is parameterized: the first two parameters are the incoming key type (Text) and value type (IntWritable) and the latter two parameters are the output types: the key and value, which in this case are the same.

Listing 9. A reduce implementation
public class EarthQuakesPerDateReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
 @Override
 protected void reduce(Text key, Iterable<IntWritable> values, Context context)
  throws IOException, InterruptedException {
  int count = 0;
  for (IntWritable value : values) {
   count++;
  }
  context.write(key, new IntWritable(count));
 }
}

My reduce implementation is brutally simple. As I pointed out in Listing 7, the incoming value is really a collection of values, which in this case means a collection of 1 values. All I do is sum them, then write out a new key-value pair representing the date and the count. My reduce code then basically spits out the lines you saw in Listing 8. The logical flow looks something like this:
"13-12-2010":[1,1,1,1,1,1,1,1] -> "13-12-2010":8

The abstract form of this listing is, of course, map -> reduce.
Now that I've coded my map and reduce implementations, all that's left to do is link everything up into a Hadoop Job. Defining a Job is simple: you provide inputs and outputs, map and reduce implementations (as in Listing 6 and Listing 9), and output types. My output types in this case are the same ones used for my reduce implementation.

Listing 10. A Job ties map and reduce together
public class EarthQuakesPerDayJob {

 public static void main(String[] args) throws Throwable {

  Job job = new Job();
  job.setJarByClass(EarthQuakesPerDayJob.class);
  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));

  job.setMapperClass(EarthQuakesPerDateMapper.class);
  job.setReducerClass(EarthQuakesPerDateReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}

In Listing 10, I've tied everything together with a main method that takes two parameters: the directory where the earthquakes CSV file is, and the one where the resulting report should be written (Hadoop prefers to create this directory itself, so it shouldn't already exist).
To execute this little framework, I'll need to jar these classes up. I'll also need to tell Hadoop where it can find the opencsv binary. I can then execute Hadoop via the command line, as shown in Listing 11:

Listing 11. Executing Hadoop
$> export HADOOP_CLASSPATH=lib/opencsv-2.2.jar
$> hadoop jar target/quake.jar com.b50.hadoop.quake.EarthQuakesPerDayJob
   ~/temp/mreduce/in/ ~/temp/mreduce/out

Run this code and you'll see a bunch of text fly across the screen as Hadoop begins doing its work. Remember, the CSV file I'm using is but a puppy compared to the big dogs Hadoop was built to handle. Depending on your processing power, Hadoop should complete within a few seconds.
When it's done, you can view the contents of the output file with virtually any editor. Another option is to use the hadoop command directly, as I've done in Listing 12:

Listing 12. Reading Hadoop's output
$> hadoop dfs -cat part-r-00000 
05-12-2010      43
06-12-2010      143
07-12-2010      112
08-12-2010      136
09-12-2010      178
10-12-2010      114
11-12-2010      114
12-12-2010      79

If you're like me, the first thing you'll notice in Listing 12 is the sheer number of earthquakes per day — 178 on December 9 alone! Hopefully you'll also note that Hadoop did exactly what I wanted it to do: neatly tabulated the number of earthquake occurrences for every date in my range.
Next, I want to find out where earthquakes are occurring, and somehow quickly measure which location logs the most earthquakes in my date range. Well, as you've probably guessed, Hadoop makes that easy to do. The key in this case isn't the date, but the location. Thus, I write a new Mapper class.

Listing 13. A new map implementation
public class EarthQuakeLocationMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
 @Override
 protected void map(LongWritable key, Text value, Context context) throws IOException,
  InterruptedException {
  if (key.get() > 0) {
   String[] lines = new CSVParser().parseLine(value.toString());
   context.write(new Text(lines[9]), new IntWritable(1));
  }
 }
}

Rather than obtaining a date and converting it, all I did in Listing 13 was to grab the location, which was the last positional item in the CSV array.
Rather than a giant list of locations and their numbers, I'd like to limit my results to any location with 10 or more earthquakes in any seven-day period.

Listing 14. Where do more earthquakes happen?
public class EarthQuakeLocationReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
 @Override
 protected void reduce(Text key, Iterable<IntWritable> values, Context context)
  throws IOException, InterruptedException {
  int count = 0;
  for (IntWritable value : values) {
   count++;
  }
  if (count >= 10) {
   context.write(key, new IntWritable(count));
  }
 }
}

The code in Listing 14 is quite similar to that of Listing 9; in this case, however, I've limited output to sums of 10 or more. Next, I can tie together my map and reduce with another Job implementation, jar things up, and execute Hadoop as normal to obtain my new answer.
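That second Job isn't shown in the listings, but it needs nothing more than Listing 10 with the location classes swapped in. Here's a minimal sketch under that assumption: the class name EarthQuakeLocationJob is my own invention, while EarthQuakeLocationMapper and EarthQuakeLocationReducer are the classes from Listings 13 and 14.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EarthQuakeLocationJob {

 public static void main(String[] args) throws Throwable {

  Job job = new Job();
  job.setJarByClass(EarthQuakeLocationJob.class);
  // same earthquake CSV input as before; a fresh output directory for this report
  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));

  // swap in the location-based mapper and reducer from Listings 13 and 14
  job.setMapperClass(EarthQuakeLocationMapper.class);
  job.setReducerClass(EarthQuakeLocationReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}

You'd invoke it exactly as in Listing 11, substituting this class name and pointing at a fresh output directory.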
Issuing the hadoop dfs command displays the new values I've requested:

Listing 15. Earthquakes by location
$> hadoop dfs -cat part-r-00000 
Andreanof Islands, Aleutian Islands, Alaska     24
Arkansas        40
Baja California, Mexico 101
Central Alaska  74
Central California      68
Greater Los Angeles area, California    16
Island of Hawaii, Hawaii        16
Kenai Peninsula, Alaska 11
Nevada  15
Northern California     114
San Francisco Bay area, California      21
Southern Alaska 97
Southern California     115
Utah    19
western Montana 11


What's the takeaway from Listing 15? Well, first, the west coast of North America from Mexico to Alaska is a shaky place. Second, Arkansas apparently sits near a fault line, which I didn't realize. Finally, if you live in Northern or Southern California (which many software developers do), then the ground in your region shakes roughly every hour and a half.
Analyzing data with Hadoop is easy and efficient, and I haven't even scratched the surface of what it has to offer for data analysis. Hadoop is really designed to run in a distributed manner where it handles the coordination of various nodes running map and reduce. For the sake of example, in this article I ran Hadoop in one JVM with a single, puny file.
Hadoop is a terrific tool to have by itself, and there's also an entire, growing ecosystem around it, from sub-projects to cloud-based Hadoop services. The Hadoop ecosystem demonstrates the rich community behind the project. The many tools that have sprung out of that community demonstrate the viability of big data analysis as a global business activity. With Hadoop, distributed data mining and analysis is available to all kinds of software innovators and entrepreneurs, including but not limited to big guns like Google and Yahoo!.

Learn
  • Java development 2.0: This dW series explores technologies that are redefining the Java development landscape; recent topics include MongoDB (September 2010); CouchDB (November 2009); and Objectify AppEngine (November 2010).
  • "Distributed data processing with Hadoop, Part 1: Getting started" (M. Tim Jones, developerWorks, May 2010): This article — the first in a series — explores the Hadoop framework, including the Hadoop file system (HDFS) and commonly used node types. Learn how to install and configure a single-node Hadoop cluster and delve into the MapReduce application. Finally, discover ways to monitor and manage Hadoop using its core Web interfaces. Also see Part 2 and Part 3.
  • "A profile of Apache Hadoop MapReduce computing efficiency, Part 1" (Paul Burkhardt, Cloudera Development Center, December 2010): A two-part exposition of how effectively MapReduce applications use computing resources. The first half is an overview of computing efficiency as it relates to evaluating Hadoop MapReduce applications.
  • "Hadoop companies everywhere" (Alex Handy, SD Times, July 2009): Companies are generating more data every day, but many of them aren't deriving business intelligence from it. That spells opportunity, says Handy.
  • Browse the Java technology bookstore for books on these and other technical topics.
  • developerWorks Java technology zone: Find hundreds of articles about every aspect of Java programming.
Discuss
  • Get involved in the developerWorks community. Connect with other developerWorks users while exploring the developer-driven blogs, forums, groups, and wikis.
Andrew Glover
Andrew Glover is a developer, author, speaker, and entrepreneur with a passion for behavior-driven development, Continuous Integration, and Agile software development. He is the founder of the easyb Behavior-Driven Development (BDD) framework and is the co-author of three books: Continuous Integration, Groovy in Action, and Java Testing Patterns. You can keep up with him at his blog and by following him on Twitter.
