[1] Decompiled code from javap
```
public void map(org.apache.hadoop.io.LongWritable,
org.apache.hadoop.io.Text,
org.apache.hadoop.mapred.OutputCollector<org.apache.hadoop.io.Text,
org.apache.hadoop.io.IntWritable>,
org.apache.hadoop.mapred.Reporter) throws java.io.IOException;
descriptor:
(Lorg/apache/hadoop/io/LongWritable;Lorg/apache/hadoop/io/Text;Lorg/apache/hadoop/mapred/OutputCollector;Lorg/apache/hadoop/mapred/Reporter;)V
flags: ACC_PUBLIC
Code:
stack=3, locals=6, args_size=5
0: invokestatic #143 // Method
org/apache/hadoop/mapred/aspects/MapReduceCalls.aspectOf:()Lorg/apache/hadoop/mapred/aspects/MapReduceCalls;
3: invokevirtual #146 // Method
org/apache/hadoop/mapred/aspects/MapReduceCalls.ajc$before$org_apache_hadoop_mapred_aspects_MapReduceCalls$1$9e048e72:()V
6: new #39 // class
java/util/StringTokenizer
9: dup
10: aload_2
11: invokevirtual #41 // Method
org/apache/hadoop/io/Text.toString:()Ljava/lang/String;
14: invokespecial #45 // Method
java/util/StringTokenizer."<init>":(Ljava/lang/String;)V
17: astore 5
19: goto 47
22: aload_0
23: getfield #27 // Field
word:Lorg/apache/hadoop/io/Text;
26: aload 5
28: invokevirtual #48 // Method
java/util/StringTokenizer.nextToken:()Ljava/lang/String;
31: invokevirtual #51 // Method
org/apache/hadoop/io/Text.set:(Ljava/lang/String;)V
34: aload_3
35: aload_0
36: getfield #27 // Field
word:Lorg/apache/hadoop/io/Text;
39: getstatic #18 // Field
one:Lorg/apache/hadoop/io/IntWritable;
42: invokeinterface #54, 3 // InterfaceMethod
org/apache/hadoop/mapred/OutputCollector.collect:(Ljava/lang/Object;Ljava/lang/Object;)V
47: aload 5
49: invokevirtual #60 // Method
java/util/StringTokenizer.hasMoreTokens:()Z
52: ifne 22
55: return
LocalVariableTable:
Start Length Slot Name Signature
0 56 0 this
Lorg/apache/hadoop/mapred/examples/MyWordCount$MyMap;
0 56 1 key
Lorg/apache/hadoop/io/LongWritable;
0 56 2 value Lorg/apache/hadoop/io/Text;
0 56 3 output
Lorg/apache/hadoop/mapred/OutputCollector;
0 56 4 reporter
Lorg/apache/hadoop/mapred/Reporter;
19 37 5 itr Ljava/util/StringTokenizer;
LineNumberTable:
line 26: 0
line 27: 19
line 28: 22
line 29: 34
line 27: 47
line 31: 55
StackMapTable: number_of_entries = 2
frame_type = 252 /* append */
offset_delta = 22
locals = [ class java/util/StringTokenizer ]
frame_type = 24 /* same */
Exceptions:
throws java.io.IOException
Signature: #37 //
(Lorg/apache/hadoop/io/LongWritable;Lorg/apache/hadoop/io/Text;Lorg/apache/hadoop/mapred/OutputCollector<Lorg/apache/hadoop/io/Text;Lorg/apache/hadoop/io/IntWritable;>;Lorg/apache/hadoop/mapred/Reporter;)V
Error: unknown attribute
org.aspectj.weaver.MethodDeclarationLineNumber: length
= 0x8
00 00 00 19 00 00 03 68
```
[2] Decompiled code that intellij generated.
```
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
MapReduceCalls.aspectOf().ajc$before$org_apache_hadoop_mapred_aspects_MapReduceCalls$1$9e048e72();
StringTokenizer itr = new StringTokenizer(value.toString());
while(itr.hasMoreTokens()) {
this.word.set(itr.nextToken());
output.collect(this.word, one);
}
}
```
[3] unknown error
```
Error: unknown attribute
org.aspectj.weaver.MethodDeclarationLineNumber: length = 0x8
```
On 09/23/2015 09:36 PM, Andy Clement
wrote:
I don’t speak maven very well unfortunately but as the weave
messages are coming out it sounds like you aren’t running the
woven version of the code (or at least the unwoven version is
perhaps ahead of the woven version on your class path).
You can easily javap -private -verbose on the class
files you are running to verify they contain calls to ajc$
methods (which represent the advice).
cheers,
Andy
Thanks for
your help. This is a great example where I can start
with. Although, it seems that the aspect is not running.
I have weaved the aspect file with maven [1] and
everything seems ok according the compilation output
[2].
But when I run the example, nothing happens. Do you have
any idea of what it might be?
[1] Maven configuration
```
<plugin>
<!-- compile with maven -->
<groupId>org.codehaus.mojo</groupId>
<artifactId>aspectj-maven-plugin</artifactId>
<version>${maven.aspectj.version}</version>
<configuration>
<complianceLevel>1.7</complianceLevel>
<source>1.7</source>
<target>1.7</target>
<Xlint>error</Xlint>
<verbose>true</verbose>
<forceAjcCompile>true</forceAjcCompile>
<showWeaveInfo>true</showWeaveInfo>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
```
[2] Compilation output
```
[INFO] ---
aspectj-maven-plugin:1.7:compile (default) @ medusa-java
---
[INFO] Showing AJC message detail for
messages of types: [error, warning, fail]
[INFO] Join point 'method-execution(void
org.apache.hadoop.mapred.MedusaDigests.cleanup(java.lang.Object))'
in Type 'org.apache.hadoop.mapred.MedusaDigests'
(MedusaDigests.java:100) advised by before advice from
'org.apache.hadoop.mapred.aspects.MapReduceCalls'
(MapReduceCalls.aj:31)
[INFO] Join point 'method-execution(void
org.apache.hadoop.mapred.Medusa$MyFilterMapper.map(java.lang.Object,
org.apache.hadoop.io.Text,
org.apache.hadoop.mapreduce.Mapper$Context))' in Type
'org.apache.hadoop.mapred.Medusa$MyFilterMapper'
(Medusa.java:77) advised by before advice from
'org.apache.hadoop.mapred.aspects.MapReduceCalls'
(MapReduceCalls.aj:11)
[INFO] Join point 'method-execution(void
org.apache.hadoop.mapred.examples.MyWordCount$MyReducer.reduce(org.apache.hadoop.io.Text,
java.lang.Iterable,
org.apache.hadoop.mapreduce.Reducer$Context))' in Type
'org.apache.hadoop.mapred.examples.MyWordCount$MyReducer'
(MyWordCount.java:51) advised by before advice from
'org.apache.hadoop.mapred.aspects.MapReduceCalls'
(MapReduceCalls.aj:27)
```
On 09/23/2015 04:33 PM,
Andy Clement wrote:
With pure AspectJ, if you want to intercept when
map/reduce/cleanup are running, you can use something
like this:
=== Foo.aj ===
aspect Foo {
before(): execution(* map(..)) {
System.out.println(thisJoinPoint);
}
before(): execution(* reduce(..)) {
System.out.println(thisJoinPoint);
}
before(): execution(* cleanup(..))
{
System.out.println(thisJoinPoint);
}
}
===
ajc -1.8 Foo.aj MyWordCount.java
-showWeaveInfo
This will compile and weave
MyWordCount - when you run it you will see the
advice output.
Alternatively if you want to advise
the call side of these things, you will need to
weave the hadoop jar (whatever that is):
=== Foo.aj ===
aspect Foo {
before(): call(* map(..)) {
System.out.println(thisJoinPoint);
}
before(): call(* reduce(..)) {
System.out.println(thisJoinPoint);
}
before(): call(* cleanup(..))
{
System.out.println(thisJoinPoint);
}
}
===
ajc -1.8 Foo.aj -inpath hadoop.jar
-outjar woven_hadoop.jar -showWeaveInfo
Now when you run your MyWordCount
program you need to be using the woven_hadoop jar
instead of the regular hadoop jar.
cheers,
Andy
Hi,
I am trying to
pointcut calls of map
and reduce
from Hadoop MapReduce.
When you create a
program to work with the application
MapReduce, you need to define map
and reduce
functions. The functions are called by
the framework MapReduce. Here is an
example of a MapReduce program
programmed by the user [1].
I am using Spring
AOP to intercept the call to the map
, reduce
,
and cleanup
functions, but it is not working. Maybe
I must use AspectJ. The problem is that
I don’t know how to do it with AspectJ.
Any help to try make this work?
[1] Wordcount
program
package org.apache.hadoop.mapred.examples;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.ReduceContext;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
/**
* My example of a common wordcount. Compare with the official WordCount.class to understand the differences between both classes.
*/
public class MyWordCount {
public static class MyMap extends Mapper {
private final static IntWritable _one_ = new IntWritable(1);
private Text word = new Text();
private MedusaDigests parser = new MedusaDigests();
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
output.collect(word, one);
}
}
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) {
System.out.println("Key: " + context.getCurrentKey() + " Value: " + context.getCurrentValue());
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}
public void cleanup(Mapper.Context context) {}
}
public static class MyReducer extends Reducer {
private IntWritable result = new IntWritable();
MedusaDigests parser = new MedusaDigests();
public void reduce(Text key, Iterable<IntWritable> values, Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
System.out.println(" - key ( " + key.getClass().toString() + "): " + key.toString()
+ " value ( " + val.getClass().toString() + " ): " + val.toString());
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKey()) {
System.out.println("Key: " + context.getCurrentKey());
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
Iterator<IntWritable> iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<IntWritable>)iter).resetBackupStore();
}
}
} finally {
cleanup(context);
}
}
public void cleanup(Reducer.Context context) {}
}
public static void main(String[] args) throws Exception {
GenericOptionsParser parser = new GenericOptionsParser(new Configuration(), args);
String[] otherArgs = parser.getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount [<in>...] <out>");
System.exit(2);
}
// first map tasks
JobConf conf = new JobConf(MyWordCount.class);
conf.setJobName("wordcount");
conf.setClass("mapreduce.job.map.identity.class", MyFullyIndentityMapper.class, Mapper.class);
conf.setJarByClass(MyWordCount.class);
conf.setPartitionerClass(HashPartitioner.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setNumReduceTasks(1);
Path[] inputPaths = new Path[otherArgs.length-1];
for (int i = 0; i < otherArgs.length - 1; ++i) { inputPaths[i] = new Path(otherArgs[i]); }
Path outputPath = new Path(otherArgs[otherArgs.length - 1]);
FileInputFormat.setInputPaths(conf, inputPaths);
FileOutputFormat.setOutputPath(conf, outputPath);
// launch the job directly
Medusa.runJob(conf);
System.exit(0);
}
}
Thanks,
_______________________________________________
aspectj-users mailing list
aspectj-users@xxxxxxxxxxx
To change your delivery options, retrieve your
password, or unsubscribe from this list, visit
https://dev.eclipse.org/mailman/listinfo/aspectj-users
_______________________________________________
aspectj-users mailing list
aspectj-users@xxxxxxxxxxx
To change your delivery options, retrieve your password, or unsubscribe from this list, visit
https://dev.eclipse.org/mailman/listinfo/aspectj-users
_______________________________________________
aspectj-users mailing list
aspectj-users@xxxxxxxxxxx
To change your delivery options, retrieve your password,
or unsubscribe from this list, visit
https://dev.eclipse.org/mailman/listinfo/aspectj-users
_______________________________________________
aspectj-users mailing list
aspectj-users@xxxxxxxxxxx
To change your delivery options, retrieve your password, or unsubscribe from this list, visit
https://dev.eclipse.org/mailman/listinfo/aspectj-users
To change your delivery options, retrieve your password, or unsubscribe from this list, visit