hadoop二次排序的原理和实现方法(3)

Combiner 的输出是 reduce 的输入,除非重新定义一个 Combiner。

3、代码实现

Hadoop 的 example 包中自带了一个 MapReduce 的二次排序算法,下面对 example 包中的二次排序进行改进:

package com.buaa;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
 * Composite key that bundles the two integers of a sample record into a single
 * map-output key, implementing {@link WritableComparable} so it can be
 * serialized and ordered.
 *
 * <p>Project: SecondarySort, package: com.buaa. Created 2016-06-07 22:31:53.
 *
 * @author 刘吉超
 */
public class IntPair implements WritableComparable<IntPair>{
  private int first;
  private int second;

  /** Creates a pair with both components left at their default value of 0. */
  public IntPair(){
  }

  /**
   * Creates a pair holding the given components.
   *
   * @param left  value stored as the first component
   * @param right value stored as the second component
   */
  public IntPair(int left, int right){
    set(left, right);
  }

  /**
   * Overwrites both components of this pair.
   *
   * @param left  new first component
   * @param right new second component
   */
  public void set(int left, int right){
    this.first = left;
    this.second = right;
  }

  /**
   * Restores this pair from two consecutive ints, first component first.
   *
   * @param in source to read from
   * @throws IOException if reading from {@code in} fails
   */
  @Override
  public void readFields(DataInput in) throws IOException{
    this.first = in.readInt();
    this.second = in.readInt();
  }

  /**
   * Serializes this pair as two consecutive ints, first component first.
   *
   * @param out sink to write to
   * @throws IOException if writing to {@code out} fails
   */
  @Override
  public void write(DataOutput out) throws IOException{
    out.writeInt(this.first);
    out.writeInt(this.second);
  }

  /**
   * Orders pairs ascending by the first component, breaking ties ascending by
   * the second component.
   *
   * @param o the pair to compare against
   * @return -1, 0 or 1 when this pair sorts before, equal to or after {@code o}
   */
  @Override
  public int compareTo(IntPair o)
  {
    int byFirst = Integer.compare(this.first, o.first);
    if (byFirst != 0){
      return byFirst;
    }
    return Integer.compare(this.second, o.second);
  }

  /** Hash derived from both components: {@code first * 157 + second}. */
  @Override
  public int hashCode(){
    return 157 * first + second;
  }

  /**
   * Two pairs are equal when both components match.
   *
   * @param right object to compare with; may be {@code null}
   * @return {@code true} only if {@code right} is an {@code IntPair} with the same components
   */
  @Override
  public boolean equals(Object right){
    if (this == right){
      return true;
    }
    // instanceof is false for null, so no separate null check is needed.
    if (!(right instanceof IntPair)){
      return false;
    }
    IntPair other = (IntPair) right;
    return first == other.first && second == other.second;
  }

  /** @return the first component */
  public int getFirst(){
    return this.first;
  }

  /** @return the second component */
  public int getSecond(){
    return this.second;
  }
}
package com.buaa;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
 * MapReduce job that emits every input record keyed by its first integer, with
 * the second integers of each key arriving at the reducer in the order defined
 * by {@code IntPair.compareTo}.
 *
 * <p>The job partitions and groups on the first component of the composite
 * {@code IntPair} key only, while sorting uses the whole key.
 *
 * <p>Project: SecondarySort, package: com.buaa. Created 2016-06-07 22:40:37.
 *
 * @author 刘吉超
 */
public class SecondarySort {
  /**
   * Mapper that reads up to two whitespace-separated integers from each line and
   * emits {@code (IntPair(left, right), right)}. Lines without any token are
   * skipped; a missing second token is treated as 0.
   */
  public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable> {
    /**
     * Parses one input line and writes the composite key with the second value.
     *
     * @param key     input key supplied by the input format (unused)
     * @param value   the line of text to parse
     * @param context sink for the emitted key/value pair
     * @throws IOException           if writing to the context fails
     * @throws InterruptedException  if writing to the context is interrupted
     * @throws NumberFormatException if a token is not a valid integer
     */
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      StringTokenizer tokenizer = new StringTokenizer(value.toString());
      if (!tokenizer.hasMoreTokens()) {
        // Nothing to emit for a blank line.
        return;
      }
      int left = Integer.parseInt(tokenizer.nextToken());
      int right = tokenizer.hasMoreTokens() ? Integer.parseInt(tokenizer.nextToken()) : 0;
      context.write(new IntPair(left, right), new IntWritable(right));
    }
  }

  /**
   * Custom partitioner that chooses the partition from the first component of
   * the {@code IntPair} key only.
   */
  public static class FirstPartitioner extends Partitioner<IntPair, IntWritable>{
    /**
     * @param key           composite key; only its first component is used
     * @param value         map output value (unused)
     * @param numPartitions number of partitions to spread keys over
     * @return a partition index in {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(IntPair key, IntWritable value,int numPartitions){
      // Clear the sign bit instead of calling Math.abs: first * 127 can overflow to
      // Integer.MIN_VALUE, for which Math.abs still returns a negative number and
      // would therefore produce a negative partition index.
      return ((key.getFirst() * 127) & Integer.MAX_VALUE) % numPartitions;
    }
  }

  /**
   * Custom grouping comparator that treats keys with the same first component
   * as belonging to the same group.
   */
  @SuppressWarnings("rawtypes")
  public static class GroupingComparator extends WritableComparator{
    /** Registers {@code IntPair} as the key class and asks the base class to create key instances. */
    protected GroupingComparator(){
      super(IntPair.class, true);
    }

    /**
     * Compares two {@code IntPair} keys by their first component only.
     *
     * @param w1 left key, expected to be an {@code IntPair}
     * @param w2 right key, expected to be an {@code IntPair}
     * @return -1, 0 or 1 according to the ordering of the first components
     */
    @Override
    public int compare(WritableComparable w1, WritableComparable w2){
      IntPair ip1 = (IntPair) w1;
      IntPair ip2 = (IntPair) w2;
      return Integer.compare(ip1.getFirst(), ip2.getFirst());
    }
  }

  /**
   * Reducer that writes one output record per value, keyed by the textual form
   * of the key's first component.
   */
  public static class Reduce extends Reducer<IntPair, IntWritable, Text, IntWritable> {
    /**
     * @param key     composite key of the current group
     * @param values  values of the current group
     * @param context sink for the emitted records
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    public void reduce(IntPair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      for (IntWritable val : values) {
        context.write(new Text(Integer.toString(key.getFirst())), val);
      }
    }
  }

  /**
   * Configures and runs the job, then exits with 0 on success or 1 on failure.
   *
   * @param args {@code args[0]} is the input path, {@code args[1]} the output path
   * @throws IOException            if file system access or job submission fails
   * @throws InterruptedException   if waiting for the job is interrupted
   * @throws ClassNotFoundException if a job class cannot be resolved
   */
  public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
    if (args.length < 2) {
      // Fail with a usage hint instead of an ArrayIndexOutOfBoundsException.
      System.err.println("Usage: SecondarySort <input path> <output path>");
      System.exit(2);
    }
    // Load the configuration.
    Configuration conf = new Configuration();
    // If the output path already exists (directory or plain file), delete it.
    Path outputPath = new Path(args[1]);
    FileSystem hdfs = outputPath.getFileSystem(conf);
    if (hdfs.exists(outputPath)) {
      hdfs.delete(outputPath, true);
    }
    Job job = Job.getInstance(conf, "secondarysort");
    // Main class used to locate the job jar.
    job.setJarByClass(SecondarySort.class);
    // Input path.
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    // Output path.
    FileOutputFormat.setOutputPath(job, outputPath);
    // Mapper.
    job.setMapperClass(Map.class);
    // Reducer.
    job.setReducerClass(Reduce.class);
    // Partitioner.
    job.setPartitionerClass(FirstPartitioner.class);
    // No custom sort comparator is set via job.setSortComparatorClass();
    // sorting relies on IntPair.compareTo instead.
    // Grouping comparator.
    job.setGroupingComparatorClass(GroupingComparator.class);
    // Map output key type.
    job.setMapOutputKeyClass(IntPair.class);
    // Map output value type.
    job.setMapOutputValueClass(IntWritable.class);
    // Reduce output key type.
    job.setOutputKeyClass(Text.class);
    // Reduce output value type.
    job.setOutputValueClass(IntWritable.class);
    // Input format.
    job.setInputFormatClass(TextInputFormat.class);
    // Output format.
    job.setOutputFormatClass(TextOutputFormat.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
      

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/1675.html