Skip to main content

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [List Home]
Re: [aspectj-users] Use AspectJ with Hadoop MapReduce

I don’t speak maven very well unfortunately but as the weave messages are coming out it sounds like you aren’t running the woven version of the code (or at least the unwoven version is perhaps ahead of the woven version on your class path).

You can easily javap -private -verbose on the class files you are running to verify they contain calls to ajc$ methods (which represent the advice).

cheers,
Andy

On Sep 23, 2015, at 10:52 AM, xeonmailinglist <xeonmailinglist@xxxxxxxxx> wrote:

Thanks for your help. This is a great example where I can start with. Although, it seems that the aspect is not running. I have weaved the aspect file with maven [1] and everything seems ok according the compilation output [2].

But when I run the example, nothing happens. Do you have any idea of what it might be?

[1] Maven configuration

```
<plugin>
    <!-- compile with maven -->
    <groupId>org.codehaus.mojo</groupId>
    <artifactId>aspectj-maven-plugin</artifactId>
    <version>${maven.aspectj.version}</version>
    <configuration>
        <complianceLevel>1.7</complianceLevel>
        <source>1.7</source>
        <target>1.7</target>
        <Xlint>error</Xlint>
        <verbose>true</verbose>
        <forceAjcCompile>true</forceAjcCompile>
        <showWeaveInfo>true</showWeaveInfo>
    </configuration>
    <executions>
        <execution>
            <goals>
                <goal>compile</goal>
            </goals>
        </execution>
    </executions>
</plugin>
```

[2] Compilation output

```
[INFO] --- aspectj-maven-plugin:1.7:compile (default) @ medusa-java ---
[INFO] Showing AJC message detail for messages of types: [error, warning, fail]
[INFO] Join point 'method-execution(void org.apache.hadoop.mapred.MedusaDigests.cleanup(java.lang.Object))' in Type 'org.apache.hadoop.mapred.MedusaDigests' (MedusaDigests.java:100) advised by before advice from 'org.apache.hadoop.mapred.aspects.MapReduceCalls' (MapReduceCalls.aj:31)
[INFO] Join point 'method-execution(void org.apache.hadoop.mapred.Medusa$MyFilterMapper.map(java.lang.Object, org.apache.hadoop.io.Text, org.apache.hadoop.mapreduce.Mapper$Context))' in Type 'org.apache.hadoop.mapred.Medusa$MyFilterMapper' (Medusa.java:77) advised by before advice from 'org.apache.hadoop.mapred.aspects.MapReduceCalls' (MapReduceCalls.aj:11)
[INFO] Join point 'method-execution(void org.apache.hadoop.mapred.examples.MyWordCount$MyReducer.reduce(org.apache.hadoop.io.Text, java.lang.Iterable, org.apache.hadoop.mapreduce.Reducer$Context))' in Type 'org.apache.hadoop.mapred.examples.MyWordCount$MyReducer' (MyWordCount.java:51) advised by before advice from 'org.apache.hadoop.mapred.aspects.MapReduceCalls' (MapReduceCalls.aj:27)

```



On 09/23/2015 04:33 PM, Andy Clement wrote:
With pure AspectJ, if you want to intercept when map/reduce/cleanup are running, you can use something like this:

=== Foo.aj ===
aspect Foo {
  before(): execution(* map(..)) {
     System.out.println(thisJoinPoint);
  }
  before(): execution(* reduce(..)) {
     System.out.println(thisJoinPoint);
  }
  before(): execution(* cleanup(..)) {
     System.out.println(thisJoinPoint);
  }
}
===

ajc -1.8 Foo.aj MyWordCount.java -showWeaveInfo

This will compile and weave MyWordCount - when you run it you will see the advice output.

Alternatively if you want to advise the call side of these things, you will need to weave the hadoop jar (whatever that is):

=== Foo.aj ===
aspect Foo {
  before(): call(* map(..)) {
     System.out.println(thisJoinPoint);
  }
  before(): call(* reduce(..)) {
     System.out.println(thisJoinPoint);
  }
  before(): call(* cleanup(..)) {
     System.out.println(thisJoinPoint);
  }
}
===

ajc -1.8 Foo.aj -inpath hadoop.jar -outjar woven_hadoop.jar -showWeaveInfo

Now when you run your MyWordCount program you need to be using the woven_hadoop jar instead of the regular hadoop jar.

cheers,
Andy

On Sep 23, 2015, at 7:34 AM, xeonmailinglist <xeonmailinglist@xxxxxxxxx> wrote:

Hi,

I am trying to pointcut calls of map and reduce from Hadoop MapReduce.

When you create a program to work with the application MapReduce, you need to define map and reduce functions. The functions are called by the framework MapReduce. Here is an example of a MapReduce program programmed by the user [1].

I am using Spring AOP to intercept the call to the map , reduce, and cleanup functions, but it is not working. Maybe I must use AspectJ. The problem is that I don’t know how to do it with AspectJ. Any help to try make this work?

[1] Wordcount program


package org.apache.hadoop.mapred.examples;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.ReduceContext;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

/**
 * My example of a common wordcount. Compare with the official WordCount.class to understand the differences between both classes.
 */
public class MyWordCount {
    public static class MyMap extends Mapper {
        private final static IntWritable _one_ = new IntWritable(1);
        private Text word = new Text();
        private MedusaDigests parser = new MedusaDigests();

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                output.collect(word, one);
            }
        }

        public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            try {
                while (context.nextKeyValue()) {
                    System.out.println("Key: " + context.getCurrentKey() + " Value: " + context.getCurrentValue());
                    map(context.getCurrentKey(), context.getCurrentValue(), context);
                }
            } finally {
                cleanup(context);
            }
        }

        public void cleanup(Mapper.Context context) {}
    }

    public static class MyReducer extends Reducer {
        private IntWritable result = new IntWritable();
        MedusaDigests parser = new MedusaDigests();

        public void reduce(Text key, Iterable<IntWritable> values, Context context
        ) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                System.out.println(" - key ( " + key.getClass().toString() + "): " + key.toString()
                        + " value ( " + val.getClass().toString() + " ): " + val.toString());
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }

        public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            try {
                while (context.nextKey()) {
                    System.out.println("Key: " + context.getCurrentKey());
                    reduce(context.getCurrentKey(), context.getValues(), context);
                    // If a back up store is used, reset it
                    Iterator<IntWritable> iter = context.getValues().iterator();
                    if(iter instanceof ReduceContext.ValueIterator) {
                        ((ReduceContext.ValueIterator<IntWritable>)iter).resetBackupStore();
                    }
                }
            } finally {
                cleanup(context);
            }
        }

        public void cleanup(Reducer.Context context) {}
    }

    public static void main(String[] args) throws Exception {
        GenericOptionsParser parser = new GenericOptionsParser(new Configuration(), args);

        String[] otherArgs = parser.getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount [<in>...] <out>");
            System.exit(2);
        }

        // first map tasks
        JobConf conf = new JobConf(MyWordCount.class);
        conf.setJobName("wordcount");
        conf.setClass("mapreduce.job.map.identity.class", MyFullyIndentityMapper.class, Mapper.class);

        conf.setJarByClass(MyWordCount.class);
        conf.setPartitionerClass(HashPartitioner.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setNumReduceTasks(1);

        Path[] inputPaths = new Path[otherArgs.length-1];
        for (int i = 0; i < otherArgs.length - 1; ++i) { inputPaths[i] = new Path(otherArgs[i]); }
        Path outputPath =  new Path(otherArgs[otherArgs.length - 1]);
        FileInputFormat.setInputPaths(conf, inputPaths);
        FileOutputFormat.setOutputPath(conf, outputPath);

        // launch the job directly
        Medusa.runJob(conf);
        System.exit(0);
    }
}

Thanks,

_______________________________________________
aspectj-users mailing list
aspectj-users@xxxxxxxxxxx
To change your delivery options, retrieve your password, or unsubscribe from this list, visit
https://dev.eclipse.org/mailman/listinfo/aspectj-users



_______________________________________________
aspectj-users mailing list
aspectj-users@xxxxxxxxxxx
To change your delivery options, retrieve your password, or unsubscribe from this list, visit
https://dev.eclipse.org/mailman/listinfo/aspectj-users

_______________________________________________
aspectj-users mailing list
aspectj-users@xxxxxxxxxxx
To change your delivery options, retrieve your password, or unsubscribe from this list, visit
https://dev.eclipse.org/mailman/listinfo/aspectj-users


Back to the top