Storm源码浅析之topology的提交
作者:dennis (killme2008@gmail.com)
转载请注明出处。
最近一直在读twitter开源的这个分布式流计算框架——storm的源码,还是有必要记录下一些比较有意思的地方。我按照storm的主要概念进行组织,并且只分析我关注的东西,因此称之为浅析。
一、介绍
Storm的开发语言主要是Java和Clojure,其中Java定义骨架,而Clojure编写核心逻辑。源码统计结果:
177 unique files.
7 files ignored.
http://cloc.sourceforge.net v 1.55 T=1.0 s (171.0 files/s, 46869.0 lines/s)
-------------------------------------------------------------------------------
Language files blank comment code
-------------------------------------------------------------------------------
Java 125 5010 2414 25661
Lisp 33 732 283 4871
Python 7 742 433 4675
CSS 1 12 45 1837
ruby 2 22 0 104
Bourne Shell 1 0 0 6
Javascript 2 1 15 6
-------------------------------------------------------------------------------
SUM: 171 6519 3190 37160
-------------------------------------------------------------------------------
Java代码25000多行,而Clojure(Lisp)只有4871行,说语言不重要再次证明是扯淡。
二、Topology和Nimbus
Topology是storm的核心理念,将spout和bolt组织成一个topology,运行在storm集群里,完成实时分析和计算的任务。这里我主要想介绍下topology部署到storm集群的大概过程。提交一个topology任务到Storm集群是通过StormSubmitter.submitTopology方法提交:
exec_storm_class(
klass,
jvmtype=\"-client\",
extrajars=[jarfile, CONF_DIR, STORM_DIR + \"/bin\"],
args=args,
prefix=\"export STORM_JAR=\" + jarfile + \";\")
private static void submitJar(Map conf) {
if(submittedJar==null) {
LOG.info(\"Jar not uploaded to master yet. Submitting jar\");
String localJar = System.getenv(\"STORM_JAR\");
submittedJar = submitJar(conf, localJar);
} else {
LOG.info(\"Jar already uploaded to master. Not submitting jar.\");
}
}
其次,nimbus在接收到jar文件后,存放到数据目录的inbox目录,nimbus数据目录的结构:
-inbox
-stormjar-57f1d694-2865-4b3b-8a7c-99104fc0aea3.jar
-stormjar-76b4e316-b430-4215-9e26-4f33ba4ee520.jar
-stormdist
-storm-id
-stormjar.jar
-stormconf.ser
-stormcode.ser
进入重点,topology任务的分配过程(zookeeper路径说明忽略root):
1.在zookeeper上创建/taskheartbeats/{storm id} 路径,用于任务的心跳检测。storm对zookeeper的一个重要应用就是利用zk的临时节点做存活检测。task将定时刷新节点的时间戳,然后nimbus会检测这个时间戳是否超过timeout设置。
2.从topology中获取bolts,spouts设置的并行数目以及全局配置的最大并行数,然后产生task id列表,如[1 2 3 4]
3.在zookeeper上创建/tasks/{strom id}/{task id}路径,并存储task信息
4.开始分配任务(内部称为assignment), 具体步骤:
(1)从zk上获得已有的assignment(新的toplogy当然没有了)
(2)查找所有可用的slot,所谓slot就是可用的worker,在所有supervisor上配置的多个worker的端口。
(3)将任务均匀地分配给可用的worker,这里有两种情况:
(a)task数目比worker多,例如task是[1 2 3 4],可用的slot只有[host1:port1 host2:port1],那么最终是这样分配
3 : [host1:port1] 4 : [host2:port1]}
(b)如果task数目比worker少,例如task是[1 2],而worker有[host1:port1 host1:port2 host2:port1 host2:port2],那么首先会将woker排序,将不同host间隔排列,保证task不会全部分配到同一个worker上,也就是将worker排列成
(4)记录启动时间
(5)判断现有的assignment是否跟重新分配的assignment相同,如果相同,不需要变更,否则更新assignment到zookeeper的/assignments/{storm id}上。
5.启动topology,所谓启动,只是将zookeeper上/storms/{storm id}对应的数据里的active设置为true。
6.nimbus会检查task的心跳,如果发现task心跳超过超时时间,那么会重新跳到第4步做re-assignment。
建议继续学习:
- Storm:最火的流式处理框架 (阅读:6071)
- Storm源码浅析之topology的提交 (阅读:4530)
- storm入门教程 第一章 前言 (阅读:4178)
- storm集群的监控 (阅读:3409)
- Storm入门教程 第二章 构建Topology (阅读:3155)
- storm常见问题解答 (阅读:2855)
- Storm配置项详解 (阅读:2866)
- storm集群的监控 (阅读:2743)
- 玩转CPU Topology (阅读:2615)
- storm入门教程 第四章 消息的可靠处理 (阅读:2249)
扫一扫订阅我的微信号:IT技术博客大学习
- 作者:dennis 来源: BlogJava-庄周梦蝶
- 标签: Storm topology
- 发布时间:2012-01-27 18:43:09
- [52] WEB系统需要关注的一些点
- [49] Oracle MTS模式下 进程地址与会话信
- [49] Go Reflect 性能
- [46] find命令的一点注意事项
- [46] 图书馆的世界纪录
- [46] 如何拿下简短的域名
- [46] Twitter/微博客的学习摘要
- [46] IOS安全–浅谈关于IOS加固的几种方法
- [45] android 开发入门
- [44] 【社会化设计】自我(self)部分――欢迎区