Hi,
I am trying to pointcut the calls to map and reduce in Hadoop MapReduce. When you write a MapReduce program, you define map and reduce functions, and the MapReduce framework calls them. Here is an example of a user-defined MapReduce program [1].

I am using Spring AOP to intercept the calls to the map, reduce, and cleanup functions, but it is not working. Maybe I must use AspectJ instead; the problem is that I don't know how to do it with AspectJ. Any help to make this work?
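For reference, this is roughly the aspect I have been trying (annotation style). The aspect class name, advice methods, and log messages are my own, and I match on the Hadoop Mapper/Reducer base types with target() rather than on my concrete classes, so please treat it as a sketch of what I am attempting rather than something that works:

import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;

@Aspect
public class MapReduceLoggingAspect {

    // intended to fire whenever a map(...) method runs on a Hadoop Mapper
    @Before("execution(* map(..)) && target(org.apache.hadoop.mapreduce.Mapper)")
    public void beforeMap(JoinPoint jp) {
        System.out.println("[aspect] map: " + jp.getSignature());
    }

    // intended to fire whenever a reduce(...) method runs on a Hadoop Reducer
    @Before("execution(* reduce(..)) && target(org.apache.hadoop.mapreduce.Reducer)")
    public void beforeReduce(JoinPoint jp) {
        System.out.println("[aspect] reduce: " + jp.getSignature());
    }

    // intended to fire when cleanup(...) runs on either the mapper or the reducer
    @Before("execution(* cleanup(..)) && (target(org.apache.hadoop.mapreduce.Mapper) || target(org.apache.hadoop.mapreduce.Reducer))")
    public void beforeCleanup(JoinPoint jp) {
        System.out.println("[aspect] cleanup: " + jp.getSignature());
    }
}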
[1] WordCount program
package org.apache.hadoop.mapred.examples;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.ReduceContext;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
/**
 * My example of a common word count. Compare it with the official WordCount class to see the differences between the two.
*/
public class MyWordCount {
    public static class MyMap extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private MedusaDigests parser = new MedusaDigests();

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }

        // mirror of the framework's run() loop, plus a print of each key/value before map() is called
        public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            try {
                while (context.nextKeyValue()) {
                    System.out.println("Key: " + context.getCurrentKey() + " Value: " + context.getCurrentValue());
                    map(context.getCurrentKey(), context.getCurrentValue(), context);
                }
            } finally {
                cleanup(context);
            }
        }

        public void cleanup(Context context) {}
    }
    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
        MedusaDigests parser = new MedusaDigests();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                System.out.println(" - key ( " + key.getClass().toString() + "): " + key.toString()
                        + " value ( " + val.getClass().toString() + " ): " + val.toString());
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }

        // mirror of the framework's run() loop, plus a print of each key before reduce() is called
        public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            try {
                while (context.nextKey()) {
                    System.out.println("Key: " + context.getCurrentKey());
                    reduce(context.getCurrentKey(), context.getValues(), context);
                    // if a backup store is used, reset it
                    Iterator<IntWritable> iter = context.getValues().iterator();
                    if (iter instanceof ReduceContext.ValueIterator) {
                        ((ReduceContext.ValueIterator<IntWritable>) iter).resetBackupStore();
                    }
                }
            } finally {
                cleanup(context);
            }
        }

        public void cleanup(Context context) {}
    }
    public static void main(String[] args) throws Exception {
        GenericOptionsParser parser = new GenericOptionsParser(new Configuration(), args);
        String[] otherArgs = parser.getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount [<in>...] <out>");
            System.exit(2);
        }

        // first map tasks
        JobConf conf = new JobConf(MyWordCount.class);
        conf.setJobName("wordcount");
        conf.setClass("mapreduce.job.map.identity.class", MyFullyIndentityMapper.class, Mapper.class);
        conf.setJarByClass(MyWordCount.class);
        conf.setPartitionerClass(HashPartitioner.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setNumReduceTasks(1);

        Path[] inputPaths = new Path[otherArgs.length - 1];
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            inputPaths[i] = new Path(otherArgs[i]);
        }
        Path outputPath = new Path(otherArgs[otherArgs.length - 1]);
        FileInputFormat.setInputPaths(conf, inputPaths);
        FileOutputFormat.setOutputPath(conf, outputPath);

        // launch the job directly
        Medusa.runJob(conf);
        System.exit(0);
    }
}
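In case it matters, this is roughly how I register the aspect with Spring before submitting the job. The configuration class name is mine, and I create the context at the start of main(), before Medusa.runJob(conf), so again this is only a sketch of my current setup:

import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.EnableAspectJAutoProxy;

@Configuration
@EnableAspectJAutoProxy
public class AopConfig {

    // expose the logging aspect as a Spring bean so proxy-based AOP can pick it up
    @Bean
    public MapReduceLoggingAspect mapReduceLoggingAspect() {
        return new MapReduceLoggingAspect();
    }
}

In the driver I then call new AnnotationConfigApplicationContext(AopConfig.class), but the advice never fires for map, reduce, or cleanup.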
Thanks,