消息分发的同步均衡策略
浏览:5061次 出处信息
TimeTunnel在做消息分发时有这样一个场景:
A类消息需要做实时分析, 且量很大, 故它的消费者不会只是一台机器, 而是一组机器, 并要求这组中每台机器收到的消息量应该平均的, 即A消息在某个时刻有100条, 若有4台机器消费的话, 最佳的情况每台机器应收到25条.
这个场景就好比, 一个消息队列, 有多个线程并行消费, 如何保证每个消费线程获取的消息数量一样的.
解决它的切入点可能有两个:
这两条思路各有优缺点, 要视系统在时间和空间上的取舍而定. TimeTunnel 选择了第一种方式, 实现分了三步:
领域对象
消费进展(Track)
记录每个线程的当前消费状态.
同步误差范围(Range)
任意时刻可容忍的消费线程间消息数量的正负差距, 这个范围直接影响消费的效率, 范围越小则各个消费越均衡, 但同步牺牲了性能.
协调者(Coordinator)
根据每个消费线程的消费进展, 调控消费线程是活动还是等待.
01 /** 02 * {@link Coordinator} balance movings of {@link Track}s. 03 * 04 * @author jushi 05 * @created 2010-12-14 06 * 07 */ 08 public interface Coordinator { 09 /** 10 * @param name 11 * @return a new {@link Track} with name. 12 */ 13 Track track(String name); 14 15 /** 16 * {@link Track} record movings. 17 */ 18 public interface Track { 19 /** 20 * Destory {@link Track} from {@link Coordinator}. 21 */ 22 void destory(); 23 24 /** 25 * Move one step. 26 * 27 * @return false means current {@link Track} need wait for balance, else 28 * true. 29 * @throws IllegalStateException if track destoryed. 30 */ 31 boolean move(); 32 } 33 }
验收用例
假定现在要协调两个人的步调, 每个人通过Track来汇报自己的走动的次数, 再根据汇报的反馈决定是暂定还是继续, 如下代码:
01 private final class Person implements Callable{ 02 public Person(final Track track) { 03 this.track = track; 04 } 05 06 @Override 07 public Integer call() throws Exception { 08 int steps = 0; 09 while (running.get()) { 10 if (canWalk()) steps++; 11 } 12 track.destory(); 13 return steps; 14 } 15 16 boolean canWalk() { 17 return track.move(); 18 } 19 20 private final Track track; 21 }
接下来, 就是要验证两人走了一段时间后, 二者步数的差距应该不超过预设的范围, 则有:
01 @Test 02 public void shouldSynchronizedTwoWalker() throws Exception { 03 final int value = 10; 04 final Coordinator coordinator = Coordinators.coordinator(value); 05 06 final List> futures = 07 Race.run(new Person(coordinator.track("0")), 08 new Person(coordinator.track("1")), 09 new DelayHaltCaller()); 10 final int steps0 = futures.get(0).get(); 11 final int steps1 = futures.get(1).get(); 12 assertThat(Math.abs(steps1 - steps0), lessThanOrEqualTo(value)); 13 } 14 15 public final AtomicBoolean running = new AtomicBoolean(true); 16 17 /** 18 * {@link DelayHaltCaller} 19 */ 20 private final class DelayHaltCaller implements Callable { 21 @Override 22 public Integer call() throws Exception { 23 Thread.sleep(100L); 24 running.compareAndSet(true, false); 25 return 0; 26 } 27 }
设计实现
Track的状态机
消费者, Coordinator, Track 三者之间的协作关系
实现源码
01 import java.util.concurrent.atomic.AtomicBoolean; 02 import java.util.concurrent.atomic.AtomicInteger; 03 04 /** 05 * {@link Coordinators} 06 * 07 * @author jushi 08 * @created 2010-12-14 09 */ 10 public final class Coordinators { 11 private Coordinators() {} 12 13 /** 14 * Create a new new {@link Coordinator} with valume. 15 * 16 * @param range for coordination. 17 * 18 * @return a new {@link Coordinator} with valume. 19 */ 20 public static Coordinator coordinator(final int range) { 21 return new InnerCoordinator(range); 22 } 23 24 /** {@link InnerCoordinator} */ 25 private final static class InnerCoordinator implements Coordinator { 26 27 public InnerCoordinator(final int range) { this.range = range; } 28 29 @Override 30 public Track track(final String name) { return new InnerTrack(name); } 31 32 private void arrived(final InnerTrack track) { 33 arriveds.incrementAndGet(); 34 startNextRoundIfAllTrackArrived(); 35 } 36 37 private void decrement() { 38 tracks.decrementAndGet(); 39 startNextRoundIfAllTrackArrived(); 40 } 41 42 private boolean flag() { return flag.get(); } 43 44 private void increment() { tracks.incrementAndGet(); } 45 46 private void reverseFlag() { flag.set(!flag.get()); } 47 48 private void startNextRoundIfAllTrackArrived() { 49 final int currentArriveds = arriveds.get(); 50 if (currentArriveds < tracks.get()) return; 51 if (!arriveds.compareAndSet(currentArriveds, 0)) return; 52 reverseFlag(); 53 } 54 55 private final int range; 56 private final AtomicInteger arriveds = new AtomicInteger(); 57 private final AtomicInteger tracks = new AtomicInteger(); 58 private final AtomicBoolean flag = new AtomicBoolean(); 59 60 /** {@link InnerTrack} */ 61 private final class InnerTrack implements Track { 62 63 public InnerTrack(final String name) { 64 this.name = name; 65 init(); 66 increment(); 67 } 68 69 @Override 70 public synchronized void destory() { 71 if (destoryed) return; 72 destoryed = true; 73 decrement(); 74 } 75 76 @Override 77 public synchronized boolean move() { 78 if (destoryed) 79 throw new IllegalStateException("Can\'t move a destoryed track"); 80 return state.move(this); 81 } 82 83 private boolean pause() { 84 arrived(this); 85 state = State.WAITTING; 86 return false; 87 } 88 89 private boolean canResume() { return flag != flag(); } 90 91 private boolean forward() { 92 steps++; 93 return true; 94 } 95 96 private boolean await() { return false; } 97 98 private void init() { 99 steps = 0; 100 flag = flag(); 101 state = State.RUNNING; 102 } 103 104 private boolean isNotArrived() { return steps < range; } 105 106 private boolean resume() { 107 init(); 108 return false; 109 } 110 111 private final String name; 112 113 private int steps; 114 private State state; 115 private boolean destoryed; 116 private boolean flag; 117 118 } 119 120 /** {@link State} */ 121 private enum State { 122 RUNNING { 123 @Override 124 boolean move(final InnerTrack track) { 125 return track.isNotArrived() ? track.forward() : track.pause(); 126 } 127 }, 128 WAITTING { 129 @Override 130 boolean move(final InnerTrack track) { 131 return track.canResume() ? track.resume() : track.await(); 132 } 133 }; 134 135 abstract boolean move(InnerTrack track); 136 } 137 138 } 139 }
目前TimeTunnel已经在淘蝌蚪上开源, 更多源码请移步https://github.com/huolong/timetunnel.
建议继续学习:
- 关于IO的同步,异步,阻塞,非阻塞 (阅读:14593)
- mysql 主从同步原理 (阅读:5731)
- 五款最好的免费同步软件 (阅读:3882)
- 大量小文件的实时同步方案 (阅读:3753)
- MySQL5.5复制/同步的新特性及改进 (阅读:3818)
- redis源代码分析 - replication (阅读:3527)
- rsync主动同步代码 (阅读:3152)
- timetunnel之系统架构 (阅读:3127)
- 三款面向 Amazon S3 的开源文件同步工具之对比 (阅读:3179)
- truncate table 不能复制到从库 (阅读:2750)
QQ技术交流群:445447336,欢迎加入!
扫一扫订阅我的微信号:IT技术博客大学习
扫一扫订阅我的微信号:IT技术博客大学习
<< 前一篇:ZeroMQ 的模式
后一篇:Amazon分布式系统Dynamo >>
文章信息
- 作者:聚石 来源: 淘宝数据平台与产品部官方博客 tbdata.org
- 标签: TimeTunnel 同步 均衡 消息分发
- 发布时间:2011-03-02 22:53:12
建议继续学习
近3天十大热文
- [51] WEB系统需要关注的一些点
- [48] Oracle MTS模式下 进程地址与会话信
- [48] Go Reflect 性能
- [46] IOS安全–浅谈关于IOS加固的几种方法
- [45] android 开发入门
- [45] find命令的一点注意事项
- [45] Twitter/微博客的学习摘要
- [44] 【社会化设计】自我(self)部分――欢迎区
- [44] 图书馆的世界纪录
- [43] 关于恐惧的自白