消息分发的同步均衡策略
浏览:5683次 出处信息
TimeTunnel在做消息分发时有这样一个场景:
A类消息需要做实时分析, 且量很大, 故它的消费者不会只是一台机器, 而是一组机器, 并要求这组中每台机器收到的消息量应该平均的, 即A消息在某个时刻有100条, 若有4台机器消费的话, 最佳的情况每台机器应收到25条.
这个场景就好比, 一个消息队列, 有多个线程并行消费, 如何保证每个消费线程获取的消息数量一样的.
解决它的切入点可能有两个:
这两条思路各有优缺点, 要视系统在时间和空间上的取舍而定. TimeTunnel 选择了第一种方式, 实现分了三步:
领域对象
消费进展(Track)
记录每个线程的当前消费状态.
同步误差范围(Range)
任意时刻可容忍的消费线程间消息数量的正负差距, 这个范围直接影响消费的效率, 范围越小则各个消费越均衡, 但同步牺牲了性能.
协调者(Coordinator)
根据每个消费线程的消费进展, 调控消费线程是活动还是等待.
01 /**
02 * {@link Coordinator} balance movings of {@link Track}s.
03 *
04 * @author jushi
05 * @created 2010-12-14
06 *
07 */
08 public interface Coordinator {
09 /**
10 * @param name
11 * @return a new {@link Track} with name.
12 */
13 Track track(String name);
14
15 /**
16 * {@link Track} record movings.
17 */
18 public interface Track {
19 /**
20 * Destory {@link Track} from {@link Coordinator}.
21 */
22 void destory();
23
24 /**
25 * Move one step.
26 *
27 * @return false means current {@link Track} need wait for balance, else
28 * true.
29 * @throws IllegalStateException if track destoryed.
30 */
31 boolean move();
32 }
33 }
验收用例
假定现在要协调两个人的步调, 每个人通过Track来汇报自己的走动的次数, 再根据汇报的反馈决定是暂定还是继续, 如下代码:
01 private final class Person implements Callable{ 02 public Person(final Track track) { 03 this.track = track; 04 } 05 06 @Override 07 public Integer call() throws Exception { 08 int steps = 0; 09 while (running.get()) { 10 if (canWalk()) steps++; 11 } 12 track.destory(); 13 return steps; 14 } 15 16 boolean canWalk() { 17 return track.move(); 18 } 19 20 private final Track track; 21 }
接下来, 就是要验证两人走了一段时间后, 二者步数的差距应该不超过预设的范围, 则有:
01 @Test
02 public void shouldSynchronizedTwoWalker() throws Exception {
03 final int value = 10;
04 final Coordinator coordinator = Coordinators.coordinator(value);
05
06 final List> futures =
07 Race.run(new Person(coordinator.track("0")),
08 new Person(coordinator.track("1")),
09 new DelayHaltCaller());
10 final int steps0 = futures.get(0).get();
11 final int steps1 = futures.get(1).get();
12 assertThat(Math.abs(steps1 - steps0), lessThanOrEqualTo(value));
13 }
14
15 public final AtomicBoolean running = new AtomicBoolean(true);
16
17 /**
18 * {@link DelayHaltCaller}
19 */
20 private final class DelayHaltCaller implements Callable {
21 @Override
22 public Integer call() throws Exception {
23 Thread.sleep(100L);
24 running.compareAndSet(true, false);
25 return 0;
26 }
27 }
设计实现
Track的状态机
消费者, Coordinator, Track 三者之间的协作关系
实现源码
01 import java.util.concurrent.atomic.AtomicBoolean;
02 import java.util.concurrent.atomic.AtomicInteger;
03
04 /**
05 * {@link Coordinators}
06 *
07 * @author jushi
08 * @created 2010-12-14
09 */
10 public final class Coordinators {
11 private Coordinators() {}
12
13 /**
14 * Create a new new {@link Coordinator} with valume.
15 *
16 * @param range for coordination.
17 *
18 * @return a new {@link Coordinator} with valume.
19 */
20 public static Coordinator coordinator(final int range) {
21 return new InnerCoordinator(range);
22 }
23
24 /** {@link InnerCoordinator} */
25 private final static class InnerCoordinator implements Coordinator {
26
27 public InnerCoordinator(final int range) { this.range = range; }
28
29 @Override
30 public Track track(final String name) { return new InnerTrack(name); }
31
32 private void arrived(final InnerTrack track) {
33 arriveds.incrementAndGet();
34 startNextRoundIfAllTrackArrived();
35 }
36
37 private void decrement() {
38 tracks.decrementAndGet();
39 startNextRoundIfAllTrackArrived();
40 }
41
42 private boolean flag() { return flag.get(); }
43
44 private void increment() { tracks.incrementAndGet(); }
45
46 private void reverseFlag() { flag.set(!flag.get()); }
47
48 private void startNextRoundIfAllTrackArrived() {
49 final int currentArriveds = arriveds.get();
50 if (currentArriveds < tracks.get()) return;
51 if (!arriveds.compareAndSet(currentArriveds, 0)) return;
52 reverseFlag();
53 }
54
55 private final int range;
56 private final AtomicInteger arriveds = new AtomicInteger();
57 private final AtomicInteger tracks = new AtomicInteger();
58 private final AtomicBoolean flag = new AtomicBoolean();
59
60 /** {@link InnerTrack} */
61 private final class InnerTrack implements Track {
62
63 public InnerTrack(final String name) {
64 this.name = name;
65 init();
66 increment();
67 }
68
69 @Override
70 public synchronized void destory() {
71 if (destoryed) return;
72 destoryed = true;
73 decrement();
74 }
75
76 @Override
77 public synchronized boolean move() {
78 if (destoryed)
79 throw new IllegalStateException("Can\'t move a destoryed track");
80 return state.move(this);
81 }
82
83 private boolean pause() {
84 arrived(this);
85 state = State.WAITTING;
86 return false;
87 }
88
89 private boolean canResume() { return flag != flag(); }
90
91 private boolean forward() {
92 steps++;
93 return true;
94 }
95
96 private boolean await() { return false; }
97
98 private void init() {
99 steps = 0;
100 flag = flag();
101 state = State.RUNNING;
102 }
103
104 private boolean isNotArrived() { return steps < range; }
105
106 private boolean resume() {
107 init();
108 return false;
109 }
110
111 private final String name;
112
113 private int steps;
114 private State state;
115 private boolean destoryed;
116 private boolean flag;
117
118 }
119
120 /** {@link State} */
121 private enum State {
122 RUNNING {
123 @Override
124 boolean move(final InnerTrack track) {
125 return track.isNotArrived() ? track.forward() : track.pause();
126 }
127 },
128 WAITTING {
129 @Override
130 boolean move(final InnerTrack track) {
131 return track.canResume() ? track.resume() : track.await();
132 }
133 };
134
135 abstract boolean move(InnerTrack track);
136 }
137
138 }
139 }
目前TimeTunnel已经在淘蝌蚪上开源, 更多源码请移步https://github.com/huolong/timetunnel.
建议继续学习:
- 关于IO的同步,异步,阻塞,非阻塞 (阅读:15777)
- mysql 主从同步原理 (阅读:6198)
- 五款最好的免费同步软件 (阅读:4387)
- MySQL5.5复制/同步的新特性及改进 (阅读:4283)
- 大量小文件的实时同步方案 (阅读:4219)
- redis源代码分析 - replication (阅读:4008)
- 三款面向 Amazon S3 的开源文件同步工具之对比 (阅读:3803)
- timetunnel之系统架构 (阅读:3635)
- rsync主动同步代码 (阅读:3548)
- truncate table 不能复制到从库 (阅读:3198)
QQ技术交流群:445447336,欢迎加入!
扫一扫订阅我的微信号:IT技术博客大学习
扫一扫订阅我的微信号:IT技术博客大学习
<< 前一篇:ZeroMQ 的模式
后一篇:Amazon分布式系统Dynamo >>
文章信息
- 作者:聚石 来源: 淘宝数据平台与产品部官方博客 tbdata.org
- 标签: TimeTunnel 同步 均衡 消息分发
- 发布时间:2011-03-02 22:53:12
建议继续学习
近3天十大热文
-
[928] WordPress插件开发 -- 在插件使用 -
[134] 解决 nginx 反向代理网页首尾出现神秘字 -
[52] 整理了一份招PHP高级工程师的面试题 -
[52] 如何保证一个程序在单台服务器上只有唯一实例( -
[51] 用 Jquery 模拟 select -
[50] 海量小文件存储 -
[50] Innodb分表太多或者表分区太多,会导致内 -
[50] 全站换域名时利用nginx和javascri -
[49] CloudSMS:免费匿名的云短信 -
[47] jQuery性能优化指南


