Java development 2.0: Big data analysis with Hadoop MapReduce
How mountains of data become gold mines of information
Summary: Apache Hadoop is currently the premier tool used for analyzing distributed data, and like most Java™ 2.0 technologies, it's built to scale. Get started with Hadoop's MapReduce programming model and learn how to use it to analyze data for both big and small business information needs.
Date: 18 Jan 2011
Level: Intermediate
When Google launched its image search feature in 2001, it had 250 million indexed images. Less than a decade later, the search giant has indexed over 10 billion images. Thirty-five hours of content are uploaded to YouTube every minute. Twitter is said to handle, on average, 55 million tweets per day. Earlier this year, its search feature was logging 600 million queries daily. That is what we mean when we talk about big data.
Data on such a massive scale was once limited to large corporations, universities, and governments — entities capable of buying hugely expensive supercomputers and the staff to keep them running. Today, with the lowering of storage costs and the commoditization of processing power, smaller companies, and some individuals, have begun storing and mining that same data, fostering a wave of application innovation.
One of the enabling technologies of the big data revolution is MapReduce, a programming model and implementation developed by Google for processing massive-scale, distributed data sets. In this article, I introduce Apache's open source MapReduce implementation, Hadoop, which some have called the killer app of cloud computing.
Apache's Hadoop framework is essentially a mechanism for analyzing huge datasets, which do not necessarily need to be housed in a datastore. Hadoop abstracts MapReduce's massive data-analysis engine, making it more accessible to developers. Hadoop scales out to myriad nodes and can handle all of the activity and coordination related to data sorting.
Hadoop's plethora of features and configurations make it an amazingly useful and powerful framework. Yahoo! and countless other organizations have found it an efficient mechanism for analyzing mountains of bits and bytes. Hadoop is also fairly easy to get working on a single node; all you need is some data to analyze and familiarity with Java code, including generics. Hadoop also works with Ruby, Python, and C++.
As a conceptual framework for processing huge data sets, MapReduce is highly optimized for distributed problem-solving using a large number of computers. The framework consists of two functions, as its name implies. The map function is designed to take a large data input and divide it into smaller pieces, which it then hands off to other processes that can do something with it. The reduce function digests the individual answers collected by map and renders them to a final output.
In Hadoop, you define map and reduce implementations by extending Hadoop's own base classes. The implementations are tied together by a configuration that specifies them, along with input and output formats. Hadoop is well-suited for processing huge files containing structured data. One particularly handy aspect of Hadoop is that it handles the raw parsing of an input file, so that you can deal with one line at a time. Defining a map function is thus really just a matter of determining what you want to grab from an incoming line of text.
The United States government produces a ton of data, much of it of interest to average citizens. Various government agencies freely distribute data related to US economic health and changing social demographics. The U.S. Geological Survey (USGS) publishes international earthquake data.
Multiple small earthquakes happen every day in regions around the world. The majority of them occur deep within the earth's crust, so no one feels them, but listening stations record them nonetheless. The USGS publishes its earthquake data in the form of a weekly CSV (or comma-separated values) file.
An average weekly file isn't terribly big — only about 100KB or so. Still, it will serve as a basis for learning about Hadoop. Keep in mind, though, that Hadoop is capable of handling much larger data sets.
The CSV file I recently downloaded from the USGS website consists of about 920 lines, as shown in Listing 1:
Listing 1. A line count of a USGS earthquake data file
$> wc -l eqs7day-M1.txt
920 eqs7day-M1.txt
The CSV file's contents look something like what you see in Listing 2 (the first two lines, that is):
Listing 2. The first two lines of the CSV file
$> head -n 2 eqs7day-M1.txt
Src,Eqid,Version,Datetime,Lat,Lon,Magnitude,Depth,NST,Region
ci,14896484,2,"Sunday, December 12, 2010 23:23:20 UTC",33.3040,-116.4130,1.0,11.70,22,"Southern California"
That's what I would call an information-rich file, especially when you consider that it totals 920 lines. I only want to know how many earthquakes occurred on each day of the week reported by this file, however. Then I want to know which general area had the most earthquakes over those seven days.
My first thought is that I could use simple grep commands to search for the number of quakes per day. Looking at the file, I see that its data starts on December 12. So I do a grep -c of that string, with the results shown in Listing 3:
Listing 3. How many earthquakes on December 12?
$> grep -c 'December 12' eqs7day-M1.txt
98
Now I know that on December 12 there were 98 entries, or 98 recorded earthquakes. I could just go down the line and do a grep for December 11, 10, and so on. But that sounds tedious to me. Worse, in order to pull it off, I'd need to know which days were in the file. I don't really care about that, and in some cases I might not have access to that information. Really, I just want to know the numbers for each given day in any seven-day span, and I can get that information easily with Hadoop.
Hadoop needs just a few pieces of information to answer my first and second questions: namely, which inputs to process and how to handle map and reduce. I'll also have to provide a job that ties everything together. But before I start working on that code, I'm going to take a few minutes to make sure everything is in order with my CSV data.
Aside from the first line of the earthquake CSV file, which is the header, each line is a series of data values separated by commas. I'm primarily interested in three pieces of data: the date, location, and magnitude of each earthquake. To obtain these values, I'm going to use a nifty open source library called opencsv, which helps with parsing CSV files.
Being a test-first kind of guy, I'll start by writing a quick JUnit test to verify that I can obtain the information I want from a sample line obtained from the CSV file, shown in Listing 4:
Listing 4. Parsing a CSV line
public class CSVProcessingTest {

  private final String LINE = "ci,14897012,2,\"Monday, December 13, 2010 " +
    "14:10:32 UTC\",33.0290,-115." +
    "5388,1.9,15.70,41,\"Southern California\"";

  @Test
  public void testReadingOneLine() throws Exception {
    String[] lines = new CSVParser().parseLine(LINE);
    assertEquals("should be Monday, December 13, 2010 14:10:32 UTC",
      "Monday, December 13, 2010 14:10:32 UTC", lines[3]);
    assertEquals("should be Southern California",
      "Southern California", lines[9]);
    assertEquals("should be 1.9", "1.9", lines[6]);
  }
}
As you can see in Listing 4, opencsv makes working with comma-separated values pretty easy. The parser simply returns an array of Strings, so it's possible to obtain positional values (just recall that array and collection access in the Java language is zero-based).
When working with MapReduce, the map function's job is to pick some value to work off of, along with some key. That is, map primarily works with and returns two elements: a key and a value. Going back to my earlier requirements, I want to first find out how many earthquakes occur each day. Accordingly, when I analyze the earthquake file, I'm going to emit two values: my key will be the date, and the value will be a counter. My reduce function will then sum up the counters (which are just integers with the value of 1), thus providing me with the number of times a date occurs in a target earthquake file.
Because I'm interested in a 24-hour period, I'll have to strip out the time portion of each date in the file. In Listing 5, I write a quick test that validates how I'll convert the specific date format in an incoming file into a more generic 24-hour-period date:
Listing 5. Date format conversions
@Test
public void testParsingDate() throws Exception {
  String datest = "Monday, December 13, 2010 14:10:32 UTC";
  SimpleDateFormat formatter = new SimpleDateFormat("EEEEE, MMMMM dd, yyyy HH:mm:ss Z");
  Date dt = formatter.parse(datest);
  formatter.applyPattern("dd-MM-yyyy");
  String dtstr = formatter.format(dt);
  assertEquals("should be 13-12-2010", "13-12-2010", dtstr);
}
In Listing 5, I've used Java's SimpleDateFormat class to convert a date String in the CSV file's format (Monday, December 13, 2010 14:10:32 UTC) into the more generic 13-12-2010.
Now that I've worked out how I'll deal with the CSV file and its date format, I'm ready to begin implementing my map and reduce functions in Hadoop. This process requires understanding Java generics, because Hadoop prefers explicit type safety.
When defining a map implementation with Hadoop, I simply extend Hadoop's Mapper class. I can then use generics to specify the explicit type for both the outgoing key and the value. The type clause also delineates the incoming key and value, which in the case of reading a file are the byte count and the line of text, respectively.
My EarthQuakesPerDateMapper class extends Hadoop's Mapper object. It explicitly delineates its output key as a Text object and its value as an IntWritable, which is a Hadoop-specific class that is essentially an integer. Also note that the first two types in the class clause are LongWritable and Text, which are the byte count and the line of text, respectively.
Because of the type clause in the class definition, my parameter types coming into the map method are set, along with the outputs of this method inside the context.write clause. If I try to specify something else, I'll either get a compiler issue or Hadoop will error out with a message describing a type mismatch.
Listing 6. A mapping implementation
public class EarthQuakesPerDateMapper extends Mapper<LongWritable, Text, Text, IntWritable> { ... }
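Filled out, such a mapper reuses the opencsv parsing from Listing 4 and the date conversion from Listing 5. The following is a minimal sketch of one way to write it; the opencsv package name (au.com.bytecode.opencsv) and the exception handling are assumptions on my part.

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import au.com.bytecode.opencsv.CSVParser;

public class EarthQuakesPerDateMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // The CSV header is the line that starts at byte offset 0; skip it.
    if (key.get() > 0) {
      try {
        // Column 3 holds the full date, for example "Sunday, December 12, 2010 23:23:20 UTC".
        String[] lines = new CSVParser().parseLine(value.toString());
        SimpleDateFormat formatter = new SimpleDateFormat("EEEEE, MMMMM dd, yyyy HH:mm:ss Z");
        Date dt = formatter.parse(lines[3]);
        formatter.applyPattern("dd-MM-yyyy");
        // Emit the normalized date as the key and a count of 1 as the value.
        context.write(new Text(formatter.format(dt)), new IntWritable(1));
      } catch (ParseException e) {
        throw new IOException(e);
      }
    }
  }
}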
My map implementation in Listing 6 is simple: Hadoop basically invokes this class for each line of text it finds in an input file. To avoid trying to handle the header of the CSV, I first check to see if the byte count (the key object) is not zero. Then I do what you've already seen in Listings 4 and 5: I grab the incoming date, convert it, and then set it as the outgoing key. I also provide a count: 1. That is, I've coded a counter for each date, and when a reduce implementation is invoked, it'll get a key and a collection of values. In this case, the keys will be dates and the values will be counters, as shown in Listing 7:
Listing 7. A logical view of a map output and reduce inputs
"13-12-2010":[1,1,1,1,1,1,1,1] "14-12-2010":[1,1,1,1,1,1] "15-12-2010":[1,1,1,1,1,1,1,1,1] |
Note that the line context.write(new Text(dtstr), new IntWritable(1)) (in Listing 6) built the logical collection shown in Listing 7. As you probably have figured out, context is a Hadoop data structure that holds various pieces of information. This context is passed along to a reduce implementation, which will take those 1 values and sum them. Consequently, a reduce implementation logically creates data structures like the one in Listing 8:
Listing 8. A view of a reduce output
"13-12-2010":8 "14-12-2010":6 "15-12-2010":9 |
My reduce implementation is shown in Listing 9. As with Hadoop's Mapper, the Reducer is parameterized: the first two parameters are the incoming key type (Text) and value type (IntWritable), and the latter two parameters are the output types: the key and value, which in this case are the same.
Listing 9. A reduce implementation
public class EarthQuakesPerDateReducer extends Reducer<Text, IntWritable, Text, IntWritable> { ... }
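Written out, a reducer that sums the per-date counters looks something like the following minimal sketch; the variable names and loop structure are my own choices.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class EarthQuakesPerDateReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int count = 0;
    // Each incoming value is a 1 emitted by the mapper; summing them yields
    // the number of earthquakes recorded for this date.
    for (IntWritable value : values) {
      count += value.get();
    }
    context.write(key, new IntWritable(count));
  }
}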
My reduce implementation is brutally simple. As I pointed out in Listing 7, the incoming value is really a collection of values, which in this case means a collection of 1 values. All I do is sum them, then write out a new key-value pair representing the date and the count. My reduce code then basically spits out the lines you saw in Listing 8. The logical flow looks something like this:
"13-12-2010":[1,1,1,1,1,1,1,1] -> "13-12-2010":8
The abstract form of this listing is, of course, map -> reduce.
Now that I've coded my map and reduce implementations, all that's left to do is link everything up into a Hadoop Job. Defining a Job is simple: you provide inputs and outputs, map and reduce implementations (as in Listing 6 and Listing 9), and output types. My output types in this case are the same ones used for my reduce implementation.
Listing 10. A Job ties map and reduce together
public class EarthQuakesPerDayJob {

  public static void main(String[] args) throws Throwable {
    Job job = new Job();
    job.setJarByClass(EarthQuakesPerDayJob.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(EarthQuakesPerDateMapper.class);
    job.setReducerClass(EarthQuakesPerDateReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
In Listing 10, I've tied everything together with a main method that takes two parameters: the directory where the earthquake CSV file lives, and the one where the resulting report should be written (Hadoop prefers to create this directory itself).
To execute this little framework, I'll need to jar these classes up. I'll also need to tell Hadoop where it can find the opencsv binary. I can then execute Hadoop via the command line, as shown in Listing 11:
Listing 11. Executing Hadoop
$> export HADOOP_CLASSPATH=lib/opencsv-2.2.jar
$> hadoop jar target/quake.jar com.b50.hadoop.quake.EarthQuakesPerDayJob
     ~/temp/mreduce/in/ ~/temp/mreduce/out
Run this code and you'll see a bunch of text fly across the screen as Hadoop begins doing its work. Remember, the CSV file I'm using is but a puppy compared to the big dogs Hadoop was built to handle. Depending on your processing power, Hadoop should complete within a few seconds.
When it's done, you can view the contents of the output file with virtually any editor. Another option is to use the hadoop command directly, as I've done in Listing 12:
Listing 12. Reading Hadoop's output
$> hadoop dfs -cat part-r-00000
05-12-2010  43
06-12-2010  143
07-12-2010  112
08-12-2010  136
09-12-2010  178
10-12-2010  114
11-12-2010  114
12-12-2010  79
If you're like me, the first thing you'll notice in Listing 12 is the sheer number of earthquakes per day — 178 on December 9 alone! Hopefully you'll also note that Hadoop did exactly what I wanted it to do: neatly tabulated the number of earthquake occurrences for every date in my range.
Next, I want to find out where earthquakes are occurring, and somehow quickly measure which location logs the most earthquakes in my date range. Well, as you've probably guessed, Hadoop makes that easy to do. The key in this case isn't the date, but the location. Thus, I write a new Mapper class.
Listing 13. A new map implementation
public class EarthQuakeLocationMapper extends Mapper<LongWritable, Text, Text, IntWritable> { ... }
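Filled out, this mapper is even simpler than the date mapper, since there's no date to convert. Here's a minimal sketch, again assuming the opencsv parser from Listing 4:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import au.com.bytecode.opencsv.CSVParser;

public class EarthQuakeLocationMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // Skip the CSV header, which begins at byte offset 0.
    if (key.get() > 0) {
      String[] lines = new CSVParser().parseLine(value.toString());
      // The region name is the last field in each record; emit it with a count of 1.
      context.write(new Text(lines[lines.length - 1]), new IntWritable(1));
    }
  }
}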
Rather than obtaining a date and converting it, all I did in Listing 13 was to grab the location, which was the last positional item in the CSV array.
Rather than a giant list of locations and their numbers, I'd like to limit my results to any location with 10 or more earthquakes in any seven-day period.
Listing 14. Where do more earthquakes happen?
public class EarthQuakeLocationReducer extends Reducer<Text, IntWritable, Text, IntWritable> { ... }
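A minimal sketch of that reducer follows; it's the same sum-and-emit pattern as before, with one extra guard for the threshold:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class EarthQuakeLocationReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int count = 0;
    for (IntWritable value : values) {
      count += value.get();
    }
    // Only report locations that logged 10 or more earthquakes in the seven-day window.
    if (count >= 10) {
      context.write(key, new IntWritable(count));
    }
  }
}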
The code in Listing 14 is quite similar to that of Listing 9; in this case, however, I've limited output to sums of 10 or more. Next, I can tie together my map and reduce with another Job implementation, jar things up, and execute Hadoop as normal to obtain my new answer.
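That second driver is essentially Listing 10 with the location classes plugged in; a version of it might look like this (the class name here is my own):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EarthQuakeLocationJob {

  public static void main(String[] args) throws Throwable {
    Job job = new Job();
    job.setJarByClass(EarthQuakeLocationJob.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // Swap in the location-based mapper and reducer; everything else stays the same.
    job.setMapperClass(EarthQuakeLocationMapper.class);
    job.setReducerClass(EarthQuakeLocationReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}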
Issuing the hadoop dfs command displays the new values I've requested:
Listing 15. Earthquakes by location
$> hadoop dfs -cat part-r-00000
Andreanof Islands, Aleutian Islands, Alaska  24
Arkansas  40
Baja California, Mexico  101
Central Alaska  74
Central California  68
Greater Los Angeles area, California  16
Island of Hawaii, Hawaii  16
Kenai Peninsula, Alaska  11
Nevada  15
Northern California  114
San Francisco Bay area, California  21
Southern Alaska  97
Southern California  115
Utah  19
western Montana  11
What's the takeaway from Listing 15? Well, first, the west coast of North America, from Mexico to Alaska, is a shaky place. Second, Arkansas apparently sits near a fault line, which I didn't realize. Finally, if you live in Northern or Southern California (which many software developers do), then the ground around you shakes more than a dozen times a day.
Analyzing data with Hadoop is easy and efficient, and I haven't even scratched the surface of what it has to offer for data analysis. Hadoop is really designed to run in a distributed manner, where it handles the coordination of various nodes running map and reduce. For the sake of example, in this article I ran Hadoop in one JVM with a single, puny file.
Hadoop is a terrific tool to have by itself, and there's also an entire, growing ecosystem around it, from sub-projects to cloud-based Hadoop services. The Hadoop ecosystem demonstrates the rich community behind the project. The many tools that have sprung out of that community demonstrate the viability of big data analysis as a global business activity. With Hadoop, distributed data mining and analysis is available to all kinds of software innovators and entrepreneurs, including but not limited to big guns like Google and Yahoo!.
Learn
- Java development 2.0: This dW series explores technologies that are redefining the Java development landscape; recent topics include MongoDB (September 2010); CouchDB (November 2009); and Objectify AppEngine (November 2010).
- "Distributed data processing with Hadoop, Part 1: Getting started" (M. Tim Jones, developerWorks, May 2010): This article — the first in a series — explores the Hadoop framework, including the Hadoop file system (HDFS) and commonly used node types. Learn how to install and configure a single-node Hadoop cluster and delve into the MapReduce application. Finally, discover ways to monitor and manage Hadoop using its core Web interfaces. Also see Part 2 and Part 3.
- "A profile of Apache Hadoop MapReduce computing efficiency, Part 1" (Paul Burkhardt, Cloudera Development Center, December 2010): A two-part exposition of how effectively MapReduce applications use computing resources. The first half is an overview of computing efficiency as it relates to evaluating Hadoop MapReduce applications.
- "Hadoop companies everywhere" (Alex Handy, SD Times, July 2009): Companies are generating more data every day, but many of them aren't deriving business intelligence from it. That spells opportunity, says Handy.
- Browse the Java technology bookstore for books on these and other technical topics.
- developerWorks Java technology zone: Find hundreds of articles about every aspect of Java programming.
Get products and technologies
- Download Hadoop MapReduce: An Apache Software Foundation project.
- Get opencsv: Download it from SourceForge.
Discuss
- Get involved in the developerWorks community. Connect with other developerWorks users while exploring the developer-driven blogs, forums, groups, and wikis.
Andrew Glover is a developer, author, speaker, and entrepreneur with a passion for behavior-driven development, Continuous Integration, and Agile software development. He is the founder of the easyb Behavior-Driven Development (BDD) framework and is the co-author of three books: Continuous Integration, Groovy in Action, and Java Testing Patterns. You can keep up with him at his blog and by following him on Twitter.