[aspectj-users] AspectJ is not intercepting `collect` call

Hi,

I am trying to use AspectJ with a MapReduce example, but there is one thing I do not understand. First, here is the code I have.

[1] Wordcount example


package org.apache.hadoop.mapred.examples;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

/**
 * Common Wordcount example
 */
public class WordCount {

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);
        conf.setNumReduceTasks(2);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}

[2] My mapreduce aspects


package org.apache.hadoop.mapred.aspects;

import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;

@Aspect
public class MapReduceAspects {
    @Before("execution(* map(..))")
    public void mymap(JoinPoint joinPoint) {
        System.out.println("My Map Execution: " + joinPoint.getArgs() + ":" + joinPoint.getTarget());
        Object[] obj = joinPoint.getArgs();
        for (Object o : obj){
            System.out.println(o.toString());
        }
    }

    @Before("execution(* reduce(..))")
    public void myreduce() { System.out.println("My Reduce Execution"); }

    @Before("execution(* collect(..))")
    public void updatehash(JoinPoint joinPoint) {
        System.out.println("Output collect: Args: " + joinPoint.getArgs());

    }
}

[3] bean-aspects.xml


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
       xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jee="http://www.springframework.org/schema/jee" xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.2.xsd
                           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
                           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd
                           http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.2.xsd
                           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd
                           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.2.xsd">

    <aop:aspectj-autoproxy proxy-target-class="true">
        <aop:include name="mapreduceAspect"/>
    </aop:aspectj-autoproxy>

    <bean id="mapreduceAspect" class="org.apache.hadoop.mapred.aspects.MapReduceAspects"/>
</beans>

In [1], I have a wordcount example with map and reduce functions. When I launch my application in the MapReduce framework, the framework creates a job that executes the map and reduce functions. The map function reads from an input directory, and the reduce function outputs the result.
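
To make that flow concrete, here is a minimal plain-Java sketch. It is not Hadoop and not AspectJ: `Collector`, `FrameworkCollector`, `map` and `logging` are hypothetical names made up for illustration, and the interception uses a JDK dynamic proxy (`java.lang.reflect.Proxy`) as a stand-in for an aspect. It only shows that `map` lives in user code, while the object whose `collect` method runs is created outside of it.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

public class CollectFlowSketch {

    /** Hypothetical stand-in for Hadoop's OutputCollector; not the real interface. */
    public interface Collector {
        void collect(String key, int value);
    }

    /** Collector implementation owned by the (hypothetical) framework, not by user code. */
    static class FrameworkCollector implements Collector {
        final Map<String, Integer> counts = new TreeMap<>();

        @Override
        public void collect(String key, int value) {
            // Sum the values per key, like a tiny in-memory reduce.
            counts.merge(key, value, Integer::sum);
        }
    }

    /** User code: mirrors the loop in WordCount.Map.map, one collect call per token. */
    static void map(String line, Collector output) {
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            output.collect(tokenizer.nextToken(), 1);
        }
    }

    /** Wraps a collector in a JDK dynamic proxy that records every collect call. */
    static Collector logging(Collector target, List<String> log) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getName().equals("collect")) {
                log.add("collect(" + args[0] + ", " + args[1] + ")");
            }
            // Always forward to the real collector so behavior is unchanged.
            return method.invoke(target, args);
        };
        return (Collector) Proxy.newProxyInstance(
                Collector.class.getClassLoader(),
                new Class<?>[] {Collector.class},
                handler);
    }

    public static void main(String[] args) {
        FrameworkCollector real = new FrameworkCollector();
        List<String> log = new ArrayList<>();
        // The "framework" hands the (wrapped) collector to the user's map function.
        map("to be or not to be", logging(real, log));
        System.out.println(log);
        System.out.println(real.counts);
    }
}
```

In AspectJ terms, an `execution(...)` join point sits in the body of the matched method, while a `call(...)` join point sits at the call site. Whether that distinction matters for the setup in [2] and [3] depends on which classes are actually woven, which this sketch does not model.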

I can intercept the map and reduce function calls with AspectJ, but I can't intercept the collect call in the statement output.collect(word, one) inside the map function. Why does this happen? Did I configure the aspects incorrectly?

I would appreciate it if anyone could explain why AspectJ behaves like this.

Thanks,

