IT技术博客大学习 共学习 共进步

Hadoop Job Tuning

搜索技术博客-淘宝 2010-12-29 09:15:37 浏览 3,543 次

Hadoop平台已经成为了大多数公司的分布式数据处理平台,随着数据规模的越来越大,对集群的压力也越来越大,集群的每个节点负担自然就会加重,而且集群内部的网络带宽有限,数据交换吞吐量也在面临考验,由此引发了人们对大规模数据处理进行优化的思考。

本文仅从实践经验出发,针对Hadoop Job优化提出了一些观点,不包含HDFS的优化。

Job Tracker Related

严格来说,下面这个配置项,是决定HDFS文件block数量的多少(也就是文件个数),但是它会间接的影响Job Tracker的调度和内存的占用(其实更能影响name node内存的使用)。

dfs.block.size

这个配置项定义了在HDFS上每个block的大小,它的值是以字节为单位。可以在配置文件hadoop-site.xml(Hadoop 0.20 以前版本)定义,也可以在JobConf里定义。

mapred.map.tasks.speculative.execution=true

mapred.reduce.tasks.speculative.execution=true

这两个是推测执行的配置项,当然如果你从来不关心这两个选项也没关系,它们默认值是true。

所谓的推测执行,就是当所有task都开始运行之后,Job Tracker会统计所有任务的平均进度,如果某个task所在的task node机器配置比较低或者CPU load很高(原因很多),导致任务执行比总体任务的平均执行要慢,此时Job Tracker会启动一个新的任务(duplicate task),原有任务和新任务哪个先执行完就把另外一个kill掉,这也是我们经常在Job Tracker页面看到任务执行成功,但是总有些任务被kill,就是这个原因。

mapred.child.java.opts

一般来说,都是reduce耗费内存比较大,这个选项正是用来设置JVM堆的最大可用内存,但是也不要设置太大,如果超过2G,应该考虑从程序设计角度去优化。

Map Related

Input Split的大小,决定了一个Job拥有多少个map,默认64M每个Split,如果输入的数据量巨大,那么默认的64M的block会有几万甚至几十万的Map Task,集群的网络传输会很大,最严重的是给Job Tracker的调度、队列、内存都会带来很大压力。

mapred.min.split.size

这个配置项决定了每个 Input Split的最小值,也间接决定了一个Job的map 数目。

mapred.compress.map.output

    压缩Map的输出应该作为一个习惯,这样做有两个好处:

    a)      压缩是在内存中进行,所以写入map本地磁盘的数据就会变小,大大减少了本地IO次数

    b)      Reduce从每个map节点copy数据,也会明显降低网络传输的时间

    补充:数据序列化其实效果会更好,无论是磁盘IO还是数据大小,都会明显的降低。

    io.sort.mb

    以MB为单位,默认100M,通常来看,这个值太小了。

    这个选项定义了map输出结果在内存占用buffer的大小,当buffer达到一定阈值,会启动一个后台线程来对buffer的内容进行排序,然后写入本地磁盘(一个spill文件)。

    在内存中的buffer如下图:(kvstart,kvend,kvindex为hadoop源码中操作buffer的变量):

     

    根据map输出数据量的大小,可以适当的调整buffer的大小,注意是适当的调整,不是越大越好,假设内存无限大,io.sort.mb=1024(1G), 和io.sort.mb=300 (300M),前者未必比后者快,因为1G的数据排序一次和排序3次,每次300MB,一定是后者快(分而治之的思想)

    io.sort.spill.percent

    这个值就是上述buffer的阈值,默认是0.8,既80%,当buffer中的数据达到这个阈值,后台线程会起来对buffer中已有的数据进行排序,然后写入磁盘,此时map输出的数据继续往剩余的20% buffer写数据,如果buffer的剩余20%写满,排序还没结束,map task被block等待。

    如果你确认map输出的数据基本有序(很少见),排序时间很短,可以将这个阈值适当调高,更理想的,如果你的map输出是有序的数据(基本不可能吧?),那么可以把buffer设的更大,阈值设置为1.

    Io.sort.factor

    同时打开的文件句柄的数量,默认是10.

    当一个map task执行完之后,本地磁盘上(mapred.local.dir)有若干个spill文件,map task最后做的一件事就是执行merge sort,把这些spill文件合成一个文件(partition),有时候我们会自定义partition函数,就是在这个时候被调用的。

    执行merge sort的时候,每次同时打开多少个spill文件,就是由io.sort.factor决定的。打开的文件越多,不一定merge sort就越快,所以也要根据数据情况适当的调整.

    补充:merge排序的结果是两个文件,一个是index,另一个是数据文件,index文件记录了每个不同的key在数据文件中的偏移量(这就是partition) 

 

Reduce Related

    提高Reduce的执行效率,除了在Hadoop框架方面的优化,重点还是在代码逻辑上的优化!比如:对Reduce接受到的value可能有重复的,此时如果用Java的Set或者STL的Set来达到去重的目的,那么这个程序不是扩展良好的(non-scalable),受到数据量的限制,当数据膨胀,内存势必会溢出?

    Io.sort.mb/io.sort.factor/ io.sort.spill.percent

    其作用和Map阶段的作用一样,都是控制排序时使用内存的大小、句柄数量、阈值大小

    mapred.reduce.parallel.copies

    Reduce copy数据的线程数量,默认值是5

    Reduce到每个完成的Map Task copy数据(通过RPC调用),默认同时启动5个线程到map节点取数据。这个配置还是很关键的,如果你的map输出数据很大,有时候会发现map早就100%了,reduce一直在1% 2%。。。。。。缓慢的变化,那就是copy数据太慢了,5个线程copy 10G的数据,确实会很慢,这时就要调整这个参数了,但是调整的太大,又会事半功倍,容易造成集群拥堵,所以 Job tuning的同时,也是个权衡的过程,你要熟悉你的数据!

    mapred.job.shuffle.input.buffer.percent

    当指定了JVM的堆内存最大值以后,上面这个配置项就是Reduce用来存放从Map节点取过来的数据所用的内存占堆内存的比例,默认是0.7,既70%,通常这个比例是够了,但是我们讨论的还是大数据的情况,所以这个比例还是小了一些,0.8-0.9之间比较合适。(前提是你的reduce函数不会疯狂的吃掉内存)

    mapred.job.shuffle.merge.percent(默认值0.66)

    mapred.inmem.merge.threshold(默认值1000)

    这是两个阈值的配置项,第一个指的从Map节点取数据过来,放到内存,当达到这个阈值之后,后台启动线程(通常是Linux native process)把内存中的数据merge sort,写到reduce节点的本地磁盘;

    第二个指的是从map节点取过来的文件个数,当达到这个个数之后,也进行merger sort,然后写到reduce节点的本地磁盘;这两个配置项第一个优先判断,其次才判断第二个thresh-hold。

    从实际经验来看,mapred.job.shuffle.merge.percent默认值确实太小了,完全可以设置到0.8左右;第二个默认值1000,完全取决于map输出数据的大小,如果map输出的数据很大,默认值1000反倒不好,应该小一些,如果map输出的数据不大(light weight),可以设置2000或者以上,都没问题。

后记

    大规模数据处理是云计算平台的挑战,涉及的技术难点很多:

    1: 并行优化

    2: 海量存储(HDFS存在诸多限制)【NoSQL】

    3: 大规模数据不仅仅是用来静态的处理,怎么样动起来?比如实时查询、实时更新、状态跟踪(数据的生命周期),目前开源社区的NoSQL技术还没有做到,或者很好的达到这些要求,这也是今后海量存储系统面临的技术难点。 

建议继续学习

  1. 分布式缓存系统 Memcached 入门 (阅读 16,044)
  2. Zookeeper工作原理 (阅读 11,944)
  3. Facebook的实时Hadoop系统 (阅读 11,403)
  4. GFS, HDFS, Blob File System架构对比 (阅读 10,344)
  5. Zookeeper研究和应用 (阅读 9,342)
  6. 一致性哈希算法及其在分布式系统中的应用 (阅读 9,044)
  7. 分布式日志系统scribe使用手记 (阅读 8,844)
  8. 分布式哈希和一致性哈希 (阅读 8,666)
  9. HBase技术介绍 (阅读 7,943)
  10. 分布式系统的事务处理 (阅读 7,246)