Monday, October 21, 2013

How-To: Run a MapReduce Job in CDH4

http://blog.cloudera.com/blog/2012/12/how-to-run-a-mapreduce-job-in-cdh4/
This is the first post in a series that will get you going on how to write, compile, and run a simple MapReduce job on Apache Hadoop. The full code, along with tests, is available at http://github.com/cloudera/mapreduce-tutorial. The program will run on either MR1 or MR2.
We’ll assume that you have a running Hadoop installation, either locally or on a cluster, and that your environment is set up correctly so that typing “hadoop” into your command line prints some usage notes. Detailed instructions for installing CDH, Cloudera’s open-source, enterprise-ready distro of Hadoop and related projects, are available here: https://ccp.cloudera.com/display/CDH4DOC/CDH4+Installation. We’ll also assume you have Maven installed on your system, as this will make compiling our code easier. Note that Maven is not a strict dependency; we could also compile using Java on the command line or with an IDE like Eclipse.
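For instance, a quick sanity check (the exact output will depend on your installation):
hadoop version   # prints e.g. "Hadoop 2.0.0-cdh4.1.0" plus build info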

The Use Case

There’s been a lot of brawling on our pirate ship recently. All too often, one of the mates will punch another in the mouth, knocking a tooth out onto the deck. Our poor sailors will wake up the next day with an empty bottle of rum, wondering who’s responsible for the gap between their teeth. All this violence has gotten out of hand, so as a deterrent, we’d like to provide everyone with a list of everyone who’s ever left them with a gap. Luckily, we’ve been able to set up a Flume source so that every time someone punches someone else, it gets written out as a line in a big log file in Hadoop. To turn this data into these lists, we need a MapReduce job that can 1) invert the mapping from attacker to victim, 2) group by victim, and 3) eliminate duplicates.

The Input Data

A file or set of text files, in which each line represents a specific instance of punching, containing a pirate, a tab character as a delimiter, and then a victim. An example input file is located in the github repo at https://github.com/cloudera/mapreduce-tutorial/blob/master/samples/gaplog.txt.
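For illustration, a few lines of input might look like this, with a tab separating the attacker from the victim (these names are made up; see the repo for the real sample data):
jack	bill
bill	jack
jack	bill
anne	jack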

Writing Our GapDeduce Program

Our map function simply inverts the mapping of punchers to their targets.
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class Gapper extends MapReduceBase implements Mapper<Text, Text, Text, Text> {
  public void map(Text attacker, Text victim, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
    // Emit the victim as the key so that the shuffle groups records by victim
    output.collect(victim, attacker);
  }
}
By using the victim as our map output/reduce input key, the shuffle groups by victim.
Our reduce function is a little more complicated but not by much. It uses a TreeSet to eliminate duplicates and order by name, and then outputs the list of pirates as a single string.
import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class Deducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
  public void reduce(Text key, Iterator<Text> values,
      OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
    // A TreeSet eliminates duplicate attackers and keeps them sorted by name
    Set<String> attackers = new TreeSet<String>();
    while (values.hasNext()) {
      String valStr = values.next().toString();
      attackers.add(valStr);
    }
    output.collect(key, new Text(attackers.toString()));
  }
}
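To make this concrete with the made-up input from earlier: after the shuffle, each call to reduce receives a victim along with an iterator over everyone who punched them, duplicates included (a sketch, not literal framework output):
bill -> [jack, jack]
jack -> [bill, anne]
The TreeSet then collapses the duplicates and sorts the attackers by name before the final list is written out.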
Finally, we write a driver that will run the job.
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class GapDeduceRunner {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf();
    conf.setJobName("gapdeduce");

    // This line specifies the jar Hadoop should use to run the mapper and
    // reducer by naming a class that's inside it
    conf.setJarByClass(GapDeduceRunner.class);

    conf.setMapOutputKeyClass(Text.class);
    conf.setMapOutputValueClass(Text.class);

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);

    conf.setMapperClass(Gapper.class);
    conf.setReducerClass(Deducer.class);

    // KeyValueTextInputFormat treats each line as an input record, and splits
    // the line at the tab character to separate it into key and value
    conf.setInputFormat(KeyValueTextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);
  }
}

Compiling and Packaging

To run our program, we need to compile it and package it into a jar that can be sent to the machines in our cluster. To do this with Maven, we set up a project whose POM contains the hadoop-client dependency.

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.0.0-cdh4.1.0</version>
</dependency>
Furthermore, we include the following repositories so Maven knows where to get the bits.

<repositories>
  <repository>
    <id>maven-hadoop</id>
    <name>Hadoop Releases</name>
    <url>https://repository.cloudera.com/content/repositories/releases/</url>
  </repository>
  <repository>
    <id>cloudera-repos</id>
    <name>Cloudera Repos</name>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
  </repository>
</repositories>
The full pom.xml is included in the github repository at https://github.com/cloudera/mapreduce-tutorial/blob/master/pom.xml.
Then, in the project’s root directory, we run mvn install. This command both compiles our code and generates a jar, which should show up in the target/ directory under the root project directory.
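For example:
mvn install
ls target/   # should list gapdeduce-1.0-SNAPSHOT.jar, among other build artifacts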

Running Our MapReduce Program

The github repo contains a sample input file and expected output file. We can put the sample input file into a directory on HDFS called “gaps” using
hadoop fs -mkdir /gaps
hadoop fs -put samples/gaplog.txt /gaps
We submit our MapReduce job using “hadoop jar”, which is similar to “java -jar”, but sets up all the environment necessary for running our jar on Hadoop.
hadoop jar target/gapdeduce-1.0-SNAPSHOT.jar GapDeduceRunner /gaps/gaplog.txt /gaps/output
To inspect the output file of our job, we can run
hadoop fs -cat /gaps/output/part-00000
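With the made-up input from earlier, the output would look something like:
bill	[jack]
jack	[anne, bill]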
At last, we can provide our mates with a full accounting of their gaps. 
In the next post in this series, we’ll cover some advanced MapReduce features like the distributed cache, as well as testing MapReduce code with MRUnit.
Sandy Ryza is a Software Engineer at Cloudera, working on the Platform team.
