Monday, October 21, 2013



Hadoop Tutorial

This document describes the most important user-facing facets of the Apache Hadoop MapReduce framework and serves as a tutorial. Apache Hadoop MapReduce consists of client APIs for writing applications and a runtime on which to run the applications. There are two versions of the API: old and new, and two versions of the runtime: MRv1 and MRv2. This tutorial describes the old API and MRv1.

Prerequisites

Ensure that CDH is installed, configured, and running. The easiest way to get going quickly is to use a CDH4 QuickStart VM.

Overview

Hadoop MapReduce is a software framework for writing applications that process vast amounts of data (multi-petabyte datasets) in parallel on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner.
A MapReduce job usually splits the input dataset into independent chunks which are processed by the map tasks in parallel. The framework sorts the outputs of the maps, which are then input to the reduce tasks. Typically both the input and the output of the job are stored in a distributed filesystem.
Typically the compute nodes and the storage nodes are the same, that is, the MapReduce framework and the Hadoop Distributed File System (HDFS) are running on the same set of nodes. This configuration allows the framework to effectively schedule tasks on the nodes where data is already present, resulting in very high aggregate bandwidth across the cluster.
The MapReduce runtime framework consists of a single JobTracker and one TaskTracker per cluster node. The JobTracker is responsible for scheduling the jobs' component tasks on the TaskTracker nodes, monitoring the tasks, and re-executing failed tasks. The TaskTracker nodes execute the tasks as directed by the JobTracker.
Minimally, applications specify the input and output locations and supply map and reduce functions via implementations of the appropriate interfaces and/or abstract classes. These locations and functions, along with other job parameters, comprise the job configuration. The Hadoop job client then submits the job (executable, and so on) and configuration to the JobTracker, which distributes the software and configuration to the TaskTracker nodes, schedules and monitors the tasks, and provides status and diagnostic information to the job client.
The Hadoop framework is implemented in Java, and you can develop MapReduce applications in Java or any JVM-based language, or use one of the following interfaces:
  • Hadoop Streaming - a utility that allows you to create and run jobs with any executables (for example, shell utilities) as the mapper and/or the reducer.
  • Hadoop Pipes - a SWIG-compatible (not based on JNI) C++ API to implement MapReduce applications.
This tutorial describes applications written using the older org.apache.hadoop.mapred Java API instead of the new org.apache.hadoop.mapreduce Java API.
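For orientation, the two APIs differ mainly in package name and in how the map and reduce methods are declared. The sketch below shows roughly what a word-count mapper looks like against the new API; it is included only for comparison (NewApiWordCountMapper is a name chosen for illustration) and is not used in the rest of this tutorial.

// For comparison only: a word-count mapper written against the NEW
// (org.apache.hadoop.mapreduce) API. The rest of this tutorial uses the old API.
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class NewApiWordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  private final static IntWritable one = new IntWritable(1);
  private final Text word = new Text();

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // The new API replaces OutputCollector and Reporter with a single Context object.
    StringTokenizer tokenizer = new StringTokenizer(value.toString());
    while (tokenizer.hasMoreTokens()) {
      word.set(tokenizer.nextToken());
      context.write(word, one);
    }
  }
}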

Example: WordCount v1.0

Before we jump into the details, let's walk through an example MapReduce application, WordCount, to get a flavor for how MapReduce works. WordCount is a simple application that counts the number of occurrences of each word in an input set:

Source Code

package org.myorg;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class WordCount {

  public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
      String line = value.toString();
      StringTokenizer tokenizer = new StringTokenizer(line);
      while (tokenizer.hasMoreTokens()) {
        word.set(tokenizer.nextToken());
        output.collect(word, one);
      }
    }
  }

  public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }
      output.collect(key, new IntWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(WordCount.class);
    conf.setJobName("wordcount");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(Map.class);
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);
  }
}

Usage

  1. Create the input directory /user/cloudera/wordcount/input in HDFS:
    $ sudo su hdfs
    $ hadoop fs -mkdir /user/cloudera 
    $ hadoop fs -chown cloudera /user/cloudera
    $ exit
    $ sudo su cloudera
    $ hadoop fs -mkdir /user/cloudera/wordcount /user/cloudera/wordcount/input
  2. Create sample text files to use as input, and copy them to the input directory in HDFS:
    $ echo "Hello World Bye World" > file0
    $ echo "Hello Hadoop Goodbye Hadoop" > file1
    $ hadoop fs -put file* /user/cloudera/wordcount/input
  3. Compile WordCount.java:
    $ mkdir wordcount_classes
    $ javac -cp <classpath> -d wordcount_classes WordCount.java
    where <classpath> is:
    • CDH4 - /usr/lib/hadoop/*:/usr/lib/hadoop/client-0.20/*
    • CDH3 - /usr/lib/hadoop-0.20/hadoop-0.20.2-cdh3u6-core.jar
  4. Create a JAR:
    $ jar -cvf wordcount.jar -C wordcount_classes/ . 
  5. Run the application:
    $ hadoop jar wordcount.jar org.myorg.WordCount /user/cloudera/wordcount/input /user/cloudera/wordcount/output
  6. Examine the output:
    $ hadoop fs -cat /user/cloudera/wordcount/output/part-00000
    Bye 1
    Goodbye 1
    Hadoop 2
    Hello 2
    World 2
  7. Remove the output directory /user/cloudera/wordcount/output so that you can run the sample again:
    $ hadoop fs -rm -r /user/cloudera/wordcount/output
Hadoop provides several command-line options for passing information to applications:
  • The -files option allows applications to specify a comma-separated list of files that are made available in the current working directory of each task.
  • The -libjars option allows applications to add JARs to the classpaths of the map and reduce tasks.
  • The -archives option allows applications to pass comma-separated archives as arguments; the archives are unzipped/unjarred, and a link with the name of the zip/JAR is created in the current working directory of each task.
More details about the command-line options are available in the Hadoop Command Guide.

Walk-through

The WordCount application is quite straightforward.
The Mapper implementation, via the map method, processes one line at a time, as provided by the specified TextInputFormat. It then splits the line into tokens separated by whitespace, via the StringTokenizer, and emits a key-value pair of <word, 1>.
For the given sample input the first map emits:
    < Hello, 1>
    < World, 1>
    < Bye, 1>
    < World, 1>
The second map emits:
    < Hello, 1>
    < Hadoop, 1>
    < Goodbye, 1>
    < Hadoop, 1>
We'll learn more about the number of maps spawned for a given job, and how to control them in a fine-grained manner, a bit later in the tutorial.
WordCount also specifies a combiner. Hence, the output of each map is passed through the local combiner (which, per the job configuration, is the same class as the Reducer) for local aggregation, after being sorted on the keys.
The output of the first map:
    < Bye, 1>
    < Hello, 1>
    < World, 2>
The output of the second map:
    < Goodbye, 1>
    < Hadoop, 2>
    < Hello, 1>
The Reducer implementation, via the reduce method, just sums up the values, which are the occurrence counts for each key (that is, words in this example).
Thus the output of the job is:
    < Bye, 1>
    < Goodbye, 1>
    < Hadoop, 2>
    < Hello, 2>
    < World, 2>
The main method specifies various facets of the job, such as the input/output paths (passed via the command line), key-value types, and input/output formats in the JobConf. It then calls JobClient.runJob to submit the job and monitor its progress.
We'll learn more about JobConf, JobClient, Tool, and other interfaces and classes a bit later in the tutorial.
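As a quick preview of Tool: if the driver class implements the Tool interface and is launched with ToolRunner, the GenericOptionsParser handles generic options such as -files, -libjars, and -archives before the remaining arguments reach the application. The following is only a minimal sketch of how the WordCount driver could be restructured this way; WordCountDriver is a hypothetical class name, and the Map and Reduce classes are the ones defined in the example above.

package org.myorg;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// A sketch of a Tool-based driver for the WordCount job defined above.
public class WordCountDriver extends Configured implements Tool {

  public int run(String[] args) throws Exception {
    // getConf() already reflects any generic options (-files, -libjars, -archives)
    // parsed by ToolRunner/GenericOptionsParser.
    JobConf conf = new JobConf(getConf(), WordCountDriver.class);
    conf.setJobName("wordcount");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(WordCount.Map.class);
    conf.setCombinerClass(WordCount.Reduce.class);
    conf.setReducerClass(WordCount.Reduce.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new Configuration(), new WordCountDriver(), args));
  }
}

With a Tool-based driver, the generic options go on the hadoop jar command line between the class name and the application's own arguments (here, the input and output paths).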
