Skip to main content

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [List Home]
Re: [aspectj-users] Use AspectJ with Hadoop MapReduce


On Sep 24, 2015, at 3:23 AM, xeonmailinglist <xeonmailinglist@xxxxxxxxx> wrote:

1. I decompiled the class, and it looks like the ajc calls are that, but I am not sure. Here is a snippet of the decompilation [1] and [2]. Can you confirm if the ajc call are there? I can't see the output anywhere, or even the code does not stop in the breakpoint during the debug.

Yep that is calling the advice. I’m not sure what your advice is doing though.


2. In the decompiled file I have this error [3], but I don't know if it is very important. What do you think?

Nope, it isn’t important. AspectJ creates a custom attribute that javap cannot understand, no big deal.


3. In the last message you said "weave messages are coming out it sounds like you aren’t running the woven version of the code ". How do you know that? 

Simply that if your compiled step is showing weaving messages then weaving is happening. We have now confirmed your code is woven looking at the byte code. So there are only two options now:
1. whatever your advice is doing is not sending data to where you think/hope it is. If it is doing System.out.println() maybe someone is swallowing that so you don’t get it on your console, I’m not sure.
2. you aren’t running the woven code because you’ve picked up the unwoven version of the code earlier in the class path.

If running in debug mode isn’t triggering breakpoints in the advice (not sure where you are putting the breakpoints - the advice?) then that also suggests (2).

cheers
Andy




[1] Decompiled code from javap

```
 public void map(org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text, org.apache.hadoop.mapred.OutputCollector<org.apache.hadoop.io.Text, org.apache.hadoop.io.IntWritable>, org.apache.hadoop.mapred.Reporter) throws java.io.IOException;
    descriptor: (Lorg/apache/hadoop/io/LongWritable;Lorg/apache/hadoop/io/Text;Lorg/apache/hadoop/mapred/OutputCollector;Lorg/apache/hadoop/mapred/Reporter;)V
    flags: ACC_PUBLIC
    Code:
      stack=3, locals=6, args_size=5
         0: invokestatic  #143                // Method org/apache/hadoop/mapred/aspects/MapReduceCalls.aspectOf:()Lorg/apache/hadoop/mapred/aspects/MapReduceCalls;
         3: invokevirtual #146                // Method org/apache/hadoop/mapred/aspects/MapReduceCalls.ajc$before$org_apache_hadoop_mapred_aspects_MapReduceCalls$1$9e048e72:()V
         6: new           #39                 // class java/util/StringTokenizer
         9: dup
        10: aload_2
        11: invokevirtual #41                 // Method org/apache/hadoop/io/Text.toString:()Ljava/lang/String;
        14: invokespecial #45                 // Method java/util/StringTokenizer."<init>":(Ljava/lang/String;)V
        17: astore        5
        19: goto          47
        22: aload_0
        23: getfield      #27                 // Field word:Lorg/apache/hadoop/io/Text;
        26: aload         5
        28: invokevirtual #48                 // Method java/util/StringTokenizer.nextToken:()Ljava/lang/String;
        31: invokevirtual #51                 // Method org/apache/hadoop/io/Text.set:(Ljava/lang/String;)V
        34: aload_3
        35: aload_0
        36: getfield      #27                 // Field word:Lorg/apache/hadoop/io/Text;
        39: getstatic     #18                 // Field one:Lorg/apache/hadoop/io/IntWritable;
        42: invokeinterface #54,  3           // InterfaceMethod org/apache/hadoop/mapred/OutputCollector.collect:(Ljava/lang/Object;Ljava/lang/Object;)V
        47: aload         5
        49: invokevirtual #60                 // Method java/util/StringTokenizer.hasMoreTokens:()Z
        52: ifne          22
        55: return
      LocalVariableTable:
        Start  Length  Slot  Name   Signature
            0      56     0  this   Lorg/apache/hadoop/mapred/examples/MyWordCount$MyMap;
            0      56     1   key   Lorg/apache/hadoop/io/LongWritable;
            0      56     2 value   Lorg/apache/hadoop/io/Text;
            0      56     3 output   Lorg/apache/hadoop/mapred/OutputCollector;
            0      56     4 reporter   Lorg/apache/hadoop/mapred/Reporter;
           19      37     5   itr   Ljava/util/StringTokenizer;
      LineNumberTable:
        line 26: 0
        line 27: 19
        line 28: 22
        line 29: 34
        line 27: 47
        line 31: 55
      StackMapTable: number_of_entries = 2
        frame_type = 252 /* append */
          offset_delta = 22
          locals = [ class java/util/StringTokenizer ]
        frame_type = 24 /* same */
    Exceptions:
      throws java.io.IOException
    Signature: #37                          // (Lorg/apache/hadoop/io/LongWritable;Lorg/apache/hadoop/io/Text;Lorg/apache/hadoop/mapred/OutputCollector<Lorg/apache/hadoop/io/Text;Lorg/apache/hadoop/io/IntWritable;>;Lorg/apache/hadoop/mapred/Reporter;)V
Error: unknown attribute
      org.aspectj.weaver.MethodDeclarationLineNumber: length = 0x8
       00 00 00 19 00 00 03 68

```

[2] Decompiled code that intellij generated.
```
    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        MapReduceCalls.aspectOf().ajc$before$org_apache_hadoop_mapred_aspects_MapReduceCalls$1$9e048e72();
        StringTokenizer itr = new StringTokenizer(value.toString());

        while(itr.hasMoreTokens()) {
            this.word.set(itr.nextToken());
            output.collect(this.word, one);
        }

    }
```


[3]  unknown error
```
Error: unknown attribute     
org.aspectj.weaver.MethodDeclarationLineNumber: length = 0x8


```

On 09/23/2015 09:36 PM, Andy Clement wrote:
I don’t speak maven very well unfortunately but as the weave messages are coming out it sounds like you aren’t running the woven version of the code (or at least the unwoven version is perhaps ahead of the woven version on your class path).

You can easily javap -private -verbose on the class files you are running to verify they contain calls to ajc$ methods (which represent the advice).

cheers,
Andy

On Sep 23, 2015, at 10:52 AM, xeonmailinglist <xeonmailinglist@xxxxxxxxx> wrote:

Thanks for your help. This is a great example where I can start with. Although, it seems that the aspect is not running. I have weaved the aspect file with maven [1] and everything seems ok according the compilation output [2].

But when I run the example, nothing happens. Do you have any idea of what it might be?

[1] Maven configuration

```
<plugin>
    <!-- compile with maven -->
    <groupId>org.codehaus.mojo</groupId>
    <artifactId>aspectj-maven-plugin</artifactId>
    <version>${maven.aspectj.version}</version>
    <configuration>
        <complianceLevel>1.7</complianceLevel>
        <source>1.7</source>
        <target>1.7</target>
        <Xlint>error</Xlint>
        <verbose>true</verbose>
        <forceAjcCompile>true</forceAjcCompile>
        <showWeaveInfo>true</showWeaveInfo>
    </configuration>
    <executions>
        <execution>
            <goals>
                <goal>compile</goal>
            </goals>
        </execution>
    </executions>
</plugin>
```

[2] Compilation output

```
[INFO] --- aspectj-maven-plugin:1.7:compile (default) @ medusa-java ---
[INFO] Showing AJC message detail for messages of types: [error, warning, fail]
[INFO] Join point 'method-execution(void org.apache.hadoop.mapred.MedusaDigests.cleanup(java.lang.Object))' in Type 'org.apache.hadoop.mapred.MedusaDigests' (MedusaDigests.java:100) advised by before advice from 'org.apache.hadoop.mapred.aspects.MapReduceCalls' (MapReduceCalls.aj:31)
[INFO] Join point 'method-execution(void org.apache.hadoop.mapred.Medusa$MyFilterMapper.map(java.lang.Object, org.apache.hadoop.io.Text, org.apache.hadoop.mapreduce.Mapper$Context))' in Type 'org.apache.hadoop.mapred.Medusa$MyFilterMapper' (Medusa.java:77) advised by before advice from 'org.apache.hadoop.mapred.aspects.MapReduceCalls' (MapReduceCalls.aj:11)
[INFO] Join point 'method-execution(void org.apache.hadoop.mapred.examples.MyWordCount$MyReducer.reduce(org.apache.hadoop.io.Text, java.lang.Iterable, org.apache.hadoop.mapreduce.Reducer$Context))' in Type 'org.apache.hadoop.mapred.examples.MyWordCount$MyReducer' (MyWordCount.java:51) advised by before advice from 'org.apache.hadoop.mapred.aspects.MapReduceCalls' (MapReduceCalls.aj:27)

```



On 09/23/2015 04:33 PM, Andy Clement wrote:
With pure AspectJ, if you want to intercept when map/reduce/cleanup are running, you can use something like this:

=== Foo.aj ===
aspect Foo {
  before(): execution(* map(..)) {
     System.out.println(thisJoinPoint);
  }
  before(): execution(* reduce(..)) {
     System.out.println(thisJoinPoint);
  }
  before(): execution(* cleanup(..)) {
     System.out.println(thisJoinPoint);
  }
}
===

ajc -1.8 Foo.aj MyWordCount.java -showWeaveInfo

This will compile and weave MyWordCount - when you run it you will see the advice output.

Alternatively if you want to advise the call side of these things, you will need to weave the hadoop jar (whatever that is):

=== Foo.aj ===
aspect Foo {
  before(): call(* map(..)) {
     System.out.println(thisJoinPoint);
  }
  before(): call(* reduce(..)) {
     System.out.println(thisJoinPoint);
  }
  before(): call(* cleanup(..)) {
     System.out.println(thisJoinPoint);
  }
}
===

ajc -1.8 Foo.aj -inpath hadoop.jar -outjar woven_hadoop.jar -showWeaveInfo

Now when you run your MyWordCount program you need to be using the woven_hadoop jar instead of the regular hadoop jar.

cheers,
Andy

On Sep 23, 2015, at 7:34 AM, xeonmailinglist <xeonmailinglist@xxxxxxxxx> wrote:

Hi,

I am trying to pointcut calls of map and reduce from Hadoop MapReduce.

When you create a program to work with the application MapReduce, you need to define map and reduce functions. The functions are called by the framework MapReduce. Here is an example of a MapReduce program programmed by the user [1].

I am using Spring AOP to intercept the call to the map , reduce, and cleanup functions, but it is not working. Maybe I must use AspectJ. The problem is that I don’t know how to do it with AspectJ. Any help to try make this work?

[1] Wordcount program


package org.apache.hadoop.mapred.examples;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.ReduceContext;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

/**
 * My example of a common wordcount. Compare with the official WordCount.class to understand the differences between both classes.
 */
public class MyWordCount {
    public static class MyMap extends Mapper {
        private final static IntWritable _one_ = new IntWritable(1);
        private Text word = new Text();
        private MedusaDigests parser = new MedusaDigests();

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                output.collect(word, one);
            }
        }

        public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            try {
                while (context.nextKeyValue()) {
                    System.out.println("Key: " + context.getCurrentKey() + " Value: " + context.getCurrentValue());
                    map(context.getCurrentKey(), context.getCurrentValue(), context);
                }
            } finally {
                cleanup(context);
            }
        }

        public void cleanup(Mapper.Context context) {}
    }

    public static class MyReducer extends Reducer {
        private IntWritable result = new IntWritable();
        MedusaDigests parser = new MedusaDigests();

        public void reduce(Text key, Iterable<IntWritable> values, Context context
        ) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                System.out.println(" - key ( " + key.getClass().toString() + "): " + key.toString()
                        + " value ( " + val.getClass().toString() + " ): " + val.toString());
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }

        public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            try {
                while (context.nextKey()) {
                    System.out.println("Key: " + context.getCurrentKey());
                    reduce(context.getCurrentKey(), context.getValues(), context);
                    // If a back up store is used, reset it
                    Iterator<IntWritable> iter = context.getValues().iterator();
                    if(iter instanceof ReduceContext.ValueIterator) {
                        ((ReduceContext.ValueIterator<IntWritable>)iter).resetBackupStore();
                    }
                }
            } finally {
                cleanup(context);
            }
        }

        public void cleanup(Reducer.Context context) {}
    }

    public static void main(String[] args) throws Exception {
        GenericOptionsParser parser = new GenericOptionsParser(new Configuration(), args);

        String[] otherArgs = parser.getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount [<in>...] <out>");
            System.exit(2);
        }

        // first map tasks
        JobConf conf = new JobConf(MyWordCount.class);
        conf.setJobName("wordcount");
        conf.setClass("mapreduce.job.map.identity.class", MyFullyIndentityMapper.class, Mapper.class);

        conf.setJarByClass(MyWordCount.class);
        conf.setPartitionerClass(HashPartitioner.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setNumReduceTasks(1);

        Path[] inputPaths = new Path[otherArgs.length-1];
        for (int i = 0; i < otherArgs.length - 1; ++i) { inputPaths[i] = new Path(otherArgs[i]); }
        Path outputPath =  new Path(otherArgs[otherArgs.length - 1]);
        FileInputFormat.setInputPaths(conf, inputPaths);
        FileOutputFormat.setOutputPath(conf, outputPath);

        // launch the job directly
        Medusa.runJob(conf);
        System.exit(0);
    }
}

Thanks,

_______________________________________________
aspectj-users mailing list
aspectj-users@xxxxxxxxxxx
To change your delivery options, retrieve your password, or unsubscribe from this list, visit
https://dev.eclipse.org/mailman/listinfo/aspectj-users



_______________________________________________
aspectj-users mailing list
aspectj-users@xxxxxxxxxxx
To change your delivery options, retrieve your password, or unsubscribe from this list, visit
https://dev.eclipse.org/mailman/listinfo/aspectj-users

_______________________________________________
aspectj-users mailing list
aspectj-users@xxxxxxxxxxx
To change your delivery options, retrieve your password, or unsubscribe from this list, visit
https://dev.eclipse.org/mailman/listinfo/aspectj-users



_______________________________________________
aspectj-users mailing list
aspectj-users@xxxxxxxxxxx
To change your delivery options, retrieve your password, or unsubscribe from this list, visit
https://dev.eclipse.org/mailman/listinfo/aspectj-users

_______________________________________________
aspectj-users mailing list
aspectj-users@xxxxxxxxxxx
To change your delivery options, retrieve your password, or unsubscribe from this list, visit
https://dev.eclipse.org/mailman/listinfo/aspectj-users


Back to the top