[aspectj-users] Use AspectJ with Hadoop MapReduce

Hi,

I am trying to write a pointcut for the calls to map and reduce in Hadoop MapReduce.

When you write a program for the MapReduce framework, you define map and reduce functions, and it is the framework that calls them. Here is an example of a MapReduce program written by the user [1].

I am using Spring AOP to intercept the calls to the map, reduce, and cleanup functions, but it is not working. Maybe I need to use AspectJ instead; the problem is that I don't know how to do it with AspectJ. Any help to make this work?
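Just to make the question concrete, this is roughly the aspect I have in mind, as a minimal sketch in annotation-style AspectJ (the aspect and pointcut names are only illustrative, and it assumes the new org.apache.hadoop.mapreduce API plus compile-time or load-time weaving):

import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;

/**
 * Sketch of an aspect that logs every execution of the user-defined
 * map, reduce, and cleanup methods of the new Hadoop API.
 */
@Aspect
public class MapReduceTraceAspect {

    // any execution of map(..) in a subclass of org.apache.hadoop.mapreduce.Mapper
    @Pointcut("execution(* org.apache.hadoop.mapreduce.Mapper+.map(..))")
    public void mapExecution() {}

    // any execution of reduce(..) in a subclass of org.apache.hadoop.mapreduce.Reducer
    @Pointcut("execution(* org.apache.hadoop.mapreduce.Reducer+.reduce(..))")
    public void reduceExecution() {}

    // cleanup(..) in either Mapper or Reducer subclasses
    @Pointcut("execution(* org.apache.hadoop.mapreduce.Mapper+.cleanup(..))"
            + " || execution(* org.apache.hadoop.mapreduce.Reducer+.cleanup(..))")
    public void cleanupExecution() {}

    @Before("mapExecution() || reduceExecution() || cleanupExecution()")
    public void logCall(JoinPoint jp) {
        System.out.println("[aspect] intercepted " + jp.getSignature().toShortString());
    }
}

My understanding is that Spring AOP only advises Spring-managed beans through proxies, and Hadoop instantiates the Mapper/Reducer classes itself, so I suspect that is why the Spring advice never fires. With AspectJ weaving the execution() join points should be matched inside the task JVM, but I am not sure how to set that up.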

[1] Wordcount program


package org.apache.hadoop.mapred.examples;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.ReduceContext;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

/**
 * My example of a common wordcount. Compare it with the official WordCount class to understand the differences between the two.
 */
public class MyWordCount {
    public static class MyMap extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private MedusaDigests parser = new MedusaDigests(); // helper from my Medusa framework (not shown)

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }

        public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            try {
                while (context.nextKeyValue()) {
                    System.out.println("Key: " + context.getCurrentKey() + " Value: " + context.getCurrentValue());
                    map(context.getCurrentKey(), context.getCurrentValue(), context);
                }
            } finally {
                cleanup(context);
            }
        }

        public void cleanup(Context context) {}
    }

    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
        private MedusaDigests parser = new MedusaDigests(); // helper from my Medusa framework (not shown)

        public void reduce(Text key, Iterable<IntWritable> values, Context context
        ) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                System.out.println(" - key ( " + key.getClass().toString() + "): " + key.toString()
                        + " value ( " + val.getClass().toString() + " ): " + val.toString());
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }

        public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            try {
                while (context.nextKey()) {
                    System.out.println("Key: " + context.getCurrentKey());
                    reduce(context.getCurrentKey(), context.getValues(), context);
                    // If a back up store is used, reset it
                    Iterator<IntWritable> iter = context.getValues().iterator();
                    if(iter instanceof ReduceContext.ValueIterator) {
                        ((ReduceContext.ValueIterator<IntWritable>)iter).resetBackupStore();
                    }
                }
            } finally {
                cleanup(context);
            }
        }

        public void cleanup(Context context) {}
    }

    public static void main(String[] args) throws Exception {
        GenericOptionsParser parser = new GenericOptionsParser(new Configuration(), args);

        String[] otherArgs = parser.getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount [<in>...] <out>");
            System.exit(2);
        }

        // first map tasks
        JobConf conf = new JobConf(MyWordCount.class);
        conf.setJobName("wordcount");
        // MyFullyIndentityMapper is another mapper class of mine, not shown here
        conf.setClass("mapreduce.job.map.identity.class", MyFullyIndentityMapper.class, Mapper.class);

        conf.setJarByClass(MyWordCount.class);
        conf.setPartitionerClass(HashPartitioner.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setNumReduceTasks(1);

        Path[] inputPaths = new Path[otherArgs.length-1];
        for (int i = 0; i < otherArgs.length - 1; ++i) { inputPaths[i] = new Path(otherArgs[i]); }
        Path outputPath =  new Path(otherArgs[otherArgs.length - 1]);
        FileInputFormat.setInputPaths(conf, inputPaths);
        FileOutputFormat.setOutputPath(conf, outputPath);

        // launch the job directly
        Medusa.runJob(conf);
        System.exit(0);
    }
}

Thanks,

