One MapReduce example to showcase what MapReduce is about

Hadoop relies on the MapReduce framework for big data development and computation. Everything goes smoothly as long as the computing task is simple, but problems appear once the computation gets a little more complex: development efficiency drops, and the impact grows more serious as the problem gets harder. One of the most common such computations is the "associative computation", i.e. a join.

For example, suppose HDFS holds two files, one with the customer data and one with the order data, and customerID is the field that associates them. How do we perform the join so that the customer name is added to each order record?
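For illustration, the two tab-separated input files might look like the ones below (the field layout is an assumption inferred from the mapper code that follows: order.txt holding orderID, customerID and amount, and customer.txt holding customerID and customer name):

customer.txt
1    Apple
2    Orange

order.txt
1001    1    120.0
1002    2    80.5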

The usual method is to take the two source files as input and process each row in the map phase according to the file it comes from: if a row comes from the order file, tag its foreign key (customerID) with "O" to form a combined key; if it comes from the customer file, tag it with "C". After the map phase, the data is partitioned on customerID, then grouped and sorted on the combined key, and finally the records are joined in the reduce phase and written out. Code along the following lines is quite common:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

// The classes below are assumed to be nested inside the driver class J.
public static class JMapper extends Mapper<LongWritable, Text, TextPair, Text> {
    // mark every row with "O" or "C" according to the file it comes from
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
        if (pathName.contains("order.txt")) { // identify an order row by file name
            String[] values = value.toString().split("\t");
            TextPair tp = new TextPair(new Text(values[1]), new Text("O")); // mark with "O"
            context.write(tp, new Text(values[0] + "\t" + values[2]));
        }
        if (pathName.contains("customer.txt")) { // identify a customer row by file name
            String[] values = value.toString().split("\t");
            TextPair tp = new TextPair(new Text(values[0]), new Text("C")); // mark with "C"
            context.write(tp, new Text(values[1]));
        }
    }
}

public static class JPartitioner extends Partitioner<TextPair, Text> {
    // partition on the first part of the key only, i.e. customerID
    @Override
    public int getPartition(TextPair key, Text value, int numPartitions) {
        return Math.abs(key.getFirst().hashCode() * 127) % numPartitions;
    }
}

public static class JComparator extends WritableComparator {
    // group values by the first part of the composite key (customerID) only
    public JComparator() {
        super(TextPair.class, true);
    }

    @SuppressWarnings("unchecked")
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TextPair t1 = (TextPair) a;
        TextPair t2 = (TextPair) b;
        return t1.getFirst().compareTo(t2.getFirst());
    }
}

public static class JReduce extends Reducer<TextPair, Text, Text, Text> {
    // join the customer name with each of its orders and write the result out
    @Override
    protected void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        Text pid = key.getFirst();
        // the customer record ("C") sorts before the order records ("O"), so it arrives first
        Iterator<Text> it = values.iterator();
        String desc = it.next().toString();
        while (it.hasNext()) {
            context.write(pid, new Text(it.next().toString() + "\t" + desc));
        }
    }
}

public static class TextPair implements WritableComparable<TextPair> {
    // composite key: customerID plus the "O"/"C" tag
    // (declared static on the assumption that it is nested in J, so Hadoop can instantiate it by reflection)
    private Text first;
    private Text second;

    public TextPair() {
        set(new Text(), new Text());
    }

    public TextPair(String first, String second) {
        set(new Text(first), new Text(second));
    }

    public TextPair(Text first, Text second) {
        set(first, second);
    }

    public void set(Text first, Text second) {
        this.first = first;
        this.second = second;
    }

    public Text getFirst() {
        return first;
    }

    public Text getSecond() {
        return second;
    }

    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    public int compareTo(TextPair tp) {
        int cmp = first.compareTo(tp.first);
        if (cmp != 0) {
            return cmp;
        }
        return second.compareTo(tp.second);
    }
}
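As a side note, a composite key like TextPair would normally also override hashCode() and equals(), so that the default HashPartitioner and any combiner treat equal keys consistently; the example above gets away without them because it supplies its own JPartitioner. A conventional sketch of the two methods (not part of the original code) would be added inside TextPair like this:

    @Override
    public int hashCode() {
        // combine both parts; 163 is just a conventional small prime multiplier
        return first.hashCode() * 163 + second.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        if (o instanceof TextPair) {
            TextPair tp = (TextPair) o;
            return first.equals(tp.first) && second.equals(tp.second);
        }
        return false;
    }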

public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
    // job entrance
    Configuration conf = new Configuration();
    GenericOptionsParser parser = new GenericOptionsParser(conf, args);
    String[] otherArgs = parser.getRemainingArgs();
    if (otherArgs.length < 3) {
        System.err.println("Usage: J <in_path_one> <in_path_two> <output>");
        System.exit(2);
    }
    Job job = new Job(conf, "J");
    job.setJarByClass(J.class);                         // join driver class
    job.setMapperClass(JMapper.class);                  // map class
    job.setMapOutputKeyClass(TextPair.class);           // map output key class
    job.setMapOutputValueClass(Text.class);             // map output value class
    job.setPartitionerClass(JPartitioner.class);        // partition class
    job.setGroupingComparatorClass(JComparator.class);  // grouping comparator applied after partitioning
    job.setReducerClass(JReduce.class);                 // reduce class
    job.setOutputKeyClass(Text.class);                  // reduce output key class
    job.setOutputValueClass(Text.class);                // reduce output value class
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));   // first source file
    FileInputFormat.addInputPath(job, new Path(otherArgs[1]));   // second source file
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[2])); // output path
    System.exit(job.waitForCompletion(true) ? 0 : 1);   // run until the job ends
}
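Assuming the whole thing is packed into a jar with J as the driver class (the jar name and paths below are illustrative, not taken from the original post), the job would be launched roughly like this:

hadoop jar j.jar J /data/customer.txt /data/order.txt /data/join_output

With the sample data shown earlier, each output line would then carry the customerID, the order fields and the joined customer name, e.g.:

1    1001    120.0    Apple
2    1002    80.5    Orange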

esProc can work out another solution to the same problem:

Main program (on the main node): [esProc script shown as an image in the original post]

Sub-program (on the sub-nodes): [esProc script shown as an image in the original post]

More information at: raqsoft
