Re: [aspectj-users] Use AspectJ with Hadoop MapReduce
I have found the solution. Strangely, this aspect works [1], but
these pointcuts [2] don't. If anyone could explain why, I would
appreciate it.
[1] Aspect that works
    pointcut printWrite(): call(* *.write(..));
    before(): printWrite() {
        System.out.println("Busted2");
    }
[2] Pointcuts that don't work
    pointcut printWrite(): call(* write(..));
    pointcut printWrite(): call(* Context.write(..));
On 09/25/2015 03:12 PM, xeonmailinglist wrote:
In my example, I have the following snippet [1]. I want to execute my
aspect before the call to context.write(this.word, one) in the example.
To do that, I created this aspect [2] based on your solution.
The context variable is of the type
public abstract class TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends TaskAttemptContext implements Progressable,
and its superclass is of the type
public class TaskAttemptContext extends JobContext implements Progressable.
After compiling my code, I can see that the advice invocation was not
woven into the class file [3].
Why is the call designator not working?
[1] Snippet of the wordcount example.
    while(itr.hasMoreTokens()) {
        this.word.set(itr.nextToken());
        context.write(this.word, one);
    }
[2] My aspect
    before(): call(* *Context.write(..)) {
        System.out.println("Aspects Write");
        System.out.println(thisJoinPoint.getSourceLocation().getFileName());
    }
[3] Compiled version of the job. This is the class file.
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        MapReduceCalls.aspectOf().ajc$before$org_apache_hadoop_mapred_aspects_MapReduceCalls$1$9e048e72();
        StringTokenizer itr = new StringTokenizer(value.toString());
        while(itr.hasMoreTokens()) {
            this.word.set(itr.nextToken());
            context.write(this.word, one);
        }
    }
Cheers,
On 09/23/2015 04:33 PM, Andy Clement wrote:
With pure AspectJ, if you want to intercept when map/reduce/cleanup
are running, you can use something like this:
=== Foo.aj ===
aspect Foo {
    before(): execution(* map(..)) {
        System.out.println(thisJoinPoint);
    }
    before(): execution(* reduce(..)) {
        System.out.println(thisJoinPoint);
    }
    before(): execution(* cleanup(..)) {
        System.out.println(thisJoinPoint);
    }
}
===
ajc -1.8 Foo.aj MyWordCount.java -showWeaveInfo
This will compile and weave MyWordCount. When you run it, you will
see the advice output.
Alternatively, if you want to advise the call side of these things,
you will need to weave the hadoop jar (whatever that is):
=== Foo.aj ===
aspect Foo {
    before(): call(* map(..)) {
        System.out.println(thisJoinPoint);
    }
    before(): call(* reduce(..)) {
        System.out.println(thisJoinPoint);
    }
    before(): call(* cleanup(..)) {
        System.out.println(thisJoinPoint);
    }
}
===
ajc -1.8 Foo.aj -inpath hadoop.jar -outjar woven_hadoop.jar -showWeaveInfo
Now when you run your MyWordCount program, you need to be using the
woven_hadoop jar instead of the regular hadoop jar.
cheers,
Andy
Hi,
I am trying to pointcut calls of map and reduce from Hadoop MapReduce.
When you write a program for the MapReduce framework, you need to
define map and reduce functions. These functions are called by the
MapReduce framework. Here is an example of a MapReduce program
written by the user [1].
I am using Spring AOP to intercept the calls to the map, reduce, and
cleanup functions, but it is not working. Maybe I must use AspectJ.
The problem is that I don't know how to do it with AspectJ. Can anyone
help me make this work?
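A note on the Spring AOP attempt: Spring AOP applies advice through proxies, so advice only runs for calls that go through a proxy object created by Spring. Hadoop creates the mapper and reducer instances itself, so the framework's calls to map and reduce would presumably never pass through such a proxy. The plain-Java sketch below illustrates that effect with java.lang.reflect.Proxy. It is only an illustration under stated assumptions: WordMapper, SimpleWordMapper, proxied and LOG are hypothetical names, not Hadoop or Spring types.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

public class ProxyInterceptionSketch {
    // Hypothetical stand-in for a user-defined map function; not a Hadoop type.
    public interface WordMapper {
        void map(String line);
    }

    // Hypothetical implementation; the body is irrelevant to the illustration.
    public static class SimpleWordMapper implements WordMapper {
        @Override
        public void map(String line) {
            // the real work would happen here
        }
    }

    // Records every call that the proxy intercepted.
    public static final List<String> LOG = new ArrayList<>();

    // Wraps the target in a JDK dynamic proxy that logs before delegating.
    public static WordMapper proxied(WordMapper target) {
        InvocationHandler handler = (proxy, method, args) -> {
            LOG.add("before " + method.getName());
            return method.invoke(target, args);
        };
        return (WordMapper) Proxy.newProxyInstance(
                WordMapper.class.getClassLoader(),
                new Class<?>[] { WordMapper.class },
                handler);
    }

    public static void main(String[] args) {
        WordMapper target = new SimpleWordMapper();
        WordMapper proxy = proxied(target);

        proxy.map("a b");   // goes through the proxy, so the handler runs
        target.map("c d");  // direct call on the raw instance, so nothing is logged

        System.out.println(LOG);  // prints [before map]
    }
}
```

Only the call made through the proxy is recorded. Weaving with ajc, as described in the reply above, changes the compiled classes themselves and so does not depend on who creates the instances.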
[1] Wordcount program
package org.apache.hadoop.mapred.examples;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.ReduceContext;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

/**
 * My example of a common wordcount. Compare with the official WordCount.class to understand the differences between both classes.
 */
public class MyWordCount {
    public static class MyMap extends Mapper {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private MedusaDigests parser = new MedusaDigests();

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                output.collect(word, one);
            }
        }

        public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            try {
                while (context.nextKeyValue()) {
                    System.out.println("Key: " + context.getCurrentKey() + " Value: " + context.getCurrentValue());
                    map(context.getCurrentKey(), context.getCurrentValue(), context);
                }
            } finally {
                cleanup(context);
            }
        }

        public void cleanup(Mapper.Context context) {}
    }

    public static class MyReducer extends Reducer {
        private IntWritable result = new IntWritable();
        MedusaDigests parser = new MedusaDigests();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                System.out.println(" - key ( " + key.getClass().toString() + "): " + key.toString()
                        + " value ( " + val.getClass().toString() + " ): " + val.toString());
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }

        public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            try {
                while (context.nextKey()) {
                    System.out.println("Key: " + context.getCurrentKey());
                    reduce(context.getCurrentKey(), context.getValues(), context);
                    // If a back up store is used, reset it
                    Iterator<IntWritable> iter = context.getValues().iterator();
                    if (iter instanceof ReduceContext.ValueIterator) {
                        ((ReduceContext.ValueIterator<IntWritable>) iter).resetBackupStore();
                    }
                }
            } finally {
                cleanup(context);
            }
        }

        public void cleanup(Reducer.Context context) {}
    }

    public static void main(String[] args) throws Exception {
        GenericOptionsParser parser = new GenericOptionsParser(new Configuration(), args);
        String[] otherArgs = parser.getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount [<in>...] <out>");
            System.exit(2);
        }

        // first map tasks
        JobConf conf = new JobConf(MyWordCount.class);
        conf.setJobName("wordcount");
        conf.setClass("mapreduce.job.map.identity.class", MyFullyIndentityMapper.class, Mapper.class);
        conf.setJarByClass(MyWordCount.class);
        conf.setPartitionerClass(HashPartitioner.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setNumReduceTasks(1);

        Path[] inputPaths = new Path[otherArgs.length - 1];
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            inputPaths[i] = new Path(otherArgs[i]);
        }
        Path outputPath = new Path(otherArgs[otherArgs.length - 1]);
        FileInputFormat.setInputPaths(conf, inputPaths);
        FileOutputFormat.setOutputPath(conf, outputPath);

        // launch the job directly
        Medusa.runJob(conf);
        System.exit(0);
    }
}
Thanks,
_______________________________________________
aspectj-users mailing list
aspectj-users@xxxxxxxxxxx
To change your delivery options, retrieve your
password, or unsubscribe from this list, visit
https://dev.eclipse.org/mailman/listinfo/aspectj-users