技术头条 - 一个快速在微博传播文章的方式     搜索本站
您现在的位置首页 --> 系统架构 --> 消息分发的同步均衡策略

消息分发的同步均衡策略

浏览:5061次  出处信息

    TimeTunnel在做消息分发时有这样一个场景:

    A类消息需要做实时分析, 且量很大, 故它的消费者不会只是一台机器, 而是一组机器, 并要求这组中每台机器收到的消息量应该平均的, 即A消息在某个时刻有100条, 若有4台机器消费的话, 最佳的情况每台机器应收到25条.

    这个场景就好比, 一个消息队列, 有多个线程并行消费, 如何保证每个消费线程获取的消息数量一样的.

    解决它的切入点可能有两个:

  • 消费线程拉的方式, 这就需要一个协调器, 来同步每个消费线程的进度, 即每个消费线程每次拉完都需要询问协调器是否可以继续;
  • 单一线程推的方式, 创建一组与消费线程数量一致的二级队列(即每个消费线程仅从与之绑定的队列中取); 独立一个分派线程, 它从一级队列里取消息, 然后轮转的推给每个二级消费队列.
  •     这两条思路各有优缺点, 要视系统在时间和空间上的取舍而定. TimeTunnel 选择了第一种方式, 实现分了三步:

    领域对象

    消费进展(Track)

        记录每个线程的当前消费状态.

    同步误差范围(Range)

        任意时刻可容忍的消费线程间消息数量的正负差距, 这个范围直接影响消费的效率, 范围越小则各个消费越均衡, 但同步牺牲了性能.

    协调者(Coordinator)

        根据每个消费线程的消费进展, 调控消费线程是活动还是等待.

    01 /**
    02  * {@link Coordinator} balance movings of {@link Track}s.
    03  * 
    04  * @author jushi
    05  * @created 2010-12-14
    06  * 
    07  */
    08 public interface Coordinator {
    09   /**
    10    * @param name
    11    * @return a new {@link Track} with name.
    12    */
    13   Track track(String name);
    14 
    15   /**
    16    * {@link Track} record movings.
    17    */
    18   public interface Track {
    19     /**
    20      * Destory {@link Track} from {@link Coordinator}.
    21      */
    22     void destory();
    23 
    24     /**
    25      * Move one step.
    26      * 
    27      * @return false means current {@link Track} need wait for balance, else
    28      *         true.
    29      * @throws IllegalStateException if track destoryed.
    30      */
    31     boolean move();
    32   }
    33 }

    验收用例

        假定现在要协调两个人的步调, 每个人通过Track来汇报自己的走动的次数, 再根据汇报的反馈决定是暂定还是继续, 如下代码:

    01   private final class Person implements Callable {
    02     public Person(final Track track) {
    03       this.track = track;
    04     }
    05 
    06     @Override
    07     public Integer call() throws Exception {
    08       int steps = 0;
    09       while (running.get()) {
    10         if (canWalk()) steps++;
    11       }
    12       track.destory();
    13       return steps;
    14     }
    15 
    16     boolean canWalk() {
    17       return track.move();
    18     }
    19 
    20     private final Track track;
    21   }

        接下来, 就是要验证两人走了一段时间后, 二者步数的差距应该不超过预设的范围, 则有:

    01   @Test
    02   public void shouldSynchronizedTwoWalker() throws Exception {
    03     final int value = 10;
    04     final Coordinator coordinator = Coordinators.coordinator(value);
    05 
    06     final List> futures =
    07       Race.run(new Person(coordinator.track("0")),
    08                new Person(coordinator.track("1")),
    09                new DelayHaltCaller());
    10     final int steps0 = futures.get(0).get();
    11     final int steps1 = futures.get(1).get();
    12     assertThat(Math.abs(steps1 - steps0), lessThanOrEqualTo(value));
    13   }
    14 
    15   public final AtomicBoolean running = new AtomicBoolean(true);
    16 
    17   /**
    18    * {@link DelayHaltCaller}
    19    */
    20   private final class DelayHaltCaller implements Callable {
    21     @Override
    22     public Integer call() throws Exception {
    23       Thread.sleep(100L);
    24       running.compareAndSet(true, false);
    25       return 0;
    26     }
    27   }

    设计实现

    Track的状态机

        

    消费者, Coordinator, Track 三者之间的协作关系

        

    实现源码

    01 import java.util.concurrent.atomic.AtomicBoolean;
    02 import java.util.concurrent.atomic.AtomicInteger;
    03 
    04 /**
    05  * {@link Coordinators}
    06  *
    07  * @author jushi
    08  * @created 2010-12-14
    09  */
    10 public final class Coordinators {
    11   private Coordinators() {}
    12 
    13   /**
    14    * Create a new new {@link Coordinator} with valume.
    15    *
    16    * @param range for coordination.
    17    *
    18    * @return a new {@link Coordinator} with valume.
    19    */
    20   public static Coordinator coordinator(final int range) {
    21     return new InnerCoordinator(range);
    22   }
    23 
    24   /** {@link InnerCoordinator} */
    25   private final static class InnerCoordinator implements Coordinator {
    26 
    27     public InnerCoordinator(final int range) { this.range = range; }
    28 
    29     @Override
    30     public Track track(final String name) { return new InnerTrack(name); }
    31 
    32     private void arrived(final InnerTrack track) {
    33       arriveds.incrementAndGet();
    34       startNextRoundIfAllTrackArrived();
    35     }
    36 
    37     private void decrement() {
    38       tracks.decrementAndGet();
    39       startNextRoundIfAllTrackArrived();
    40     }
    41 
    42     private boolean flag() { return flag.get(); }
    43 
    44     private void increment() { tracks.incrementAndGet(); }
    45 
    46     private void reverseFlag() { flag.set(!flag.get()); }
    47 
    48     private void startNextRoundIfAllTrackArrived() {
    49       final int currentArriveds = arriveds.get();
    50       if (currentArriveds < tracks.get()) return;
    51       if (!arriveds.compareAndSet(currentArriveds, 0)) return;
    52       reverseFlag();
    53     }
    54 
    55     private final int range;
    56     private final AtomicInteger arriveds = new AtomicInteger();
    57     private final AtomicInteger tracks = new AtomicInteger();
    58     private final AtomicBoolean flag = new AtomicBoolean();
    59 
    60     /** {@link InnerTrack} */
    61     private final class InnerTrack implements Track {
    62 
    63       public InnerTrack(final String name) {
    64         this.name = name;
    65         init();
    66         increment();
    67       }
    68 
    69       @Override
    70       public synchronized void destory() {
    71         if (destoryed) return;
    72         destoryed = true;
    73         decrement();
    74       }
    75 
    76       @Override
    77       public synchronized boolean move() {
    78         if (destoryed)
    79           throw new IllegalStateException("Can\'t move a destoryed track");
    80         return state.move(this);
    81       }
    82 
    83       private boolean pause() {
    84         arrived(this);
    85         state = State.WAITTING;
    86         return false;
    87       }
    88 
    89       private boolean canResume() { return flag != flag(); }
    90 
    91       private boolean forward() {
    92         steps++;
    93         return true;
    94       }
    95 
    96       private boolean await() { return false; }
    97 
    98       private void init() {
    99         steps = 0;
    100         flag = flag();
    101         state = State.RUNNING;
    102       }
    103 
    104       private boolean isNotArrived() { return steps < range; }
    105 
    106       private boolean resume() {
    107         init();
    108         return false;
    109       }
    110 
    111       private final String name;
    112 
    113       private int steps;
    114       private State state;
    115       private boolean destoryed;
    116       private boolean flag;
    117 
    118     }
    119 
    120     /** {@link State} */
    121     private enum State {
    122       RUNNING {
    123         @Override
    124         boolean move(final InnerTrack track) {
    125           return track.isNotArrived() ? track.forward() : track.pause();
    126         }
    127       },
    128       WAITTING {
    129         @Override
    130         boolean move(final InnerTrack track) {
    131           return track.canResume() ? track.resume() : track.await();
    132         }
    133       };
    134 
    135       abstract boolean move(InnerTrack track);
    136     }
    137 
    138   }
    139 }

        目前TimeTunnel已经在淘蝌蚪上开源, 更多源码请移步https://github.com/huolong/timetunnel.

    建议继续学习:

    1. 关于IO的同步,异步,阻塞,非阻塞    (阅读:14593)
    2. mysql 主从同步原理    (阅读:5731)
    3. 五款最好的免费同步软件    (阅读:3882)
    4. 大量小文件的实时同步方案    (阅读:3753)
    5. MySQL5.5复制/同步的新特性及改进    (阅读:3818)
    6. redis源代码分析 - replication    (阅读:3527)
    7. rsync主动同步代码    (阅读:3152)
    8. timetunnel之系统架构    (阅读:3127)
    9. 三款面向 Amazon S3 的开源文件同步工具之对比    (阅读:3179)
    10. truncate table 不能复制到从库    (阅读:2750)
    QQ技术交流群:445447336,欢迎加入!
    扫一扫订阅我的微信号:IT技术博客大学习
    << 前一篇:ZeroMQ 的模式
    © 2009 - 2025 by blogread.cn 微博:@IT技术博客大学习

    京ICP备15002552号-1