IT技术博客大学习 共学习 共进步

Storm源码浅析之topology的提交

BlogJava-庄周梦蝶 2012-01-27 18:43:09 浏览 5,581 次
    原文:http://www.blogjava.net/killme2008/archive/2011/11/17/364112.html

     作者:dennis (killme2008@gmail.com)

     转载请注明出处。

     最近一直在读twitter开源的这个分布式流计算框架——storm的源码,还是有必要记录下一些比较有意思的地方。我按照storm的主要概念进行组织,并且只分析我关注的东西,因此称之为浅析。       

    一、介绍

     Storm的开发语言主要是Java和Clojure,其中Java定义骨架,而Clojure编写核心逻辑。源码统计结果:

     180 text files.

    177 unique files.                                          

    7 files ignored.

    http://cloc.sourceforge.net v 1.55  T=1.0 s (171.0 files/s, 46869.0 lines/s)

    -------------------------------------------------------------------------------

    Language                     files          blank        comment           code

    -------------------------------------------------------------------------------

    Java                           125           5010           2414          25661

    Lisp                            33            732            283           4871

    Python                           7            742            433           4675

    CSS                              1             12             45           1837

    ruby                             2             22              0            104

    Bourne Shell                     1              0              0              6

    Javascript                       2              1             15              6

    -------------------------------------------------------------------------------

    SUM:                           171           6519           3190          37160

    -------------------------------------------------------------------------------

       Java代码25000多行,而Clojure(Lisp)只有4871行,说语言不重要再次证明是扯淡。

    二、Topology和Nimbus       

     Topology是storm的核心理念,将spout和bolt组织成一个topology,运行在storm集群里,完成实时分析和计算的任务。这里我主要想介绍下topology部署到storm集群的大概过程。提交一个topology任务到Storm集群是通过StormSubmitter.submitTopology方法提交:

StormSubmitter.submitTopology(name, conf, builder.createTopology());
    我们将topology打成jar包后,利用bin/storm这个python脚本,执行如下命令:
bin/storm jar xxxx.jar com.taobao.MyTopology args
    将jar包提交给storm集群。storm脚本会启动JVM执行Topology的main方法,执行submitTopology的过程。而submitTopology会将jar文件上传到nimbus,上传是通过socket传输。在storm这个python脚本的jar方法里可以看到:
def jar(jarfile, klass, *args):                                                                                                                               

    exec_storm_class(                                                                                                                                          

    klass,                                                                                                                                                

    jvmtype=\"-client\",                                                                                                                                    

    extrajars=[jarfile, CONF_DIR, STORM_DIR + \"/bin\"],                                                                                                    

    args=args,                                                                                                                                            

    prefix=\"export STORM_JAR=\" + jarfile + \";\")

     将jar文件的地址设置为环境变量STORM_JAR,这个环境变量在执行submitTopology的时候用到:
//StormSubmitter.java 

    private static void submitJar(Map conf) {

    if(submittedJar==null) {

    LOG.info(\"Jar not uploaded to master yet. Submitting jar\"\"\");

    String localJar = System.getenv(\"STORM_JAR\");

    submittedJar = submitJar(conf, localJar);

    } else {

    LOG.info(\"Jar already uploaded to master. Not submitting jar.\");

    }

     }

    通过环境变量找到jar包的地址,然后上传。利用环境变量传参是个小技巧。

     其次,nimbus在接收到jar文件后,存放到数据目录的inbox目录,nimbus数据目录的结构

-nimbus

    -inbox

    -stormjar-57f1d694-2865-4b3b-8a7c-99104fc0aea3.jar

    -stormjar-76b4e316-b430-4215-9e26-4f33ba4ee520.jar

    -stormdist

    -storm-id

    -stormjar.jar

    -stormconf.ser

    -stormcode.ser

     其中inbox用于存放提交的jar文件,每个jar文件都重命名为stormjar加上一个32位的UUID。而stormdist存放的是启动topology后生成的文件,每个topology都分配一个唯一的id,ID的规则是“name-计数-时间戳”。启动后的topology的jar文件名命名为storm.jar ,而它的配置经过java序列化后存放在stormconf.ser文件,而stormcode.ser是将topology本身序列化后存放的文件。这些文件在部署的时候,supervisor会从这个目录下载这些文件,然后在supervisor本地执行这些代码。

     进入重点,topology任务的分配过程(zookeeper路径说明忽略root):

    1.在zookeeper上创建/taskheartbeats/{storm id} 路径,用于任务的心跳检测。storm对zookeeper的一个重要应用就是利用zk的临时节点做存活检测。task将定时刷新节点的时间戳,然后nimbus会检测这个时间戳是否超过timeout设置。

    2.从topology中获取bolts,spouts设置的并行数目以及全局配置的最大并行数,然后产生task id列表,如[1 2 3 4]

    3.在zookeeper上创建/tasks/{strom id}/{task id}路径,并存储task信息

    4.开始分配任务(内部称为assignment), 具体步骤:

    (1)从zk上获得已有的assignment(新的toplogy当然没有了)

    (2)查找所有可用的slot,所谓slot就是可用的worker,在所有supervisor上配置的多个worker的端口。

    (3)将任务均匀地分配给可用的worker,这里有两种情况:

    (a)task数目比worker多,例如task是[1 2 3 4],可用的slot只有[host1:port1 host2:port1],那么最终是这样分配

{1: [host1:port1] 2 : [host2:port1]

    3 : [host1:port1] 4 : [host2:port1]}

,可以看到任务平均地分配在两个worker上。

    (b)如果task数目比worker少,例如task是[1 2],而worker有[host1:port1 host1:port2 host2:port1 host2:port2],那么首先会将woker排序,将不同host间隔排列,保证task不会全部分配到同一个worker上,也就是将worker排列成

[host1:port1 host2:port1 host1:port2 host2:port2]
,然后分配任务为
{1: host1:port1 , 2 : host2:port2}

    (4)记录启动时间

    (5)判断现有的assignment是否跟重新分配的assignment相同,如果相同,不需要变更,否则更新assignment到zookeeper的/assignments/{storm id}上。

    5.启动topology,所谓启动,只是将zookeeper上/storms/{storm id}对应的数据里的active设置为true。

    6.nimbus会检查task的心跳,如果发现task心跳超过超时时间,那么会重新跳到第4步做re-assignment。

    

dennis 2011-12-01 21:48 发表评论

建议继续学习

  1. Storm:最火的流式处理框架 (阅读 7,281)
  2. Storm源码浅析之topology的提交 (阅读 5,761)
  3. storm入门教程 第一章 前言 (阅读 5,020)
  4. storm集群的监控 (阅读 4,182)
  5. Storm入门教程 第二章 构建Topology (阅读 4,100)
  6. storm常见问题解答 (阅读 3,881)
  7. 玩转CPU Topology (阅读 3,582)
  8. Storm配置项详解 (阅读 3,541)
  9. storm集群的监控 (阅读 3,541)
  10. storm入门教程 第四章 消息的可靠处理 (阅读 3,102)