Re: [aspectj-users] Use AspectJ with Hadoop MapReduce

I can imagine that 
pointcut printWrite(): call(* Context.write(..));
might not do what you expect. Your type ‘Context’ - is it in a package? You haven’t specified or wildcarded the package.
I don’t understand why 
pointcut printWrite(): call(* write(..));
doesn’t work, though. Are you *sure* that doesn’t work?
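
For example, something along these lines might match (a sketch only - the aspect name is made up, and the package names assume Hadoop’s org.apache.hadoop.mapreduce API):

=== ContextWriteTracer.aj ===
aspect ContextWriteTracer {
  // Wildcard the package so a nested type whose name ends in Context can match.
  pointcut printWrite(): call(* *..*Context.write(..));

  // Alternative: name the declaring type explicitly and include subtypes with '+'.
  // pointcut printWrite(): call(* org.apache.hadoop.mapreduce.TaskInputOutputContext+.write(..));

  before(): printWrite() {
    System.out.println("write() called at " + thisJoinPoint.getSourceLocation());
  }
}
===
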
Andy

On Sep 25, 2015, at 7:36 AM, xeonmailinglist <xeonmailinglist@xxxxxxxxx> wrote:

I have found the solution. Strangely, this aspect works [1], but these pointcuts [2] don't. If anyone could explain why, I would appreciate it.

[1] Aspect that works
pointcut printWrite(): call(* *.write(..));
before(): printWrite() {
    System.out.println("Busted2");
}

[2] Pointcuts that don't work
pointcut printWrite(): call(* write(..));
pointcut printWrite(): call(* Context.write(..));




On 09/25/2015 03:12 PM, xeonmailinglist wrote:

In my example, I have the following snippet [1]. I want to execute my aspect before the call to context.write(this.word, one) shown in the example. Therefore, I have created this aspect [2] based on your solution.

The context variable is of the type public abstract class TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends TaskAttemptContext implements Progressable, and its superclass is declared as public class TaskAttemptContext extends JobContext implements Progressable.

After I compile my code, I can see that the advice for the call join point was not woven into the class file [3].

Why is the call designator not working?

[1] Snippet of the wordcount example.

while (itr.hasMoreTokens()) {
    this.word.set(itr.nextToken());
    context.write(this.word, one);
}

[2] My aspect


before(): call(* *Context.write(..)) {
    System.out.println("Aspects Write");
    System.out.println(thisJoinPoint.getSourceLocation().getFileName());
}

[3] Compiled version of the job. This is the decompiled class file.

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        MapReduceCalls.aspectOf().ajc$before$org_apache_hadoop_mapred_aspects_MapReduceCalls$1$9e048e72();
        StringTokenizer itr = new StringTokenizer(value.toString());

        while(itr.hasMoreTokens()) {
            this.word.set(itr.nextToken());
            context.write(this.word, one);
        }
    }
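
One way to check at compile time whether a call pointcut matches anything at all is a declare warning (just a sketch; the aspect name is made up):

=== PointcutCheck.aj ===
aspect PointcutCheck {
  // ajc emits one warning per matched call site, so a silent compile means no match.
  declare warning : call(* *Context.write(..)) : "matched a Context.write(..) call";
}
===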

Cheers,


On 09/23/2015 04:33 PM, Andy Clement wrote:

With pure AspectJ, if you want to intercept when map/reduce/cleanup are running, you can use something like this:

=== Foo.aj ===
aspect Foo {
  before(): execution(* map(..)) {
     System.out.println(thisJoinPoint);
  }
  before(): execution(* reduce(..)) {
     System.out.println(thisJoinPoint);
  }
  before(): execution(* cleanup(..)) {
     System.out.println(thisJoinPoint);
  }
}
===

ajc -1.8 Foo.aj MyWordCount.java -showWeaveInfo

This will compile and weave MyWordCount - when you run it you will see the advice output.
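
Whichever way you launch the job, the woven code also needs aspectjrt.jar on the runtime classpath, roughly like this (paths are placeholders):

java -cp .:aspectjrt.jar:<your hadoop classpath> org.apache.hadoop.mapred.examples.MyWordCount <in> <out>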

Alternatively, if you want to advise the call side of these things, you will need to weave the hadoop jar (whatever that is):

=== Foo.aj ===
aspect Foo {
  before(): call(* map(..)) {
     System.out.println(thisJoinPoint);
  }
  before(): call(* reduce(..)) {
     System.out.println(thisJoinPoint);
  }
  before(): call(* cleanup(..)) {
     System.out.println(thisJoinPoint);
  }
}
===

ajc -1.8 Foo.aj -inpath hadoop.jar -outjar woven_hadoop.jar -showWeaveInfo

Now when you run your MyWordCount program you need to be using the woven_hadoop jar instead of the regular hadoop jar.
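
For example (a sketch with placeholder paths, launched however you normally run the job), the runtime classpath would contain the woven jar in place of the original, plus aspectjrt.jar:

java -cp <your job classes>:woven_hadoop.jar:aspectjrt.jar org.apache.hadoop.mapred.examples.MyWordCount <in> <out>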

cheers,
Andy

On Sep 23, 2015, at 7:34 AM, xeonmailinglist <xeonmailinglist@xxxxxxxxx> wrote:

Hi,

I am trying to write pointcuts for calls to map and reduce in Hadoop MapReduce.

When you create a program for MapReduce, you need to define map and reduce functions, which are then called by the MapReduce framework. Here is an example of a user-written MapReduce program [1].

I am using Spring AOP to intercept the calls to the map, reduce, and cleanup functions, but it is not working. Maybe I must use AspectJ instead. The problem is that I don’t know how to do it with AspectJ. Any help to make this work would be appreciated.

[1] Wordcount program


package org.apache.hadoop.mapred.examples;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.ReduceContext;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

/**
 * My example of a common wordcount. Compare with the official WordCount.class to understand the differences between the two classes.
 */
public class MyWordCount {
    public static class MyMap extends Mapper {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private MedusaDigests parser = new MedusaDigests();

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                output.collect(word, one);
            }
        }

        public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            try {
                while (context.nextKeyValue()) {
                    System.out.println("Key: " + context.getCurrentKey() + " Value: " + context.getCurrentValue());
                    map(context.getCurrentKey(), context.getCurrentValue(), context);
                }
            } finally {
                cleanup(context);
            }
        }

        public void cleanup(Mapper.Context context) {}
    }

    public static class MyReducer extends Reducer {
        private IntWritable result = new IntWritable();
        MedusaDigests parser = new MedusaDigests();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                System.out.println(" - key ( " + key.getClass().toString() + "): " + key.toString()
                        + " value ( " + val.getClass().toString() + " ): " + val.toString());
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }

        public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            try {
                while (context.nextKey()) {
                    System.out.println("Key: " + context.getCurrentKey());
                    reduce(context.getCurrentKey(), context.getValues(), context);
                    // If a back up store is used, reset it
                    Iterator<IntWritable> iter = context.getValues().iterator();
                    if(iter instanceof ReduceContext.ValueIterator) {
                        ((ReduceContext.ValueIterator<IntWritable>)iter).resetBackupStore();
                    }
                }
            } finally {
                cleanup(context);
            }
        }

        public void cleanup(Reducer.Context context) {}
    }

    public static void main(String[] args) throws Exception {
        GenericOptionsParser parser = new GenericOptionsParser(new Configuration(), args);

        String[] otherArgs = parser.getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount [<in>...] <out>");
            System.exit(2);
        }

        // first map tasks
        JobConf conf = new JobConf(MyWordCount.class);
        conf.setJobName("wordcount");
        conf.setClass("mapreduce.job.map.identity.class", MyFullyIndentityMapper.class, Mapper.class);

        conf.setJarByClass(MyWordCount.class);
        conf.setPartitionerClass(HashPartitioner.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setNumReduceTasks(1);

        Path[] inputPaths = new Path[otherArgs.length-1];
        for (int i = 0; i < otherArgs.length - 1; ++i) { inputPaths[i] = new Path(otherArgs[i]); }
        Path outputPath =  new Path(otherArgs[otherArgs.length - 1]);
        FileInputFormat.setInputPaths(conf, inputPaths);
        FileOutputFormat.setOutputPath(conf, outputPath);

        // launch the job directly
        Medusa.runJob(conf);
        System.exit(0);
    }
}

Thanks,


_______________________________________________
aspectj-users mailing list
aspectj-users@xxxxxxxxxxx
To change your delivery options, retrieve your password, or unsubscribe from this list, visit
https://dev.eclipse.org/mailman/listinfo/aspectj-users

