技术头条 - 一个快速在微博传播文章的方式     搜索本站
您现在的位置首页 --> 系统架构 --> 分布式选主 -- 利用Mysql ACID和Lease协议实现选主和高可用

分布式选主 -- 利用Mysql ACID和Lease协议实现选主和高可用

浏览:680次  出处信息

     在实际生产开发中,遇到一些多节点共存,需要选主,并且要实现HA自动容错的场景,思考了写方法拿出来和大家分享一下。


  1. Lease协议,Mysql ACID

  2. 高可用选主方案设计

  3. 适用场景

  4. Java语言实现描述

  5. 进一步优化

     系统中有很多应用场景要类似主从架构,主服务器(Master)对外提供服务,从服务器(Salve)热备份,不提供服务但随时活着,如果Master出现宕机或者网络问题,Slave即可接替Master对外服务,并由Slave提升为Master(新主)。典型的多节点共存,但只能同时存在一个主,并且所有节点的状态能统一维护

     大家一定首先想到了著名的Paxos算法(http://baike.baidu.com/view/8438269.htm)。简单的说,Paxos通过每个节点的投票算法,来决议一个事情,当多余1/2个节点都投票通过时,Paxos产生一个唯一结果的决议,并通知各个节点维护这个信息。例如Paxos的选主,首先产生一个关于某个节点希望当Master的投票,然后各个节点给出反馈,最终Paxos集群维护唯一的Master的结论。Zookeeper就是Paxos的一种实现。这种场景最适合用zookeeper来选主,但zookeeper有个明显的缺点,当存活的节点小于zookeeper集群的1/2时,就不能工作了。比如zk有10各节点,那么必须满足可用的节点大于5才可。

     在实际环境中,如果对Master要求不是那么严格的话,可以通过某些改进和取舍来达到目的。比如可能在秒级别允许Master暂时不能访问、选主时间内可能存在一定的冲突但通过再次选主即可。本人设计了一个简易的利用Mysql一致性和简易版Lease来workaround。

Mysql ACID保证了一条数据记录的一致性、完整性,不会出现多进程读写的一致性问题和唯一正确性。Lease协议(协议细节可以Google之)通过向Master发送一个lease(租期)包,Master在这个lease期之内充当主角色,如果lease期到了则再次去申请lease,如果lease期到了,但是网络除了问题,这时Master可以i主动下线,让其他节点去竞选Master。举个例子,三个节点A、B、C经过第一轮选主之后,A成为Master,它获得了10秒的lease,当前时间假设是00:00:00,那么它Master地位可以用到00:00:10,当时间到达00:00:10时,A、B、C会重新进行Master选举,每个节点都有可能成为Master(从工程的角度触发,A继续为Master的概率更大),如果这时候A的网络断了,不能联通B、C的集群了,那么A会自动下线,不会去竞争,这样就不会出现“脑裂”的现象。

     

      ---------------------------------------------- 华丽的分割线 ----------------------------------------------

     

       设计方案如下:(server代表集群中的一台机器,也可看作一个进程,server之间是平等的)


  1. 各个server之间用ntpserver时间同步(保证服务器之间秒级同步即可)

  2. 各个server持有一个唯一ID号(ip+进程号),通过此id唯一标识一个server实例

  3. 各个server定义一个lease租期,单位为秒

  4. Mysql唯一表唯一一条记录维护全局Master的信息,ACID保证一致性

  5. Master Server每半个lease期向Mysql更新如上的唯一一条记录,并更新心跳,维护Master状态

  6. Slaver Server每半个lease周期从mysql获取Master Server信息,如果数据库中Master的Lease超过了当前时间(heartbeat_time+ lease > current_time),则申请当Master。


     这其中比较棘手的问题是:

       1、由于数据库访问和休眠的时间(lease的一半),有时延的存在,要处理Mysql异常、网络异常。

       2、可能存在同时抢占Master的server,这个时候就需要一个验证机制保证为抢到Master的server自动退位为Slaver


     下面给出图实例 :(10.0.0.1为Master)


    10.0.0.1 crash了。mysql中维护的10.0.0.1的主信息已过期,其他节点去抢占



     各个节点再次读取数据库,查看是否是自己抢占成功了:



之后,10.0.0.3作为Master对外服务。此时如果10.0.0.1重启,可作为Slaver。如果10.0.0.1因为网络分化或者网络异常而不能维护心跳,则在超过自身lease时自动停止服务,不会出现“双Master”的现象。


     每个Server遵循如下流程:



       数据库设计:


       某一时刻,数据库中Master的信息:



      当前时间: 45分15秒

      当前Master Lease :6秒

      当前Master Lease可用到: 45分21秒

 

      ---------------------------------------------- 华丽的分割线 ----------------------------------------------

      3、适用的场景

       一、生命周期内可使用Mysql、并且各个server之间时间同步。

       二、需要集群中选出唯一主对外提供服务,其他节点作为slaver做standby,主lease过期时竞争为Master

       三、对比zookeeper,可满足如果集群挂掉一半节点,也可正常工作的情况,比如只有一主一备。

       四、允许选主操作在秒级容错的系统,选主的时候可能有lease/2秒的时间窗口,此时服务可能不可用。


       五、允许lease/2秒内出现极限双Master情况,但是概率很小。


       ---------------------------------------------- 华丽的分割线 ----------------------------------------------

       4、Java语言实现描述


  1. 一些配置信息和时间相关、休眠周期相关的时间变量  

  1. final long interval = lease / intervalDivisor;  

  2. long waitForLeaseChallenging = 0L;  

  3. lease = lease / 1000L;  

  4.  

  5. long challengeFailTimes = 0L;  

  6. long takeRest = 0L;  

  7. long dbExceptionTimes = 0L;  

  8. long offlineTime = 0L;  

  9. Random rand = new Random();  

  10. Status stateMechine = Status.START;  

  11.  

  12. long activeNodeLease = 0L;  

  13. long activeNodeTimeStamp = 0L;  


       数据库异常的处理:



  1. KeepAlive keepaliveNode = null;  

  2. try {  

  3.    /* first of all get it from mysql */  

  4.    keepaliveNode = dbService.accquireAliveNode();  

  5.    if (stateMechine != Status.START && keepaliveNode==null)  

  6.        throw new Exception();  

  7.    // recount , avoid network shake  

  8.    dbExceptionTimes = 0L;  

  9. } catch (Exception e) {  

  10.    log.fatal("[Scanner] Database Exception with times : " + dbExceptionTimes++);  

  11.    if (stateMechine == Status.OFFLINE) {  

  12.        log.warn("[Scanner] Database Exception , OFFLINE ");  

  13.    } else if (dbExceptionTimes >= 3) {  

  14.        log.fatal("[Scanner] Database Exception , Node Offline Mode Active , uniqueid : " + uniqueID);  

  15.        stateMechine = Status.OFFLINE;  

  16.        dbExceptionTimes = 0L;  

  17.        offlineTime = System.currentTimeMillis();  

  18.        online = false;  

  19.    } else  

  20.        continue;  

  21. }  


       总的循环和状态机的变迁:



  1.        while (true) {  

  2.  

  3.            SqlSession session = dbConnecction.openSession();  

  4.            ActionScanMapper dbService = session.getMapper(ActionScanMapper.class);  

  5.  

  6.            KeepAlive keepaliveNode = null;  

  7.            try {  

  8.                /* first of all get it from mysql */  

  9.                keepaliveNode = dbService.accquireAliveNode();  

  10.                if (stateMechine != Status.START && keepaliveNode==null)  

  11.                    throw new Exception();  

  12.                // recount , avoid network shake  

  13.                dbExceptionTimes = 0L;  

  14.            } catch (Exception e) {  

  15.                log.fatal("[Scanner] Database Exception with times : " + dbExceptionTimes++);  

  16.                if (stateMechine == Status.OFFLINE) {  

  17.                    log.warn("[Scanner] Database Exception , OFFLINE ");  

  18.                } else if (dbExceptionTimes >= 3) {  

  19.                    log.fatal("[Scanner] Database Exception , Node Offline Mode Active , uniqueid : " + uniqueID);  

  20.                    stateMechine = Status.OFFLINE;  

  21.                    dbExceptionTimes = 0L;  

  22.                    offlineTime = System.currentTimeMillis();  

  23.                    online = false;  

  24.                } else  

  25.                    continue;  

  26.            }  

  27.  

  28.            try {  

  29.  

  30.                activeNodeLease = keepaliveNode!=null ? keepaliveNode.getLease() : activeNodeLease;  

  31.                activeNodeTimeStamp = keepaliveNode!=null ? keepaliveNode.getTimestamp() : activeNodeTimeStamp;  

  32.                takeRest = interval;  

  33.  

  34.                switch (stateMechine) {  

  35.                    case START:  

  36.                        if (keepaliveNode == null) {  

  37.                            log.fatal("[START] Accquire node is null , ignore ");  

  38.                            // if no node register here , we challenge it  

  39.                            stateMechine = Status.CHALLENGE_REGISTER;  

  40.                            takeRest = 0;  

  41.                        } else {  

  42.                            // check the lease , wether myself or others  

  43.                            if (activeNodeLease < timestampGap(activeNodeTimeStamp)) {  

  44.                                log.warn("[START] Lease Timeout scanner for uniqueid : " + uniqueID + ", timeout : "  

  45.                                            + timestampGap(activeNodeTimeStamp));  

  46.                                if (keepaliveNode.getStatus().equals(STAT_CHALLENGE))  

  47.                                    stateMechine = Status.HEARTBEAT;  

  48.                                else {  

  49.                                    stateMechine = Status.CHALLENGE_MASTER;  

  50.                                    takeRest = 0;  

  51.                                }  

  52.                            } else if (keepaliveNode.getUniqueID().equals(uniqueID)) {  

  53.                                // I'am restart  

  54.                                log.info("[START] Restart Scanner for uniqueid : " + uniqueID  

  55.                                                + ", timeout : " + timestampGap(activeNodeTimeStamp));  

  56. stateMechine = Status.HEARTBEAT;  

  57.                            } else {  

  58.                                log.info("[START] Already Exist Keepalive Node with uniqueid : " + uniqueID);  

  59.                                stateMechine = Status.HEARTBEAT;  

  60.                            }  

  61.                        }  

  62.                        break;  

  63.                    case HEARTBEAT:  

  64.                        /* uniqueID == keepaliveNode.uniqueID */  

  65.                        if (keepaliveNode.getUniqueID().equals(uniqueID)) {  

  66.                            if (activeNodeLease < timestampGap(activeNodeTimeStamp)) {  

  67.                                // we should challenge now , without nessesary to checkout Status[CHALLENGE]  

  68.                                log.warn("[HEARTBEAT] HEART BEAT Lease is timeout for uniqueid : " + uniqueID  

  69.                                                + ", time : " + timestampGap(activeNodeTimeStamp));  

  70.                                stateMechine = Status.CHALLENGE_MASTER;  

  71.                                takeRest = 0;  

  72.                                break;  

  73.                            } else {  

  74.                                // lease ok , just update mysql keepalive status  

  75.                                dbService.updateAliveNode(keepaliveNode.setLease(lease));  

  76.                                online = true;  

  77.                                log.info("[HEARTBEAT] update equaled keepalive node , uniqueid : " + uniqueID  

  78.                                        + ", lease : " + lease + "s, remain_usable : " +  

  79.                                        ((activeNodeTimeStamp * 1000L + lease * 1000L) - System.currentTimeMillis()) + " ms");  

  80.                            }  

  81.                        } else {  

  82.                            /* It's others , let's check lease */  

  83.                            if (activeNodeLease < timestampGap(activeNodeTimeStamp)) {  

  84.                                if (keepaliveNode.getStatus().equals(STAT_CHALLENGE)) {  

  85.                                    waitForLeaseChallenging = (long) (activeNodeLease * awaitFactor);  

  86.                                    if ((waitForLeaseChallenging) < timestampGap(activeNodeTimeStamp)) {  

  87.                                        log.info("[HEARTBEAT] Lease Expired , Diff[" + timestampGap(activeNodeTimeStamp) + "] , Lease[" + activeNodeLease + "]");  

  88.                                        stateMechine = Status.CHALLENGE_MASTER;  

  89.                                        takeRest = 0;  

  90.                                    } else {  

  91.                                        log.info("[HEARTBEAT] Other Node Challenging , We wait for a moment ...");  

  92.                                    }  

  93.                                } else {  

  94.                                    log.info("[HEARTBEAT] Lease Expired , Diff[" + timestampGap(activeNodeTimeStamp) + "] , lease[" + activeNodeLease + "]");  

  95.                                    stateMechine = Status.CHALLENGE_MASTER;  

  96.                                    takeRest = 0;  

  97.                                }  

  98.                            } else {  

  99.                                online = false;  

  100.                                log.info("[HEARTBEAT] Exist Active Node On The Way with uniqueid : "  

  101.                                                + keepaliveNode.getUniqueID() + ", lease : " + keepaliveNode.getLease());  

  102.                            }  

  103.                        }  

  104.                        break;  

  105.                    case CHALLENGE_MASTER:  

  106.                        dbService.challengeAliveNode(new KeepAlive().setUniqueID(uniqueID).setLease(lease));  

  107.                        online = false;  

  108.                        // wait for the expired node offline automatic  

  109.                        // and others also have changce to challenge  

  110. takeRest = activeNodeLease;  

  111.                        stateMechine = Status.CHALLENGE_COMPLETE;  

  112.                        log.info("[CHALLENGE_MASTER] Other Node is timeout["  

  113.                                        + timestampGap(activeNodeTimeStamp) + "s] , I challenge with uniqueid : " + uniqueID  

  114.                                        + ", lease : " + lease + ", wait : " + lease);  

  115.                        break;  

  116.                    case CHALLENGE_REGISTER:  

  117.                        dbService.registerNewNode(new KeepAlive().setUniqueID(uniqueID).setLease(lease));  

  118.                        online = false;  

  119.                        // wait for the expired node offline automatic  

  120.                        // and others also have changce to challenge  

  121.                        takeRest = activeNodeLease;  

  122.                        stateMechine = Status.CHALLENGE_COMPLETE;  

  123.                        log.info("[CHALLENGE_REGISTER] Regiter Keepalive uniqueid : " + uniqueID + ", lease : " + lease);  

  124.                        break;  

  125.                    case CHALLENGE_COMPLETE :  

  126.                        if (keepaliveNode.getUniqueID().equals(uniqueID)) {  

  127.                            dbService.updateAliveNode(keepaliveNode.setLease(lease));  

  128.                            online = true;  

  129.                            log.info("[CHALLENGE_COMPLETE] I Will be the Master uniqueid : " + uniqueID);  

  130.                            // make the uptime correct  

  131.                            stateMechine = Status.HEARTBEAT;  

  132.                        } else {  

  133.                            online = false;  

  134.                            log.warn("[CHALLENGE_COMPLETE] So unlucky , Challenge Failed By Other Node with uniqueid : " + keepaliveNode.getUniqueID());  

  135.                            if (challengeFailTimes++ >= (rand.nextLong() % maxChallenge) + minChallenge) {  

  136.                                // need't challenge anymore in a long time  

  137.                                takeRest=maxChallengeAwaitInterval;  

  138.                                stateMechine = Status.HEARTBEAT;  

  139.                                challengeFailTimes = 0L;  

  140.                                log.info("[CHALLENGE_COMPLETE] Challenge Try Times Used Up , let's take a long rest !");  

  141.                            } else {  

  142. stateMechine = Status.HEARTBEAT;  

  143.                                log.info("[CHALLENGE_COMPLETE] Challenge Times : " + challengeFailTimes + ", Never Give Up , to[" + stateMechine + "]");  

  144.                            }  

  145.                        }  

  146.                        break;  

  147.                    case OFFLINE :  

  148.                        log.fatal("[Scanner] Offline Mode Node with uniqueid : " + uniqueID);  

  149.                        if (System.currentTimeMillis() - offlineTime >= maxOfflineFrozen) {  

  150.                            // I am relive forcely  

  151.                            log.info("[Scanner] I am relive to activie node  , uniqueid : " + uniqueID);  

  152.                            stateMechine = Status.HEARTBEAT;  

  153.                            offlineTime = 0L;  

  154.                        } else if (keepaliveNode != null) {  

  155.                            // db is reconnected  

  156.                            stateMechine = Status.HEARTBEAT;  

  157.                            offlineTime = 0L;  

  158.                            log.info("[Scanner] I am relive to activie node  , uniqueid : " + uniqueID);  

  159.                        }  

  160.                        break;  

  161.  

  162.                    default :  

  163.                        System.exit(0);  

  164.                }  

  165.  

  166.                session.commit();  

  167.                session.close();  

  168.  

  169.                if (takeRest != 0)  

  170.                    Thread.sleep(takeRest);  

  171.  

  172.                log.info("[Scanner] State Stage [" + stateMechine + "]");  

  173.  

  174.            } catch (InterruptedException e) {  

  175.                log.fatal("[System] Thread InterruptedException : " + e.getMessage());  

  176.            } finally {  

  177.                log.info("[Scanner] UniqueID : " + uniqueID + ", Mode : " + (online?"online":"offline"));  

  178.            }  

  179.        }  

  180.  

  181.    }  

  182.  

  183.    enum Status {  

  184.        START, HEARTBEAT, CHALLENGE_MASTER, CHALLENGE_REGISTER, CHALLENGE_COMPLETE, OFFLINE  

  185.    }  



5进一步的优化
       一、在各个系统竞争Master时,可能因为节点太多,冲突概率较大,可以通过在数据库中增加字段Status状态字段,标识是否有其他节点正在争抢Master,如果是,则可以暂停等一下,然后在尝试,如果那个节点成功抢到了Master,则会省去很多节点冲突的概率。
       
       二、由于出现很极端的情况,因为竞争Master的时间和lease时间都是固定的,则可能出现”时间轴共振“的现象,最典型的如一直在竞争Master但是一直失败,然后一直重试。所有的server在同一时刻都在赶同样的事情。可以通过增加时间随机性解决问题,如尝试抢占Master连续失败,则通过random产生随机数然后sleep,抵消共振。

建议继续学习:

  1. 分布式缓存系统 Memcached 入门    (阅读:14827)
  2. Zookeeper工作原理    (阅读:10603)
  3. GFS, HDFS, Blob File System架构对比    (阅读:9481)
  4. Zookeeper研究和应用    (阅读:8617)
  5. 分布式日志系统scribe使用手记    (阅读:8132)
  6. 一致性哈希算法及其在分布式系统中的应用    (阅读:8032)
  7. 分布式哈希和一致性哈希    (阅读:7771)
  8. HBase技术介绍    (阅读:6860)
  9. 分布式系统的事务处理    (阅读:6160)
  10. Memcache分布式部署方案    (阅读:5547)
QQ技术交流群:445447336,欢迎加入!
扫一扫订阅我的微信号:IT技术博客大学习
© 2009 - 2025 by blogread.cn 微博:@IT技术博客大学习

京ICP备15002552号-1