// Hadoop implementation of co-occurring word pairs (Word Co-Occurrence) — part 2

// A RawComparator that compares serialized TextPair objects byte-by-byte,
// without deserializing them.
    public static class Comparator extends WritableComparator {
     private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
     public Comparator() {
      super(TextPair.class);
     }
     @Override
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2){
      try {
       int firstl1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
       int firstl2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
       int cmp = TEXT_COMPARATOR.compare(b1, s1, firstl1, b2, s2, firstl2);
       if(cmp != 0)
        return cmp;
       return TEXT_COMPARATOR.compare(b1, s1 + firstl1, l1 - firstl1,
                b2, s2 + firstl2, l1 - firstl2);
      }catch (IOException e) {
       throw new IllegalArgumentException(e);
      }
     }
    }//End of Comparator
    static { // Register the raw comparator so the shuffle's sort uses it
             // for TextPair keys instead of deserialize-and-compare.
      WritableComparator.define(TextPair.class, new Comparator());
    }

// Compare only the first part of the pair, so that reduce is called once for each value of the first part.
    public static class FirstComparator extends WritableComparator {
     private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
     public FirstComparator() {
      super(TextPair.class);
     }   
     @Override
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2){
      try {
       int firstl1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
       int firstl2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
       return TEXT_COMPARATOR.compare(b1, s1, firstl1, b2, s2, firstl2);
      }catch (IOException e) {
       throw new IllegalArgumentException(e);
      }
     }
     /*
      @Override
      public int compare(WritableComparator a, WritableComparator b) {
       if(a instanceof TextPair && b instanceof TextPair)
        return ((TextPair)a).first.compareTo(((TextPair)b).first);
       return super.compare(a, b);
      }*/
    }//End of FirstComparator   
  }//End of TextPair
 
  //Partition based on the first part of the pair.
  public static class FirstPartitioner extends Partitioner<TextPair,IntWritable>{
    @Override
    public int getPartition(TextPair key, IntWritable value, int numPartitions) {
      return Math.abs(key.getFirst().toString().indexOf(0) * 127) % numPartitions;//May be some trouble here.
    }
  }//End of FirstPartitioner

public static class MyMapper extends Mapper<LongWritable, Text, TextPair, IntWritable> {   
    private final static IntWritable one = new IntWritable(1);
    private static Text word0 = new Text();
    private static Text word1 = new Text();
    private String pattern = "[^a-zA-Z0-9-']";

@Override
    public void map(LongWritable inKey, Text inValue, Context context)throws IOException, InterruptedException {
     String line = inValue.toString();
     line = line.replaceAll(pattern, " ");
     line = line.toLowerCase();
     String[] str = line.split(" +");
     for(int i=0; i< str.length-1; i++)
     {
      word0.set(str[i]);
      word1.set(str[i+1]);
      TextPair pair = new TextPair(word0, word1);
      context.write(pair, one);
     }
    }
  }//End of MapClass
  public static class MyReducer extends Reducer<TextPair, IntWritable, TextPair, IntWritable> {
    private IntWritable result = new IntWritable();
   
    @Override
    public void reduce(TextPair inKey, Iterable<IntWritable> inValues, Context context) throws IOException, InterruptedException {
     int sum = 0;
        for (IntWritable val : inValues) {
          sum += val.get();
        }
        result.set(sum);
        context.write(inKey, result);
    }
  }//End of MyReducer
 
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    //conf.set("Hadoop.job.ugi", "sunguoli,cs402");
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    //if (otherArgs.length != 2) {
    //  System.err.println("Usage: CoOccurrence <in> <out>");
    //  System.exit(2);
    //}
    Job job = new Job(conf, "Co-Occurrence");
    job.setJarByClass(CoOccurrence.class);
   
    job.setMapperClass(MyMapper.class);
    job.setMapOutputKeyClass(TextPair.class);
    job.setMapOutputValueClass(IntWritable.class);
   
    job.setCombinerClass(MyReducer.class);

// Content copyright notice: unless otherwise stated, this is an original article from the source site.
//
// Source attribution: http://www.heiqu.com/96f9f507f1404d812256d80db80fcec8.html