MapReduce编程模型

MapReduce编程模型

  • Hadoop架构图

    Hadoop由HDFS分布式存储、MapReduce分布式计算、Yarn资源调度三部分组成

  • MapReduce是采用一种分而治之的思想设计出来的分布式计算框架
  • MapReduce由两个阶段组成:
    • Map阶段(切分成一个个小的任务)
    • Reduce阶段(汇总小任务的结果)
  • 那什么是分而治之呢?
    • 比如一复杂、计算量大、耗时长的的任务,暂且称为“大任务”;
    • 此时使用单台服务器无法计算或较短时间内计算出结果时,可将此大任务切分成一个个小的任务,小任务分别在不同的服务器上并行的执行
    • 最终再汇总每个小任务的结果

Map阶段

  • map阶段有一个关键的map()函数;
  • 此函数的输入是键值对
  • 输出是一系列键值对,输出写入本地磁盘

Reduce阶段

  • reduce阶段有一个关键的函数reduce()函数

  • 此函数的输入也是键值对(即map的输出(kv对))

  • 输出也是一系列键值对,结果最终写入HDFS

Map&Reduce

MapReduce编程示例

  • MapReduce的词频统计为例:统计一批英文文章当中,每个单词出现的总次数

MapReduce原理图

  • Map阶段
    • 假设MR的输入文件“Gone With The Wind”有三个block;block1、block2、block3
    • MR编程时,每个block对应一个分片split
    • 每一个split对应一个map任务(map task)
    • 如图共3个map任务(map1、map2、map3);这3个任务的逻辑一样,所以以第一个map任务(map1)为例分析
    • map1读取block1的数据;一次读取block1的一行数据;
      • 产生键值对(key/value),作为map()的参数传入,调用map();
      • 假设当前所读行是第一行
      • 将当前所读行的行首相对于当前block开始处的字节偏移量作为key(0)
      • 当前行的内容作为value(Dear Bear River)
    • map()内
      • (按需求,写业务代码),将value当前行内容按空格切分,得到三个单词Dear | Bear | River
      • 将每个单词变成键值对,输出出去(Dear, 1) | (Bear, 1) | (River, 1);最终结果写入map任务所在节点的本地磁盘中(内里还有细节,讲到shuffle时,再细细展开)
      • block的第一行的数据被处理完后,接着处理第二行;逻辑同上
      • 当map任务将当前block中所有的数据全部处理完后,此map任务即运行结束
    • 其它的每一个map任务都是如上逻辑,不再赘述
  • Reduce阶段
    • reduce任务(reduce task)的个数由自己写的程序编程指定,main()内的job.setNumReduceTasks(4)指定reduce任务是4个(reduce1、reduce2、reduce3、reduce4)
    • 每一个reduce任务的逻辑一下,所以以第一个reduce任务(reduce1)为例分析
    • map1任务完成后,reduce1通过网络,连接到map1,将map1输出结果中属于reduce1的分区的数据,通过网络获取到reduce1端(拷贝阶段)
    • 同样也如此连接到map2、map3获取结果
    • 最终reduce1端获得4个(Dear, 1)键值对;由于key键相同,它们分到同一组;
    • 4个(Dear, 1)键值对,转换成[Dear, Iterable(1, 1, 1, )],作为两个参数传入reduce()
    • 在reduce()内部,计算Dear的总数为4,并将(Dear, 4)作为键值对输出
    • 每个reduce任务最终输出文件(内里还有细节,讲到shuffle时,再细细展开),文件写入到HDFS

MR中key的作用

  • MapReduce编程中,key有特殊的作用
    • ①数据中,若要针对某个值进行分组、聚合时,需将此值作为MR中的reduce的输入的key

    • 如当前的词频统计例子,按单词进行分组,每组中对出现次数做聚合(计算总和);所以需要将每个单词作为reduce输入的key,MapReduce框架自动按照单词分组,进而求出每组即每个单词的总次数

    • ②另外,key还具有可排序的特性,因为MR中的key类需要实现WritableComparable接口;而此接口又继承Comparable接口

    • MR编程时,要充分利用以上两点;结合实际业务需求,设置合适的key

map - reduce代码

Mapper代码

/**
 * 类Mapper<LongWritable, Text, Text, IntWritable>的四个泛型分别表示
 * map方法的输入的键的类型kin、值的类型vin;输出的键的类型kout、输出的值的类型vout
 * kin指的是当前所读行行首相对于split分片开头的字节偏移量,所以是long类型,对应序列化类型LongWritable
 * vin指的是当前所读行,类型是String,对应序列化类型Text
 * kout根据需求,输出键指的是单词,类型是String,对应序列化类型是Text
 * vout根据需求,输出值指的是单词的个数,1,类型是int,对应序列化类型是IntWritable
 */
public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {
    /**
     * 处理分片split中的每一行的数据;针对每行数据,会调用一次map方法
     * 在一次map方法调用时,从一行数据中,获得一个个单词word,再将每个单词word变成键值对形式(word, 1)输出出去
     * 输出的值最终写到本地磁盘中
     * @param key 当前所读行行首相对于split分片开头的字节偏移量
     * @param value  当前所读行
     */
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

Reducer代码

/**
 * Reducer<Text, IntWritable, Text, IntWritable>的四个泛型分别表示
 * reduce方法的输入的键的类型kin、输入值的类型vin;输出的键的类型kout、输出的值的类型vout
 * 注意:因为map的输出作为reduce的输入,所以此处的kin、vin类型分别与map的输出的键类型、值类型相同
 * kout根据需求,输出键指的是单词,类型是String,对应序列化类型是Text
 * vout根据需求,输出值指的是每个单词的总个数,类型是int,对应序列化类型是IntWritable
 */
public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values,
                          Context context) throws IOException, InterruptedException {
        //定义变量,用于累计当前单词出现的次数
        int sum = 0;
        for (IntWritable count : values) {
            //从count中获得值,累加到sum中
            sum += count.get();
        }
        //将单词、单词次数,分别作为键值对,输出
        context.write(key, new IntWritable(sum));// 输出最终结果
    };
}

2.4.3 Main程序入口

Job job = Job.getInstance(configuration, WordCountMain.class.getSimpleName());
//设置job的jar包,如果参数指定的类包含在一个jar包中,则此jar包作为job的jar包; 参数class跟主类在一个工程即可;一般设置成主类
job.setJarByClass(WordCountMain.class);

//通过job设置输入/输出格式
//MR的默认输入格式是TextInputFormat,输出格式是TextOutputFormat;所以下两行可以注释掉
//        job.setInputFormatClass(TextInputFormat.class);
//        job.setOutputFormatClass(TextOutputFormat.class);

//设置输入/输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

//设置处理Map阶段的自定义的类
job.setMapperClass(WordCountMap.class);
//设置map combine类,减少网路传出量
job.setCombinerClass(WordCountReduce.class);
//设置处理Reduce阶段的自定义的类
job.setReducerClass(WordCountReduce.class);

//注意:如果map、reduce的输出的kv对类型一致,直接设置reduce的输出的kv对就行;如果不一样,需要分别设置map, reduce的输出的kv类型
//注意:此处设置的map输出的key/value类型,一定要与自定义map类输出的kv对类型一致;否则程序运行报错
// job.setMapOutputKeyClass(Text.class);
// job.setMapOutputValueClass(IntWritable.class);

//设置reduce task最终输出key/value的类型
//注意:此处设置的reduce输出的key/value类型,一定要与自定义reduce类输出的kv对类型一致;否则程序运行报错
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// 提交作业
job.waitForCompletion(true);

运行 / 查看

# 查看运行情况 -> job: http://node01:8088
# outpath -> http://node01:50070
hadoop jar [jar path] [main class path] /inpath /outpath

Shuffle

map端

  • 每个map任务都有一个对应的环形内存缓冲区;输出是kv对,先写入到环形缓冲区(默认大小100M),当内容占据80%缓冲区空间后,由一个后台线程将缓冲区中的数据溢出写到一个磁盘文件
  • 在溢出写的过程中,map任务可以继续向环形缓冲区写入数据;但是若写入速度大于溢出写的速度,最终造成100m占满后,map任务会暂停向环形缓冲区中写数据的过程;只执行溢出写的过程;直到环形缓冲区的数据全部溢出写到磁盘,才恢复向缓冲区写入
  • 后台线程溢写磁盘过程,有以下几个步骤:
    • 先对每个溢写的kv对做分区;分区的个数由MR程序的reduce任务数决定;默认使用HashPartitioner计算当前kv对属于哪个分区;计算公式:(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks
    • 每个分区中,根据kv对的key做内存中排序;
    • 若设置了map端本地聚合combiner,则对每个分区中,排好序的数据做combine操作;
    • 若设置了对map输出压缩的功能,会对溢写数据压缩
  • 随着不断的向环形缓冲区中写入数据,会多次触发溢写(每当环形缓冲区写满100m),本地磁盘最终会生成多个溢出文件
  • 合并溢写文件:在map task完成之前,所有溢出文件会被合并成一个大的溢出文件;且是已分区、已排序的输出文件
  • 小细节:
    • 在合并溢写文件时,如果至少有3个溢写文件,并且设置了map端combine的话,会在合并的过程中触发combine操作;
    • 但是若只有2个或1个溢写文件,则不触发combine操作(因为combine操作,本质上是一个reduce,需要启动JVM虚拟机,有一定的开销)

reduce端

  • reduce task会在每个map task运行完成后,通过HTTP获得map task输出中,属于自己的分区数据(许多kv对)

  • 如果map输出数据比较小,先保存在reduce的jvm内存中,否则直接写入reduce磁盘

  • 一旦内存缓冲区达到阈值(默认0.66)或map输出数的阈值(默认1000),则触发归并merge,结果写到本地磁盘

  • 若MR编程指定了combine,在归并过程中会执行combine操作

  • 随着溢出写的文件的增多,后台线程会将它们合并大的、排好序的文件

  • reduce task将所有map task复制完后,将合并磁盘上所有的溢出文件

  • 默认一次合并10个

  • 最后一批合并,部分数据来自内存,部分来自磁盘上的文件

  • 进入“归并、排序、分组阶段”

  • 每组数据调用一次reduce方法

自定义Partitioner

  • HashPartitioner
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {
  public int getPartition(K2 key, V2 value, int numReduceTasks) {
    // numReduceTasks : reduce个数,可设置
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

自定义Combiner

// 实际上Combiner就是reduce操作,需要设置 
job.setReducerClass(CustomReduce.class);
job.setCombinerClass(CustomReduce.class);  // open combiner
  • map端combine本地聚合(本质是reduce
  • 不论运行多少次Combine操作,都不能影响最终的结果
  • 并非所有的mr都适合combine操作,比如求平均值

  • 当每个map任务的环形缓冲区添满80%,开始溢写磁盘文件

  • 此过程会分区、每个分区内按键排序、再combine操作(若设置了combine的话)、若设置map输出压缩的话则再压缩

    • 在合并溢写文件时,如果至少有3个溢写文件,并且设置了map端combine的话,会在合并的过程中触发combine操作;
    • 但是若只有2个或1个溢写文件,则不触发combine操作(因为combine操作,本质上是一个reduce,需要启动JVM虚拟机,有一定的开销)
  • combine本质上也是reduce;因为自定义的combine类继承自Reducer父类

  • map: (K1, V1) -> list(K2, V2)

  • combiner: (K2, list(V2)) -> (K2, V2)

  • reduce: (K2, list(V2)) -> (K3, V3)

    • reduce函数与combine函数通常是一样的
    • K3与K2类型相同;
    • V3与V2类型相同
    • 即reduce的输入的kv类型分别与输出的kv类型相同

mr设置压缩

//开启map输出进行压缩的功能
configuration.set("mapreduce.map.output.compress", "true");
//设置map输出的压缩算法是:BZip2Codec,它是hadoop默认支持的压缩算法,且支持切分
configuration.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.BZip2Codec");
//开启job输出压缩功能
configuration.set("mapreduce.output.fileoutputformat.compress", "true");
//指定job输出使用的压缩算法
configuration.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.BZip2Codec");

自定义InputFormat

MapReduce执行过程

  • 上图也描述了mapreduce的一个完整的过程;我们主要看map任务是如何从hdfs读取分片数据的部分

    • 涉及3个关键的类

    • ①InputFormat输入格式类

      ②InputSplit输入分片类:getSplits()

      • InputFormat输入格式类将输入文件分成一个个分片InputSplit
      • 每个Map任务对应一个split分片

      ③RecordReader记录读取器类:createRecordReader()

      • RecordReader(记录读取器)读取分片数据,一行记录生成一个键值对
      • 传入map任务的map()方法,调用map()
  • 详细流程:

    • 客户端调用InputFormat的getSplits()方法,获得输入文件的分片信息

      public abstract class InputFormat<K, V> {
          public abstract List<InputSplit> getSplits(JobContext var1);
      }
      
    • 针对每个MR job会生成一个相应的app master,负责map 、 reduce任务的调度及监控执行情况

    • 将分片信息传递给MR job的app master

    • app master根据分片信息,尽量将map任务尽量调度在split分片数据所在节点(移动计算不移动数据

      public abstract class InputSplit {
          public abstract String[] getLocations() ;
      }
      
    • 有几个分片,就生成几个map任务

    • 每个map任务将split分片传递给createRecordReader()方法,生成此分片对应的RecordReader

    • RecordReader用来读取分片的数据,生成记录的键值对

      • nextKeyValue()判断是否有下一个键值对,如果有,返回true;否则,返回false
      • 如果返回true,调用getCurrentKey()获得当前的键
      • 调用getCurrentValue()获得当前的值
    • map任务运行过程

      // mapper
      public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
            // 1. map任务运行时,会调用run()
          public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
            // 2. 首先运行一次setup()方法;只在map任务启动时,运行一次;一些初始化的工作可以在setup方法中完成;如要连接数据库之类的操作
              this.setup(context);
            // 3. while循环,调用context.nextKeyValue();会委托给RecordRecord的nextKeyValue(),判断是否有下一个键值对
            // 当读取分片尾,context.nextKeyValue()返回false;退出循环
              while(context.nextKeyValue()) {
                    //4.  如果有下一个键值对,调用context.getCurrentKey()、context.getCurrentValue()获得当前的键、值的值(也是调用RecordReader的同名方法[见5])
                  this.map(context.getCurrentKey(), context.getCurrentValue(), context);
              }
                //6. 调用cleanup()方法,只在map任务结束之前,调用一次;所以,一些回收资源的工作可在此方法中实现,如关闭数据库连接
              this.cleanup(context);
          }
        // 5. - 作为参数传入map(key, value, context),调用一次map()
        protected void map(KEYIN key, VALUEIN value, Mapper.Context context){
              context.write(key, value);
          }
      }
      
      // recordReader
      public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
          public abstract void initialize(InputSplit var1, TaskAttemptContext var2);
          public abstract boolean nextKeyValue();
          public abstract KEYIN getCurrentKey();
          public abstract VALUEIN getCurrentValue();
          public abstract float getProgress();
          public abstract void close();
      }
      

示例代码

  • 小文件的优化无非以下几种方式:

    • 在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS(SequenceFile方案)
    • 在业务处理之前,在HDFS上使用mapreduce程序对小文件进行合并;可使用自定义InputFormat实现
    • 在mapreduce处理时,可采用CombineFileInputFormat提高效率
  • 自定义InputFormat

    /**
     * 自定义InputFormat类;
     * 泛型:
     *  键:因为不需要使用键,所以设置为NullWritable
     *  值:值用于保存小文件的内容,此处使用BytesWritable
     */
    public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
           // 返回false,表示输入文件不可切割
        protected boolean isSplitable(JobContext context, Path file) {
            return false;
        }
        // 生成读取分片split的RecordReader
        public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException,InterruptedException {
            WholeFileRecordReader reader = new WholeFileRecordReader();
              // split传如WholeFileRecordReader进行读取,组装value
            reader.initialize(split, context);
        }
    }
    
  • 自定义RecordReader

    public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
        private BytesWritable value = new BytesWritable();
        @Override
        public boolean nextKeyValue(){
           value.set(splitBytes, 0, splitBytes.length);
        }
    }
    

自定义OutputFormat

  • 输出结果到不同目录
public class MyOutPutFormat extends FileOutputFormat<Text, NullWritable> {
    public RecordWriter getRecordWriter(TaskAttemptContext context){
        // 两个输出文件路径
        FSDataOutputStream badOut = fs.create(badPath);
        FSDataOutputStream goodOut = fs.create(goodPath);
        return new MyRecordWriter(badOut,goodOut);
    }
    static class MyRecordWriter extends RecordWriter<Text, NullWritable>{
        public void write(Text key, NullWritable value){
            if
                 goodOut.write();
            else
              badOut.write();
        }
    }
}
// 设置自定义的输出类
job.setOutputFormatClass(MyOutPutFormat.class);
// 设置一个输出目录,这个目录会输出一个success的成功标志的文件
MyOutPutFormat.setOutputPath(job, new Path(args[1]));

二次排序

  • hadoop自带的key类型无法满足需求,自定义key

    • 实现WritableComparable接口
    • 实现compareTo比较方法
    • 实现write序列化方法
    • 实现readFields反序列化方法
  • 示例代码
//根据输入文件格式,定义JavaBean,作为MR时,Map的输出key类型;要求此类可序列化、可比较
public class Person implements WritableComparable<Person> {
    private String name;
    private int age;
    private int salary;

    public Person() {}

    //两个Person对象的比较规则:①先比较salary,高的排序在前;②若相同,age小的在前
    public int compareTo(Person other) {}

    //序列化,将NewKey转化成使用流传送的二进制
    public void write(DataOutput dataOutput) throws IOException {}

    //使用in读字段的顺序,要与write方法中写的顺序保持一致:name、age、salary
    public void readFields(DataInput dataInput) throws IOException {}
}
job.setOutputKeyClass(Person.class);

知识点小例子

  • 现有一个淘宝用户订单历史记录文件;每条记录有6个字段,分别表示

    • userid、datetime、title商品标题、unitPrice商品单价、purchaseNum购买量、productId商品ID
  • 现使用MR编程,求出每个用户、每个月消费金额最多的两笔订单,花了多少钱

    • 所以得相同用户、同一个年月的数据,分到同一组

逻辑分析

  • 根据文件格式,自定义JavaBean类OrderBean
    • 实现WritableComparable接口
    • 包含6个字段分别对应文件中的6个字段
    • 重点实现compareTo方法
      • 先比较userid是否相等;若不相等,则userid升序排序
      • 若相等,比较两个Bean的日期是否相等;若不相等,则日期升序排序
      • 若相等,再比较总开销,降序排序
    • 实现序列化方法write()
    • 实现反序列化方法readFields()
  • 自定义分区类
    • 继承Partitioner类
    • getPartiton()实现,userid相同的,处于同一个分区
  • 自定义Mapper类
    • 输出key是当前记录对应的Bean对象
    • 输出的value对应当前下单的总开销
  • 自定义分组类
    • 决定userid相同、日期(年月)相同的记录,分到同一组中,调用一次reduce()
  • 自定义Reduce类
    • reduce()中求出当前一组数据中,开销头两笔的信息
  • main方法
    • job.setMapperClass
    • job.setPartitionerClass
    • job.setReducerClass
    • job.setGroupingComparatorClass

示例代码

  • OrderBean
public class OrderBean implements WritableComparable<OrderBean> {

    //用户ID 等字段
    private String userid;
    public OrderBean() {}

    //key的比较规则
    public int compareTo(OrderBean other) {}
    // 序列化
    public void write(DataOutput dataOutput) throws IOException {}
        // 反序列化
    public void readFields(DataInput dataInput) throws IOException { }

    /**
     * 使用默认分区器,那么userid相同的,落入同一分区;
     * 另外一个方案:此处不覆写hashCode方法,而是自定义分区器,getPartition方法中,对OrderBean的userid求hashCode值%reduce任务数
     */
//    public int hashCode() {
//        return this.userid.hashCode();
//    }
}

  • MyPartitioner
//mapper的输出key类型是自定义的key类型OrderBean;输出value类型是单笔订单的总开销double -> DoubleWritable
public class MyPartitioner extends Partitioner<OrderBean, DoubleWritable> {
    @Override
    public int getPartition{
        //userid相同的,落入同一分区
        return (orderBean.getUserid().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

  • MyMapper
public class MyMapper extends Mapper<LongWritable, Text, OrderBean, DoubleWritable> {
    protected void map(LongWritable key, Text value, Context context){
            // 生成OrderBean对象
            OrderBean orderBean = getOrderBean();
            context.write(orderBean, valueOut);
        }
    }
}
  • MyReducer
public class MyReducer extends Reducer<OrderBean, DoubleWritable, Text, DoubleWritable> {
    /**
     * ①由于自定义分组逻辑,相同用户、相同年月的订单是一组,调用一次reduce();
     * ②由于自定义的key类OrderBean中,比较规则compareTo规定,相同用户、相同年月的订单,按总金额降序排序
     * 所以取出头两笔,就实现需求
     */
    @Override
    protected void reduce(OrderBean key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
        //求每个用户、每个月、消费金额最多的两笔多少钱
        int num = 0;
        for(DoubleWritable value: values) {
            if(num < 2) {
                String keyOut = key.getUserid() + "  " + key.getDatetime();
                context.write(new Text(keyOut), value);
                num++;
            } else {
                break;
            }
        }

    }
}
  • MyGroup
//自定义分组类:reduce端调用reduce()前,对数据做分组;每组数据调用一次reduce()
public class MyGroup extends WritableComparator {
      // 注意: 分组实现的方法是这个
    public int compare(WritableComparable a, WritableComparable b) {
        //userid、年、月相同的,作为一组
        int ret1 = aUserId.compareTo(bUserId);
        if(ret1 == 0) {//同一用户
            //年月也相同返回0,在同一组;
            return aOrderBean.getDatetime().compareTo(bOrderBean.getDatetime());
        } else {
            return ret1;
        }
    }
}

  • CustomGroupingMain
//设置处理Map阶段的自定义的类
job.setMapperClass(MyMapper.class);
//设置map combine类,减少网路传出量
//job.setCombinerClass(MyReducer.class);
job.setPartitionerClass(MyPartitioner.class);
//设置处理Reduce阶段的自定义的类
job.setReducerClass(MyReducer.class);
job.setGroupingComparatorClass(MyGroup.class);

MapReduce数据倾斜

  • 什么是数据倾斜?
    • 数据中不可避免地会出现离群值(outlier),并导致数据倾斜。这些离群值会显著地拖慢MapReduce的执行。
  • 常见的数据倾斜有以下几类:

    • 数据频率倾斜——某一个区域的数据量要远远大于其他区域。比如某一个key对应的键值对远远大于其他键的键值对。
    • 数据大小倾斜——部分记录的大小远远大于平均值。
  • 在map端和reduce端都有可能发生数据倾斜

    • 在map端的数据倾斜可以考虑使用combine
    • 在reduce端的数据倾斜常常来源于MapReduce的默认分区器
  • 数据倾斜会导致map和reduce的任务执行时间大为延长,也会让需要缓存数据集的操作消耗更多的内存资源

诊断是否存在数据倾斜

  • 发现倾斜数据之后,有必要诊断造成数据倾斜的那些键。有一个简便方法就是在代码里实现追踪每个键的最大值
  • 为了减少追踪量,可以设置数据量阀值,只追踪那些数据量大于阀值的键,并输出到日志中。实现代码如下
  • 运行作业后就可以从日志中判断发生倾斜的键以及倾斜程度;跟踪倾斜数据是了解数据的重要一步,也是设计MapReduce作业的重要基础
public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

  private int maxValueThreshold;

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {

    //一个键达到多少后,会做数据倾斜记录
    maxValueThreshold = 10000;
  }

  public void reduce(Text key, Iterable<IntWritable> values,
                     Context context) throws IOException, InterruptedException {
    int sum = 0;
    //用于记录键出现的次数
    int i = 0;

    for (IntWritable count : values) {
      sum += count.get();
      i++;
    }

    //如果当前键超过10000个,则打印日志
    if(i > maxValueThreshold) {
      LOGGER.info("Received " + i + " values for key " + key);
    }

    context.write(key, new IntWritable(sum));// 输出最终结果
  };
}

减缓数据倾斜

  • Reduce数据倾斜一般是指map的输出数据中存在数据频率倾斜的状况,即部分输出键的数据量远远大于其它的输出键

  • 如何减小reduce端数据倾斜的性能损失?常用方式有:

    • 自定义分区

      • 基于输出键的背景知识进行自定义分区。

      • 例如,如果map输出键的单词来源于一本书。其中大部分必然是省略词(stopword)。那么就可以将自定义分区将这部分省略词发送给固定的一部分reduce实例。而将其他的都发送给剩余的reduce实例。

    • Combine

      • 使用Combine可以大量地减小数据频率倾斜和数据大小倾斜。
      • combine的目的就是聚合并精简数据。
    • 抽样和范围分区

      • Hadoop默认的分区器是HashPartitioner,基于map输出键的哈希值分区。这仅在数据分布比较均匀时比较好。在有数据倾斜时就很有问题

      • 使用分区器需要首先了解数据的特性。TotalOrderPartitioner中,可以通过对原始数据进行抽样得到的结果集来预设分区边界值

      • TotalOrderPartitioner中的范围分区器可以通过预设的分区边界值进行分区。因此它也可以很好地用在矫正数据中的部分键的数据倾斜问题。
    • 数据大小倾斜的自定义策略

      • 在map端或reduce端的数据大小倾斜都会对缓存造成较大的影响,乃至导致OutOfMemoryError异常。处理这种情况并不容易。可以参考以下方法。

      • 设置mapreduce.input.linerecordreader.line.maxlength来限制RecordReader读取的最大长度。

      • RecordReader在TextInputFormat和KeyValueTextInputFormat类中使用。默认长度没有上限。

抽样分区案例

  • 使用全排序分区器TotalOrderPartitioner

//分区器:全局排序分区器
job.setPartitionerClass(TotalOrderPartitioner.class);

/**
     * 随机采样器从所有的分片中采样
     * 每一个参数:采样率;
     * 第二个参数:总的采样数
     * 第三个参数:采样的最大分区数;
     * 只要numSamples和maxSplitSampled(第二、第三参数)任一条件满足,则停止采样
     */
InputSampler.Sampler<IntWritable, Text> sampler =
  new InputSampler.RandomSampler<IntWritable, Text>(0.1, 5000, 10);
//    TotalOrderPartitioner.setPartitionFile();
/**
     * 存储定义分区的键;即整个数据集中温度的大致分布情况;
     * 由TotalOrderPartitioner读取,作为全排序的分区依据,让每个分区中的数据量近似
     */
InputSampler.writePartitionFile(job, sampler);

// 根据上边的SequenceFile文件(包含键的近似分布情况),创建分区
String partitionFile = TotalOrderPartitioner.getPartitionFile(job.getConfiguration());
URI partitionUri = new URI(partitionFile);

//与所有map任务共享此文件,添加到分布式缓存中
DistributedCache.addCacheFile(partitionUri, job.getConfiguration());
// job.addCacheFile(partitionUri);

示例代码