国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

zookeeper-選舉源碼分析

阿羅 / 1929人閱讀

摘要:在集群中發生選舉的場景有以下三種集群啟動時節點重啟時節點重啟時本文主要針對集群啟動時發生的選舉實現進行分析。

zookeeper 集群中發生選舉的場景有以下三種:

集群啟動時

Leader 節點重啟時

Follower 節點重啟時

本文主要針對集群啟動時發生的選舉實現進行分析。

ZK 集群中節點在啟動時會調用QuorumPeer.start方法
public synchronized void start() {
    /**
     * 加載數據文件,獲取 lastProcessedZxid, currentEpoch,acceptedEpoch
     */
    loadDataBase();

    /**
     * 啟動主線程 用于處理客戶端連接請求
     */
    cnxnFactory.start();

    /**
     * 開始 leader 選舉; 會相繼創建選舉算法的實現,創建當前節點與集群中其他節點選舉通信的網絡IO,并啟動相應工作線程
     */
    startLeaderElection();

    /**
     * 啟動 QuorumPeer 線程,監聽當前節點服務狀態
     */
    super.start();
}
加載數據文件

loadDataBase 方法中,ZK 會通過加載數據文件獲取 lastProcessedZxid , 并通過讀取 currentEpoch , acceptedEpoch 文件來獲取相對應的值;若上述兩文件不存在,則以 lastProcessedZxid 的高 32 位作為 currentEpoch , acceptedEpoch 值并寫入對應文件中。

初始選舉環境
synchronized public void startLeaderElection() {
    try {
        // 創建投票
        currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
    } catch(IOException e) {
    }
    // 從集群中節點列表,查找當前節點與其他進行信息同步的地址
    for (QuorumServer p : getView().values()) {
        if (p.id == myid) {
            myQuorumAddr = p.addr;
            break;
        }
    }
    if (myQuorumAddr == null) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    
    // electionType == 3
    this.electionAlg = createElectionAlgorithm(electionType);
}
protected Election createElectionAlgorithm(int electionAlgorithm){
    Election le=null;
            
    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
        // 忽略其他算法的實現
    case 3:
        /**
         * 創建 QuorumCnxManager 實例,并啟動 QuorumCnxManager.Listener 線程用于與集群中其他節點進行選舉通信;
         */
        qcm = createCnxnManager();
        QuorumCnxManager.Listener listener = qcm.listener;
        if(listener != null){
            listener.start();
            /**
             * 創建選舉算法 FastLeaderElection 實例
             */
            le = new FastLeaderElection(this, qcm);
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}

初始節點的相關實例之后,執行 super.start() 方法,因 QuorumPeer 類繼承 ZooKeeperThread 故會啟動 QuorumPeer 線程

public void run() {
        // 代碼省略
        try {
            /*
             * Main loop
             */
            while (running) {
                switch (getPeerState()) {
                case LOOKING:
                    LOG.info("LOOKING");

                    if (Boolean.getBoolean("readonlymode.enabled")) {
                        // 只讀模式下代碼省略
                    } else {
                        try {
                            setBCVote(null);
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                            setPeerState(ServerState.LOOKING);
                        }
                    }
                    break;
                // 忽略其他狀態下的處理邏輯
                }
            }
        } finally {
            
        }
    }
選舉

從上述代碼可以看出 QuorumPeer 線程在運行過程中輪詢監聽當前節點的狀態并進行相應的邏輯處理,集群啟動時節點狀態為 LOOKING (也就是選舉 Leader 過程),此時會調用 FastLeaderElection.lookForLeader 方法 (也是投票選舉算法的核心)簡化后源碼如下:

public Vote lookForLeader() throws InterruptedException {
        // 忽略
        try {
            HashMap recvset = new HashMap();

            HashMap outofelection = new HashMap();

            int notTimeout = finalizeWait;

            synchronized(this){
                // logicalclock 邏輯時鐘加一
                logicalclock.incrementAndGet();
                /**
                 * 更新提案信息,用于后續投票;集群啟動節點默認選舉自身為 Leader
                 */
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            /**
             * 發送選舉投票提案
             */
            sendNotifications();

            /*
             * Loop in which we exchange notifications until we find a leader
             */

            while ((self.getPeerState() == ServerState.LOOKING) &&
                    (!stop)){
                /*
                 * Remove next notification from queue, times out after 2 times
                 * the termination time
                 */
                /**
                 * 從 recvqueue 隊列中獲取外部節點的選舉投票信息
                 */
                Notification n = recvqueue.poll(notTimeout,
                        TimeUnit.MILLISECONDS);

                /*
                 * Sends more notifications if haven"t received enough.
                 * Otherwise processes new notification.
                 */
                if(n == null){
                    /**
                     * 檢查上一次發送的選舉投票信息是否全部發送;
                     * 若已發送則重新在發送一遍,反之說明當前節點與集群中其他節點未連接,則執行 connectAll() 建立連接 
                     */
                    if(manager.haveDelivered()){
                        sendNotifications();
                    } else {
                        manager.connectAll();
                    }

                    /*
                     * Exponential backoff
                     */
                    int tmpTimeOut = notTimeout*2;
                    notTimeout = (tmpTimeOut < maxNotificationInterval?
                            tmpTimeOut : maxNotificationInterval);
                    LOG.info("Notification time out: " + notTimeout);
                }
                else if(self.getVotingView().containsKey(n.sid)) {
                    /**
                     * 只處理同一集群中節點的投票請求
                     */ 
                    switch (n.state) {
                    case LOOKING:
                        // If notification > current, replace and send messages out
                        if (n.electionEpoch > logicalclock.get()) {
                            /**
                             * 外部投票選舉周期大于當前節點選舉周期
                             * 
                             * step1 : 更新選舉周期值
                             * step2 : 清空已收到的選舉投票數據
                             * step3 : 選舉投票 PK,選舉規則參見 totalOrderPredicate 方法
                             * step4 : 變更選舉投票并發送
                             */
                            logicalclock.set(n.electionEpoch);
                            recvset.clear();
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                updateProposal(getInitId(),
                                        getInitLastLoggedZxid(),
                                        getPeerEpoch());
                            }
                            sendNotifications();
                        } else if (n.electionEpoch < logicalclock.get()) {
                            // 丟棄小于當前選舉周期的投票
                            break;
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                proposedLeader, proposedZxid, proposedEpoch)) {
                            /**
                             * 同一選舉周期
                             *                            
                             * step1 : 選舉投票 PK,選舉規則參見 totalOrderPredicate 方法
                             * step2 : 變更選舉投票并發送
                             */
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            sendNotifications();
                        }

                        /**
                         * 記錄外部選舉投票信息
                         */
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                        /**
                         * 統計選舉投票結果,判斷是否可以結束此輪選舉
                         */
                        if (termPredicate(recvset,
                                new Vote(proposedLeader, proposedZxid,
                                        logicalclock.get(), proposedEpoch))) {

                            // ......
                            
                            if (n == null) {
                                /**
                                 * 選舉結束判斷當前節點狀態; 若提案的 leader == myid 則 state = LEADING, 反之為 FOLLOWING 
                                 */
                                self.setPeerState((proposedLeader == self.getId()) ?
                                        ServerState.LEADING: learningState());
                                // 變更當前投票信息
                                Vote endVote = new Vote(proposedLeader,
                                                        proposedZxid,
                                                        logicalclock.get(),
                                                        proposedEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        break;
                    case OBSERVING:
                        LOG.debug("Notification from observer: " + n.sid);
                        break;
                    case FOLLOWING:
                    case LEADING:
                        // ...... 
                        break;
                    default:
                        LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
                                n.state, n.sid);
                        break;
                    }
                } else {
                    LOG.warn("Ignoring notification from non-cluster member " + n.sid);
                }
            }
            return null;
        } finally {
            // ......
        }
    }

lookForLeader 方法的實現可以看出,選舉流程如下:

發送內部投票

內部投票發送邏輯參考后續小節

接收外部投票

接收外部投票邏輯參考后續小節

選舉投票 PK

當接收到外部節點投票信息后會與內部投票信息進行 PK 已確定投票優先權;PK 規則參見 totalOrderPredicate 方法如下

protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
    if(self.getQuorumVerifier().getWeight(newId) == 0){
        return false;
    }
    
    /*
     * We return true if one of the following three cases hold:
     * 1- New epoch is higher
     * 2- New epoch is the same as current epoch, but new zxid is higher
     * 3- New epoch is the same as current epoch, new zxid is the same
     *  as current zxid, but server id is higher.
     */
    return ((newEpoch > curEpoch) || 
            ((newEpoch == curEpoch) &&
            ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}

從其實現可以看出選舉投票 PK 規則如下:

* 比較外部投票與內部投票的選舉周期值,選舉周期大的值優先
* 若選舉周期值一致,則比較事務 ID; 事務 ID 最新的優先
* 若選舉周期值一致且事務 ID 值相同,則比較投票節點的 server id; server id 最大的優先

統計選舉投票

當接收到外部投票之后,都會統計下此輪選舉的投票情況并判斷是否可結束選舉; 參考 termPredicate 方法

protected boolean termPredicate(
            HashMap votes,
            Vote vote) {

    HashSet set = new HashSet();

    /**
     * 統計接收的投票中與當前節點所推舉 leader 投票一致的個數
     */
    for (Map.Entry entry : votes.entrySet()) {
        if (vote.equals(entry.getValue())){
            set.add(entry.getKey());
        }
    }

    /**
     * 如果超過一半的投票一致 則說明可以終止本次選舉
     */
    return self.getQuorumVerifier().containsQuorum(set);
}

確認節點角色

當此輪選舉結束之后,通過判斷所推舉的 leader server id 是否與當前節點 server id 相等; 若相等則說明當前節點為 leader, 反之為 follower。

發送接收投票
上文中主要聊了下 ZK 選舉算法的核心部分,下面接著看下集群節點在選舉過程中是如何發送自己的投票和接收外部的投票及相關處理邏輯。

首先通過 FastLeaderElection.sendNotifications 方法看下發送投票邏輯:

private void sendNotifications() {
    for (QuorumServer server : self.getVotingView().values()) {
        long sid = server.id;

        /**
         * 發送投票通知信息
         *
         * leader : 被推舉的服務器 myid
         * zxid : 被推舉的服務器 zxid
         * electionEpoch : 當前節點選舉周期
         * ServerState state : 當前節點狀態
         * sid : 消息接收方 myid
         * peerEpoch : 被推舉的服務器 epoch
         */
        ToSend notmsg = new ToSend(ToSend.mType.notification,
                proposedLeader,
                proposedZxid,
                logicalclock.get(),
                QuorumPeer.ServerState.LOOKING,
                sid,
                proposedEpoch);

        /**
         * 將消息添加到隊列 sendqueue 中;
         *
         * @see Messenger.WorkerSender sendqueue 隊列會被 WorkerSender 消費
         */
        sendqueue.offer(notmsg);
    }
}

從實現可以看出節點在啟動階段會將自身信息封裝為 ToSend 實例(也就是選舉自身為 leader)并添加到隊列 FastLeaderElection.sendqueue 中;那么此時我們會問到 FastLeaderElection.sendqueue 隊列中的消息被誰消費處理呢 ? 讓我們回過頭看下節點在啟動初始化選舉環境時創建 QuorumCnxManager, FastLeaderElection 實例的過程。

PS : FastLeaderElection.sendqueue 隊列中消息被誰消費 ?
QuorumCnxManager
public QuorumCnxManager(final long mySid,
                            Map view,
                            QuorumAuthServer authServer,
                            QuorumAuthLearner authLearner,
                            int socketTimeout,
                            boolean listenOnAllIPs,
                            int quorumCnxnThreadsSize,
                            boolean quorumSaslAuthEnabled,
                            ConcurrentHashMap senderWorkerMap) {
    this.senderWorkerMap = senderWorkerMap;
    this.recvQueue = new ArrayBlockingQueue(RECV_CAPACITY);
    this.queueSendMap = new ConcurrentHashMap>();

    this.lastMessageSent = new ConcurrentHashMap();
    String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
    if(cnxToValue != null){
        this.cnxTO = Integer.parseInt(cnxToValue);
    }

    this.mySid = mySid;
    this.socketTimeout = socketTimeout;
    this.view = view;
    this.listenOnAllIPs = listenOnAllIPs;

    initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,
            quorumSaslAuthEnabled);

    listener = new Listener();
}

QuorumCnxManager 實例化后,會啟動一個 QuorumCnxManager.Listener 線程;同時在 QuorumCnxManager 實例中存在三個重要的集合容器變量:

senderWorkerMap : 發送器集合,Map 類型按 server id 分組;為集群中的每個節點分配一個 SendWorker 負責消息的發送

recvQueue : 消息接收隊列,用于存放從外部節點接收到的投票消息

queueSendMap : 消息發送隊列,Map 類型按 server id 分組;為集群中的每個節點分配一個阻塞隊列存放待發送的消息,從而保證各個節點之間的消息發送互不影響

下面我們再看下 QuorumCnxManager.Listener 線程啟動后,主要做了什么:

public void run() {
    int numRetries = 0;
    InetSocketAddress addr;
    while((!shutdown) && (numRetries < 3)){
        try {
            ss = new ServerSocket();
            ss.setReuseAddress(true);

            /**
             * 獲取當前節點的選舉地址并 bind 監聽等待外部節點連接
             */
            addr = view.get(QuorumCnxManager.this.mySid).electionAddr;
            ss.bind(addr);

            while (!shutdown) {

                /**
                 * 接收外部節點連接并處理
                 */
                Socket client = ss.accept();
                setSockOpts(client);                
                receiveConnection(client);

                numRetries = 0;
            }
        } catch (IOException e) {
            LOG.error("Exception while listening", e);
            numRetries++;
            ss.close();
            Thread.sleep(1000);
        }
    }
}

跟蹤代碼發現 receiveConnection 方法最終會調用方法 handleConnection 如下

private void handleConnection(Socket sock, DataInputStream din)
            throws IOException {
    /**
     * 讀取外部節點的 server id 
     * ps : 此時的 server id 是什么時候發送的呢 ?
     */
    Long sid = din.readLong();
  
    if (sid < this.mySid) {
        /**
         * 若外部節點的 server id 小于當前節點的 server id,則關閉此連接,改為由當前節點發起連接
         * ps : 該限制說明選舉過程中,zk 只允許 server id 較大的一方去主動發起連接避免重復連接
         */
        SendWorker sw = senderWorkerMap.get(sid);
        if (sw != null) {
            sw.finish();
        }

        closeSocket(sock);
        connectOne(sid);
    } else {
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);

        SendWorker vsw = senderWorkerMap.get(sid);
        
        if(vsw != null)
            vsw.finish();
        
        /**
         * 按 server id 分組,為外部節點分配 SendWorker, RecvWorker 和一個消息發送隊列
         */
        senderWorkerMap.put(sid, sw);
        queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue(SEND_CAPACITY));
        
        /**
         * 啟動外部節點對應的 SendWorker, RecvWorker 線程
         */
        sw.start();
        rw.start();
        
        return;
    }
}

至此會發現 QuorumCnxManager.Listener 線程處理邏輯如下:

監聽當前節點的 election address 等待接收外部節點連接

讀取外部節點的 server id 并與當前節點的 server id 比較;若前者小則關閉連接,改由當前節點發起連接

反之為外部節點分配 SendWorker,RecvWorker 線程及消息發送隊列

PS : 此處我們會有個疑問外部節點的 server id 是什么時候發送過來的呢 ?

下面我們在看下為每個外部節點開啟了 SendWorkerRecvWorker 線程后做了什么:

SendWorker

public void run() {
    // 省略
    try {
        while (running && !shutdown && sock != null) {

            ByteBuffer b = null;
            try {
                /**
                 * 通過 server id 獲取待發送給集群中節點的消息隊列
                 */
                ArrayBlockingQueue bq = queueSendMap
                        .get(sid);
                if (bq != null) {
                    /**
                     * 從隊列中獲取待發送的消息
                     */
                    b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
                } else {
                    LOG.error("No queue of incoming messages for " +
                              "server " + sid);
                    break;
                }

                if(b != null){
                    lastMessageSent.put(sid, b);
                    /**
                     * 寫入 socket 的輸出流完成消息的發送
                     */
                    send(b);
                }
            } catch (InterruptedException e) {               
            }
        }
    } catch (Exception e) {        
    }
}

synchronized void send(ByteBuffer b) throws IOException {
    byte[] msgBytes = new byte[b.capacity()];
    try {
        b.position(0);
        b.get(msgBytes);
    } catch (BufferUnderflowException be) {
        LOG.error("BufferUnderflowException ", be);
        return;
    }
    /**
     * 發送的報文包括:消息體正文長度和消息體正文
     */
    dout.writeInt(b.capacity());
    dout.write(b.array());
    dout.flush();
}

通過代碼實現我們知道 SendWorker 的職責就是從 queueSendMap 隊列中獲取待發送給遠程節點的消息并執行發送。

PS : 此處我們會有個疑問 QuorumCnxManager.queueSendMap 中節點對應隊列中待發送的消息是誰生產的呢 ?

RecvWorker

public void run() {
    threadCnt.incrementAndGet();
    try {
        while (running && !shutdown && sock != null) {
            /**
             * 讀取外部節點發送的消息
             * 由 SendWorker 可知前 4 字節為消息載體有效長度
             */
            int length = din.readInt();
            if (length <= 0 || length > PACKETMAXSIZE) {
                throw new IOException(
                        "Received packet with invalid packet: "
                                + length);
            }
            /**
             * 讀取消息體正文
             */
            byte[] msgArray = new byte[length];
            din.readFully(msgArray, 0, length);
            ByteBuffer message = ByteBuffer.wrap(msgArray);
            /**
             * 將讀取的消息包裝為 Message 對象添加到隊列 recvQueue 中
             */
            addToRecvQueue(new Message(message.duplicate(), sid));
        }
    } catch (Exception e) {
        LOG.warn("Connection broken for id " + sid + ", my id = "
                 + QuorumCnxManager.this.mySid + ", error = " , e);
    } finally {
        LOG.warn("Interrupting SendWorker");
        sw.finish();
        if (sock != null) {
            closeSocket(sock);
        }
    }
}

public void addToRecvQueue(Message msg) {
    synchronized(recvQLock) {
        // 省略
        try {
            recvQueue.add(msg);
        } catch (IllegalStateException ie) {
            // This should never happen
            LOG.error("Unable to insert element in the recvQueue " + ie);
        }
    }
}

從上面可以看出 RecvWorker 線程在運行期間會接收 server id 對應的外部節點發送的消息,并將其放入 QuorumCnxManager.recvQueue 隊列中。
到目前為止我們基本完成對 QuorumCnxManager 核心功能的分析,發現其功能主要是負責集群中當前節點與外部節點進行選舉通訊的網絡 IO 操作,譬如接收外部節點選舉投票和向外部節點發送內部投票。

FastLeaderElection

下面我們在接著回頭看下 FastLeaderElection 類實例的過程:

public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
    this.stop = false;
    this.manager = manager;
    starter(self, manager);
}

private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;

    sendqueue = new LinkedBlockingQueue();
    recvqueue = new LinkedBlockingQueue();
    this.messenger = new Messenger(manager);
}
Messenger(QuorumCnxManager manager) {
    /**
     * 啟動 WorkerSender 線程用于發送消息
     */
    this.ws = new WorkerSender(manager);

    Thread t = new Thread(this.ws,
            "WorkerSender[myid=" + self.getId() + "]");
    t.setDaemon(true);
    t.start();

    /**
     * 啟動 WorkerReceiver 線程用于接收消息
     */
    this.wr = new WorkerReceiver(manager);

    t = new Thread(this.wr,
            "WorkerReceiver[myid=" + self.getId() + "]");
    t.setDaemon(true);
    t.start();
}

FastLeaderElection 實例化過程我們知道,其內部分別啟動了線程 WorkerSenderWorkerReceiver ;那么接下來看下這兩個線程具體做什么吧。

WorkerSender
public void run() {
    while (!stop) {
        try {
            /**
             * 從 sendqueue 隊列中獲取 ToSend 待發送的消息
             */ 
            ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
            if(m == null) continue;

            process(m);
        } catch (InterruptedException e) {
            break;
        }
    }
    LOG.info("WorkerSender is down");
}

void process(ToSend m) {
    // 將 ToSend 轉換為 40字節 ByteBuffer
    ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), 
                                            m.leader,
                                            m.zxid, 
                                            m.electionEpoch, 
                                            m.peerEpoch);
    // 交由 QuorumCnxManager 執行發送
    manager.toSend(m.sid, requestBuffer);
}

看了 WorkerSender 的實現是不是明白了什么? 還記得上文中 FastLeaderElection.sendNotifications 方法執行發送通知的時候的疑惑嗎 ? FastLeaderElection.sendqueue 隊列產生的消息就是被 WorkerSender 線程所消費處理, WorkerSender 會將消息轉發至 QuorumCnxManager 處理

public void toSend(Long sid, ByteBuffer b) {
    /*
     * If sending message to myself, then simply enqueue it (loopback).
     * 如果是發給自己的投票,則將其添加到接收隊列中等待處理
     */
    if (this.mySid == sid) {
         b.position(0);
         addToRecvQueue(new Message(b.duplicate(), sid));
        /*
         * Otherwise send to the corresponding thread to send.
         */
    } else {
         /*
          * Start a new connection if doesn"t have one already.
          */
         ArrayBlockingQueue bq = new ArrayBlockingQueue(SEND_CAPACITY);
         ArrayBlockingQueue bqExisting = queueSendMap.putIfAbsent(sid, bq);

         // 將發送的消息放入對應的隊列中,若隊列滿了則將隊列頭部元素移除
         if (bqExisting != null) {
             addToSendQueue(bqExisting, b);
         } else {
             addToSendQueue(bq, b);
         }
         connectOne(sid);
            
    }
}

private void addToSendQueue(ArrayBlockingQueue queue,
          ByteBuffer buffer) {
    // 省略
    try {
        // 將消息插入節點對應的隊列中
        queue.add(buffer);
    } catch (IllegalStateException ie) {
    }
}

QuorumCnxManager 在收到 FastLeaderElection.WorkerSender 轉發的消息時,會判斷當前消息是否發給自己的投票,若是則將消息添加到接收隊列中,反之會將消息添加到 queueSendMap 對應 server id 的隊列中;看到這里的時候是不是就明白了在 QuorumCnxManager.SendWorker 分析時候的疑惑呢 。 這個時候投票消息未必能夠發送出去,因為當前節點與外部節點的通道是否已建立還未知,所以繼續執行 connectOne

synchronized public void connectOne(long sid){
    /**
     * 判斷當前服務節點是否與 sid 外部服務節點建立連接;有可能對方先發起連接
     * 若已連接則等待后續處理,反之發起連接
     */
    if (!connectedToPeer(sid)){
        InetSocketAddress electionAddr;
        if (view.containsKey(sid)) {
            electionAddr = view.get(sid).electionAddr;
        } else {
            LOG.warn("Invalid server id: " + sid);
            return;
        }
        try {

            LOG.debug("Opening channel to server " + sid);
            Socket sock = new Socket();
            setSockOpts(sock);
            sock.connect(view.get(sid).electionAddr, cnxTO);
            LOG.debug("Connected to server " + sid);

            initiateConnection(sock, sid);

        } catch (UnresolvedAddressException e) {
           
        } catch (IOException e) {
           
        }
    } else {
        LOG.debug("There is a connection already for server " + sid);
    }
}

public boolean connectedToPeer(long peerSid) {
    return senderWorkerMap.get(peerSid) != null;
}
private boolean startConnection(Socket sock, Long sid)
            throws IOException {
    DataOutputStream dout = null;
    DataInputStream din = null;
    try {
        /**
         * 發送當前節點的 server id,需告知對方我是哪臺節點
         */
        dout = new DataOutputStream(sock.getOutputStream());
        dout.writeLong(this.mySid);
        dout.flush();

        din = new DataInputStream(
                new BufferedInputStream(sock.getInputStream()));
    } catch (IOException e) {
        LOG.warn("Ignoring exception reading or writing challenge: ", e);
        closeSocket(sock);
        return false;
    }

    // 只允許 sid 值大的服務器去主動和其他服務器連接,否則斷開連接
    if (sid > this.mySid) {
        LOG.info("Have smaller server identifier, so dropping the " +
                 "connection: (" + sid + ", " + this.mySid + ")");
        closeSocket(sock);
        // Otherwise proceed with the connection
    } else {
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);

        SendWorker vsw = senderWorkerMap.get(sid);
        
        if(vsw != null)
            vsw.finish();
        
        senderWorkerMap.put(sid, sw);
        queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue(SEND_CAPACITY));
        
        sw.start();
        rw.start();
        
        return true;    
        
    }
    return false;
}

從上述代碼可以看出節點在與外部節點連接后會先發送 myid 報文告知對方我是哪個節點(這也是為什么 QuorumCnxManager.Listener 線程在接收到一個連接請求時會先執行 getLong 獲取 server id 了);同樣在連接建立的時候也遵循一個原則(只允許 server id 較大的一方發起連接)。

WorkerReceiver
public void run() {

    Message response;
    while (!stop) {
        // Sleeps on receive
        try{
            /**
             * 從 QuorumCnxManager.recvQueue 隊列中獲取接收的外部投票
             */
            response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
            if(response == null) continue;
          
            if(!self.getVotingView().containsKey(response.sid)){
                // 忽略對方是觀察者的處理
            } else {
                // Instantiate Notification and set its attributes
                Notification n = new Notification();
                
                   // 將 message 轉成 notification 對象

                if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
                    // 當前節點狀態為 looking,則將外部節點投票添加到 recvqueue 隊列中
                    recvqueue.offer(n);

                    if((ackstate == QuorumPeer.ServerState.LOOKING)
                            && (n.electionEpoch < logicalclock.get())){
                        // 若外部節點選舉周期小于當前節點選舉周期則發送內部投票
                        Vote v = getVote();
                        ToSend notmsg = new ToSend(ToSend.mType.notification,
                                v.getId(),
                                v.getZxid(),
                                logicalclock.get(),
                                self.getPeerState(),
                                response.sid,
                                v.getPeerEpoch());
                        sendqueue.offer(notmsg);
                    }
                } else {
                    // 忽略其他狀態時的處理
                }
            }
        } catch (InterruptedException e) {
        }
    }
    LOG.info("WorkerReceiver is down");
}

此時我們明白 WorkerReceiver 線程在運行期間會一直從 QuorumCnxManager.recvQueue 的隊列中拉取接收到的外部投票信息,若當前節點為 LOOKING 狀態,則將外部投票信息添加到 FastLeaderElection.recvqueue 隊列中,等待 FastLeaderElection.lookForLeader 選舉算法處理投票信息。

到此我們基本明白了 ZK 集群節點發送和接收投票的處理流程,但是這個時候您是不是又有一種懵的狀態呢 笑哭,我們會發現選舉過程中依賴了多個線程 WorkerSender, SendWorker, WorkerReceiver, RecvWorker ,多個阻塞隊列 sendqueue, recvqueue,queueSendMap,recvQueue 而且名字起的很類似,更讓人懵 ; 不過莫慌,我們來通過下面的圖來縷下思路

小結

看了這么長時間的代碼,也夠累的;最后我們就來個小結吧 :

QuorumCnxManager 類主要職能是負責集群中節點與外部節點進行通信及投票信息的中轉

FastLeaderElection 類是選舉投票的核心實現

選舉投票規則

比較外部投票與內部投票的選舉周期值,選舉周期大的值優先

若選舉周期值一致,則比較事務 ID; 事務 ID 最新的優先

若選舉周期值一致且事務 ID 值相同,則比較投票節點的 server id; server id 最大的優先

集群中節點通信時為了避免重復建立連接,遵守一個原則:連接總是由 server id 較大的一方發起

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/77695.html

相關文章

  • zookeeper-數據同步源碼分析

    摘要:只有數據同步完成之后集群才具備對外提供服務的能力。當節點在選舉后角色確認為后將會進入狀態,源碼如下在節點狀態變更為之后會創建實例,并觸發過程。 在上一篇對 zookeeper 選舉實現分析之后,我們知道 zookeeper 集群在選舉結束之后,leader 節點將進入 LEADING 狀態,follower 節點將進入 FOLLOWING 狀態;此時集群中節點將進行數據同步操作,以保證...

    plus2047 評論0 收藏0
  • Elasticsearch分布式一致性原理剖析(一)-節點篇

    摘要:摘要目前是最流行的開源分布式搜索引擎系統,其使用作為單機存儲引擎并提供強大的搜索查詢能力。前言分布式一致性原理剖析系列將會對的分布式一致性原理進行詳細的剖析,介紹其實現方式原理以及其存在的問題等基于版本。相當于一次正常情況的新節點加入。 摘要: ES目前是最流行的開源分布式搜索引擎系統,其使用Lucene作為單機存儲引擎并提供強大的搜索查詢能力。學習其搜索原理,則必須了解Lucene,...

    genedna 評論0 收藏0
  • Elasticsearch分布式一致性原理剖析(一)-節點篇

    摘要:摘要目前是最流行的開源分布式搜索引擎系統,其使用作為單機存儲引擎并提供強大的搜索查詢能力。前言分布式一致性原理剖析系列將會對的分布式一致性原理進行詳細的剖析,介紹其實現方式原理以及其存在的問題等基于版本。相當于一次正常情況的新節點加入。 摘要: ES目前是最流行的開源分布式搜索引擎系統,其使用Lucene作為單機存儲引擎并提供強大的搜索查詢能力。學習其搜索原理,則必須了解Lucene,...

    lindroid 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<