IT技术博客大学习 共学习 共进步
全部 移动开发 后端 数据库 AI 算法 安全 DevOps 前端 设计 开发者

Storm源码浅析之topology的提交

BlogJava-庄周梦蝶 2012-01-27 18:43:09 累计浏览 5,722 次
本机暂存
    原文:http://www.blogjava.net/killme2008/archive/2011/11/17/364112.html

     作者:dennis (killme2008@gmail.com)

     转载请注明出处。

     最近一直在读twitter开源的这个分布式流计算框架——storm的源码,还是有必要记录下一些比较有意思的地方。我按照storm的主要概念进行组织,并且只分析我关注的东西,因此称之为浅析。       

    一、介绍

     Storm的开发语言主要是Java和Clojure,其中Java定义骨架,而Clojure编写核心逻辑。源码统计结果:

     180 text files.

    177 unique files.                                          

    7 files ignored.

    http://cloc.sourceforge.net v 1.55  T=1.0 s (171.0 files/s, 46869.0 lines/s)

    -------------------------------------------------------------------------------

    Language                     files          blank        comment           code

    -------------------------------------------------------------------------------

    Java                           125           5010           2414          25661

    Lisp                            33            732            283           4871

    Python                           7            742            433           4675

    CSS                              1             12             45           1837

    ruby                             2             22              0            104

    Bourne Shell                     1              0              0              6

    Javascript                       2              1             15              6

    -------------------------------------------------------------------------------

    SUM:                           171           6519           3190          37160

    -------------------------------------------------------------------------------

       Java代码25000多行,而Clojure(Lisp)只有4871行,说语言不重要再次证明是扯淡。

    二、Topology和Nimbus       

     Topology是storm的核心理念,将spout和bolt组织成一个topology,运行在storm集群里,完成实时分析和计算的任务。这里我主要想介绍下topology部署到storm集群的大概过程。提交一个topology任务到Storm集群是通过StormSubmitter.submitTopology方法提交:

StormSubmitter.submitTopology(name, conf, builder.createTopology());
    我们将topology打成jar包后,利用bin/storm这个python脚本,执行如下命令:
bin/storm jar xxxx.jar com.taobao.MyTopology args
    将jar包提交给storm集群。storm脚本会启动JVM执行Topology的main方法,执行submitTopology的过程。而submitTopology会将jar文件上传到nimbus,上传是通过socket传输。在storm这个python脚本的jar方法里可以看到:
def jar(jarfile, klass, *args):                                                                                                                               

    exec_storm_class(                                                                                                                                          

    klass,                                                                                                                                                

    jvmtype=\"-client\",                                                                                                                                    

    extrajars=[jarfile, CONF_DIR, STORM_DIR + \"/bin\"],                                                                                                    

    args=args,                                                                                                                                            

    prefix=\"export STORM_JAR=\" + jarfile + \";\")

     将jar文件的地址设置为环境变量STORM_JAR,这个环境变量在执行submitTopology的时候用到:
//StormSubmitter.java 

    private static void submitJar(Map conf) {

    if(submittedJar==null) {

    LOG.info(\"Jar not uploaded to master yet. Submitting jar\"\"\");

    String localJar = System.getenv(\"STORM_JAR\");

    submittedJar = submitJar(conf, localJar);

    } else {

    LOG.info(\"Jar already uploaded to master. Not submitting jar.\");

    }

     }

    通过环境变量找到jar包的地址,然后上传。利用环境变量传参是个小技巧。

     其次,nimbus在接收到jar文件后,存放到数据目录的inbox目录,nimbus数据目录的结构

-nimbus

    -inbox

    -stormjar-57f1d694-2865-4b3b-8a7c-99104fc0aea3.jar

    -stormjar-76b4e316-b430-4215-9e26-4f33ba4ee520.jar

    -stormdist

    -storm-id

    -stormjar.jar

    -stormconf.ser

    -stormcode.ser

     其中inbox用于存放提交的jar文件,每个jar文件都重命名为stormjar加上一个32位的UUID。而stormdist存放的是启动topology后生成的文件,每个topology都分配一个唯一的id,ID的规则是“name-计数-时间戳”。启动后的topology的jar文件名命名为storm.jar ,而它的配置经过java序列化后存放在stormconf.ser文件,而stormcode.ser是将topology本身序列化后存放的文件。这些文件在部署的时候,supervisor会从这个目录下载这些文件,然后在supervisor本地执行这些代码。

     进入重点,topology任务的分配过程(zookeeper路径说明忽略root):

    1.在zookeeper上创建/taskheartbeats/{storm id} 路径,用于任务的心跳检测。storm对zookeeper的一个重要应用就是利用zk的临时节点做存活检测。task将定时刷新节点的时间戳,然后nimbus会检测这个时间戳是否超过timeout设置。

    2.从topology中获取bolts,spouts设置的并行数目以及全局配置的最大并行数,然后产生task id列表,如[1 2 3 4]

    3.在zookeeper上创建/tasks/{strom id}/{task id}路径,并存储task信息

    4.开始分配任务(内部称为assignment), 具体步骤:

    (1)从zk上获得已有的assignment(新的toplogy当然没有了)

    (2)查找所有可用的slot,所谓slot就是可用的worker,在所有supervisor上配置的多个worker的端口。

    (3)将任务均匀地分配给可用的worker,这里有两种情况:

    (a)task数目比worker多,例如task是[1 2 3 4],可用的slot只有[host1:port1 host2:port1],那么最终是这样分配

{1: [host1:port1] 2 : [host2:port1]

    3 : [host1:port1] 4 : [host2:port1]}

,可以看到任务平均地分配在两个worker上。

    (b)如果task数目比worker少,例如task是[1 2],而worker有[host1:port1 host1:port2 host2:port1 host2:port2],那么首先会将woker排序,将不同host间隔排列,保证task不会全部分配到同一个worker上,也就是将worker排列成

[host1:port1 host2:port1 host1:port2 host2:port2]
,然后分配任务为
{1: host1:port1 , 2 : host2:port2}

    (4)记录启动时间

    (5)判断现有的assignment是否跟重新分配的assignment相同,如果相同,不需要变更,否则更新assignment到zookeeper的/assignments/{storm id}上。

    5.启动topology,所谓启动,只是将zookeeper上/storms/{storm id}对应的数据里的active设置为true。

    6.nimbus会检查task的心跳,如果发现task心跳超过超时时间,那么会重新跳到第4步做re-assignment。

    原图已失效

dennis 2011-12-01 21:48 发表评论

同分类推荐文章

  1. 等了十年的 Go 链式管道,终于来了:seq 让你像写 Scala 一样写 Go (2026-06-25 18:38:18)
  2. Go 实验特性详解 (2026-06-21 10:05:27)
  3. amd64 微架构级别对 Go 程序性能提升多少? (2026-06-21 09:38:49)

查看更多 后端 文章 →

建议继续学习

  1. SmartSprites - 命令行形式的CSS Sprites生成器 (累计阅读 123,894)
  2. Java开发岗位面试题归类汇总 (累计阅读 22,155)
  3. android 开发入门 (累计阅读 19,527)
  4. 我的PHP,Python和Ruby之路 (累计阅读 13,146)
  5. HashMap解决hash冲突的方法 (累计阅读 12,652)
  6. 海量数据面试题举例 (累计阅读 11,114)
  7. 一个大二学生有关如何成为一名软件工程师的疑问及答复 (累计阅读 9,178)
  8. Java程序员应该知道的10个eclipse调试技巧 (累计阅读 8,011)
  9. 如何让员工忠于公司? (累计阅读 7,937)
  10. Java技术路线 (累计阅读 7,725)