IT技术博客大学习 共学习 共进步

在Hadoop中提升task的启动速度

搜索技术博客-淘宝 2012-05-10 23:42:23 浏览 2,123 次

    在增量DUMP过程中,我们的job比较小,但是启动非常频繁,每个job的执行时间短,通过执行的日志发现,有时会出现一个job的启动时间很长,需要几十秒。由于我们很看重增量的速度,所以几十秒的等待是不可接受的。

分析:

    我们当时使用的Hadoop CDH3 Beta4 的版本。通过ganglia图表分析,出问题的tasktracker会出现一些流量的凸起。但是离带宽限制还很远。通过仔细分析TaskTracker的日志发现,Child子进程启动过程中,存在等待的问题。经过分析源码,Child子进程在启动过程是在一个线程中串行完成,启动过程包括了distributedcache文件的获取。由于Hadoop集群同时可能有各种各样的任务提交,所以当某个task的启动时间长,主要是下载distributedcache文件时间长,会影响下一个task的启动,严重的时候会影响tasktracker发送心跳。

处理方案1:

    找出这些比较大的distributedcache,通过命令hadoop dfs -setrep [-R]   设置更多的备份块,让网络负载能更均匀一些,这样对于当台机器而言,可以减少网络的out 流量,但是不能减少in流量,这样修改可以加快下载distributedcache的下载速度,缓解启动慢的问题,但是不能根本解决这个问题,而且当再次出现大的distributedcache任务时,问题还会重现。

处理方案2:

    修改tasktracker的工作方式,最简单的办法,是让每个task各自通过一个task线程来启动,避免各个task之间的竞争,从根本上解决这个问题。代码如下:

  class StartNewTask extends Thread {
    TaskInProgress tip = null;

    public StartNewTask(TaskInProgress tip) {
      this.tip = tip;
    }

    public void run() {
      try {
        LOG.debug("(StartNewTask start) tip="   tip.getTask().toString());
        startNewTask(tip);
        LOG.debug("(StartNewTask finish) tip="   tip.getTask().toString());
      } catch (InterruptedException e) {
        return;
      } catch (Throwable th) {
        LOG.error("TaskLauncher error "   StringUtils.stringifyException(th));
      }
    }
  }

    并且在TaskLauncher.run()中增加 new StartNewTask(tip).start()。

    但是在解决过程这个问题的过程中,带入了新的问题,TaskTracker中关于正在运行的Job有两种锁,一个是runningJobs,用来锁住对Map runningJobs 的访问,这个HashMap里放的是正在运行的所有job。 另一个是rjob,用来存放单个job的信息。 由于之前的代码中,是先获得runningJobs锁,再获得rjob锁,最后再释放runningJobs锁, 代码如下:

private RunningJob addTaskToJob(JobID jobId,
                                  TaskInProgress tip) {
    synchronized (runningJobs) {
      RunningJob rJob = null;
      if (!runningJobs.containsKey(jobId)) {
        rJob = new RunningJob(jobId);
        rJob.tasks = new HashSet();
        runningJobs.put(jobId, rJob);
      } else {
        rJob = runningJobs.get(jobId);
      }
      synchronized (rJob) {
        rJob.tasks.add(tip);
      }
      return rJob;
    }
  }

    由于setupCache的操作是在rJob锁里完成,这样也会间接导致runningJobs一直等待rJob。这样即使task启动是多线程,也会由于别的任务在下载distributedcache,长期暂用rjob的锁,导致其他线程的runningJobs等待rjob,导致当前的task启动在无法获得runningJobs。当然也有一个暴力的方法,就是强行把runningJobs和rjob分开,只有在更新runningJobs这个Map的时候,才需要获得锁,且锁住的范围,不包括rjob。但是这样的改动范围大,而且容易出错,造成死锁。

    通过查看官方的jira https://issues.apache.org/jira/browse/MAPREDUCE-2364 这里解决的是当下载distributedcache时,去掉rjob的锁,从而使runningJobs和rjob锁住的操作中,没有长时间的任务。通过增加以下两个变量:

    volatile boolean localized;

    boolean localizing;

    这两个变量控制是否需要下载distributedcache,从而去掉rjob的锁,具体修改可以查看这里的patch。

    最终,我们的Hadoop版本,包括实现了多线程启动task和MAPREDUCE-2364的Patch。

    结论:

    通过以上2种方法,标本兼治,我们的task启动实现了没有延时,也不会相互干扰,稳定的运行增量DUMP任务。

建议继续学习

  1. Facebook的实时Hadoop系统 (阅读 11,403)
  2. hadoop rpc机制 && 将avro引入hadoop rpc机制初探 (阅读 6,082)
  3. Hadoop的map/reduce作业输入非UTF-8编码数据的处理原理 (阅读 5,546)
  4. 百度是如何使用hadoop的 (阅读 5,004)
  5. Hadoop超级安装手册 (阅读 4,662)
  6. Hadoop集群间Hadoop方案探讨 (阅读 4,443)
  7. 使用hadoop进行大规模数据的全局排序 (阅读 4,424)
  8. Hadoop安装端口已经被占用问题的解决方法 (阅读 3,882)
  9. Hadoop现有测试框架探幽 (阅读 3,803)
  10. 分布式计算平台Hadoop 发展现状乱而稳定的解读 (阅读 3,806)