国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

zookeeper-數據同步源碼分析

plus2047 / 1697人閱讀

摘要:只有數據同步完成之后集群才具備對外提供服務的能力。當節點在選舉后角色確認為后將會進入狀態,源碼如下在節點狀態變更為之后會創建實例,并觸發過程。

在上一篇對 zookeeper 選舉實現分析之后,我們知道 zookeeper 集群在選舉結束之后,leader 節點將進入 LEADING 狀態,follower 節點將進入 FOLLOWING 狀態;此時集群中節點將進行數據同步操作,以保證數據一致。 只有數據同步完成之后 zookeeper 集群才具備對外提供服務的能力。

LEADING

當節點在選舉后角色確認為 leader 后將會進入 LEADING 狀態,源碼如下:

public void run() {
    try {
        /*
         * Main loop
         */
        while (running) {
            switch (getPeerState()) {
            case LEADING:
                LOG.info("LEADING");
                try {
                    setLeader(makeLeader(logFactory));
                    leader.lead();
                    setLeader(null);
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    if (leader != null) {
                        leader.shutdown("Forcing shutdown");
                        setLeader(null);
                    }
                    setPeerState(ServerState.LOOKING);
                }
                break;
            }
        }
    } finally {
        
    }
}

QuorumPeer 在節點狀態變更為 LEADING 之后會創建 leader 實例,并觸發 lead 過程。

void lead() throws IOException, InterruptedException {
    try {
        // 省略

        /**
         * 開啟線程用于接收 follower 的連接請求
         */
        cnxAcceptor = new LearnerCnxAcceptor();
        cnxAcceptor.start();
        
        readyToStart = true;

        /**
         * 阻塞等待計算新的 epoch 值,并設置 zxid
         */
        long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());          
        zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
        
        
        /**
         * 阻塞等待接收過半的 follower 節點發送的 ACKEPOCH 信息; 此時說明已經確定了本輪選舉后 epoch 值
         */
        waitForEpochAck(self.getId(), leaderStateSummary);
        self.setCurrentEpoch(epoch);

        try {
            /**
             * 阻塞等待 超過半數的節點 follower 發送了 NEWLEADER ACK 信息;此時說明過半的 follower 節點已經完成數據同步
             */
            waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);
        } catch (InterruptedException e) {
            // 省略
        }

        /**
         * 啟動 zk server,此時集群可以對外正式提供服務
         */
        startZkServer();

        // 省略
}

lead 方法的實現可得知,leaderfollower 在數據同步過程中會執行如下過程:

接收 follower 連接

計算新的 epoch 值

通知統一 epoch 值

數據同步

啟動 zk server 對外提供服務

FOLLOWING

下面在看下 follower 節點進入 FOLLOWING 狀態后的操作:

public void run() {
    try {
        /*
         * Main loop
         */
        while (running) {
            switch (getPeerState()) {
            case LOOKING:
                // 省略
            case OBSERVING:
                // 省略
            case FOLLOWING:
                try {
                    LOG.info("FOLLOWING");
                    setFollower(makeFollower(logFactory));
                    follower.followLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    follower.shutdown();
                    setFollower(null);
                    setPeerState(ServerState.LOOKING);
                }
                break;
            }
        }
    } finally {
        
    }
}

QuorumPeer 在節點狀態變更為 FOLLOWING 之后會創建 follower 實例,并觸發 followLeader 過程。

void followLeader() throws InterruptedException {
    // 省略
    try {
        QuorumServer leaderServer = findLeader();            
        try {
            /**
             * follower 與 leader 建立連接
             */
            connectToLeader(leaderServer.addr, leaderServer.hostname);

            /**
             * follower 向 leader 提交節點信息用于計算新的 epoch 值
             */
            long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);

            
            /**
             * follower 與 leader 數據同步
             */
            syncWithLeader(newEpochZxid);                
            
             // 省略

        } catch (Exception e) {
             // 省略
        }
    } finally {
        // 省略
    }
}

followLeader 方法的實現可得知,followerleader 在數據同步過程中會執行如下過程:

請求連接 leader

提交節點信息計算新的 epoch 值

數據同步

下面我們看下在各個環節的實現細節;

Leader Follower 建立通信
follower 請求連接
protected QuorumServer findLeader() {
    QuorumServer leaderServer = null;
    // Find the leader by id
    Vote current = self.getCurrentVote();
    for (QuorumServer s : self.getView().values()) {
        if (s.id == current.getId()) {
            // Ensure we have the leader"s correct IP address before
            // attempting to connect.
            s.recreateSocketAddresses();
            leaderServer = s;
            break;
        }
    }
    if (leaderServer == null) {
        LOG.warn("Couldn"t find the leader with id = "
                + current.getId());
    }
    return leaderServer;
}           
protected void connectToLeader(InetSocketAddress addr, String hostname)
            throws IOException, ConnectException, InterruptedException {
    sock = new Socket();        
    sock.setSoTimeout(self.tickTime * self.initLimit);
    for (int tries = 0; tries < 5; tries++) {
        try {
            sock.connect(addr, self.tickTime * self.syncLimit);
            sock.setTcpNoDelay(nodelay);
            break;
        } catch (IOException e) {
            if (tries == 4) {
                LOG.error("Unexpected exception",e);
                throw e;
            } else {
                LOG.warn("Unexpected exception, tries="+tries+
                        ", connecting to " + addr,e);
                sock = new Socket();
                sock.setSoTimeout(self.tickTime * self.initLimit);
            }
        }
        Thread.sleep(1000);
    }

    self.authLearner.authenticate(sock, hostname);

    leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(
            sock.getInputStream()));
    bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
    leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
}

follower 會通過選舉后的投票信息確認 leader 節點地址,并發起連接(總共有 5 次嘗試連接的機會,若連接不通則重新進入選舉過程)

leader 接收連接
class LearnerCnxAcceptor extends ZooKeeperThread{
    private volatile boolean stop = false;

    public LearnerCnxAcceptor() {
        super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress());
    }

    @Override
    public void run() {
        try {
            while (!stop) {
                try{
                    /**
                     * 接收 follower 的連接,并開啟 LearnerHandler 線程用于處理二者之間的通信
                     */
                    Socket s = ss.accept();
                    s.setSoTimeout(self.tickTime * self.initLimit);
                    s.setTcpNoDelay(nodelay);

                    BufferedInputStream is = new BufferedInputStream(
                            s.getInputStream());
                    LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
                    fh.start();
                } catch (SocketException e) {
                    // 省略
                } catch (SaslException e){
                    LOG.error("Exception while connecting to quorum learner", e);
                }
            }
        } catch (Exception e) {
            LOG.warn("Exception while accepting follower", e);
        }
    }
}

LearnerCnxAcceptor 實現可以看出 leader 節點在為每個 follower 節點連接建立之后都會為之分配一個 LearnerHandler 線程用于處理二者之間的通信。

計算新的 epoch 值
follower 在與 leader 建立連接之后,會發出 FOLLOWERINFO 信息
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
protected long registerWithLeader(int pktType) throws IOException{
    /**
     * 發送 follower info 信息,包括 last zxid 和 sid
     */
    long lastLoggedZxid = self.getLastLoggedZxid();
    QuorumPacket qp = new QuorumPacket();                
    qp.setType(pktType);
    qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
    
    /*
     * Add sid to payload
     */
    LearnerInfo li = new LearnerInfo(self.getId(), 0x10000);
    ByteArrayOutputStream bsid = new ByteArrayOutputStream();
    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
    boa.writeRecord(li, "LearnerInfo");
    qp.setData(bsid.toByteArray());
    
    /**
     * follower 向 leader 發送 FOLLOWERINFO 信息,包括 zxid,sid,protocol version
     */
    writePacket(qp, true);
    
    // 省略
} 

接下來我們看下 leader 在接收到 FOLLOWERINFO 信息之后做什么(參考 LearnerHandler)

public void run() {
    try {
        // 省略
        /**
         * leader 接收 follower 發送的 FOLLOWERINFO 信息,包括 follower 節點的 zxid,sid,protocol version
         * @see Learner.registerWithleader()
         */
        QuorumPacket qp = new QuorumPacket();
        ia.readRecord(qp, "packet");

        byte learnerInfoData[] = qp.getData();
        if (learnerInfoData != null) {
            if (learnerInfoData.length == 8) {
                // 省略
            } else {
                /**
                 * 高版本的 learnerInfoData 包括 long 類型的 sid, int 類型的 protocol version 占用 12 字節
                 */
                LearnerInfo li = new LearnerInfo();
                ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li);
                this.sid = li.getServerid();
                this.version = li.getProtocolVersion();
            }
        }

        /**
         * 通過 follower 發送的 zxid,解析出 foloower 節點的 epoch 值
         */
        long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
        
        long peerLastZxid;
        StateSummary ss = null;
        long zxid = qp.getZxid();

        /**
         * 阻塞等待計算新的 epoch 值
         */
        long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
      
        // 省略
    }

從上述代碼可知,leader 在接收到 follower 發送的 FOLLOWERINFO 信息之后,會解析出 follower 節點的 acceptedEpoch 值并參與到新的 epoch 值計算中。 (具體計算邏輯參考方法 getEpochToPropose

public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
    synchronized(connectingFollowers) {
        if (!waitingForNewEpoch) {
            return epoch;
        }
        // epoch 用來記錄計算后的選舉周期值
        // follower 或 leader 的 acceptedEpoch 值與 epoch 比較;若前者大則將其加一
        if (lastAcceptedEpoch >= epoch) {
            epoch = lastAcceptedEpoch+1;
        }
        // connectingFollowers 用來記錄與 leader 已連接的 follower
        connectingFollowers.add(sid);
        QuorumVerifier verifier = self.getQuorumVerifier();
        // 判斷是否已計算出新的 epoch 值的條件是 leader 已經參與了 epoch 值計算,以及超過一半的節點參與了計算
        if (connectingFollowers.contains(self.getId()) && 
                                        verifier.containsQuorum(connectingFollowers)) {
            // 將 waitingForNewEpoch 設置為 false 說明不需要等待計算新的 epoch 值了
            waitingForNewEpoch = false;
            // 設置 leader 的 acceptedEpoch 值
            self.setAcceptedEpoch(epoch);
            // 喚醒 connectingFollowers wait 的線程
            connectingFollowers.notifyAll();
        } else {
            long start = Time.currentElapsedTime();
            long cur = start;
            long end = start + self.getInitLimit()*self.getTickTime();
            while(waitingForNewEpoch && cur < end) {
                // 若未完成新的 epoch 值計算則阻塞等待
                connectingFollowers.wait(end - cur);
                cur = Time.currentElapsedTime();
            }
            if (waitingForNewEpoch) {
                throw new InterruptedException("Timeout while waiting for epoch from quorum");        
            }
        }
        return epoch;
    }
}

從方法 getEpochToPropose 可知 leader 會收集集群中過半的 follower acceptedEpoch 信息后,選出一個最大值然后加 1 就是 newEpoch 值; 在此過程中 leader 會進入阻塞狀態直到過半的 follower 參與到計算才會進入下一階段。

通知新的 epoch 值

leader 在計算出新的 newEpoch 值后,會進入下一階段發送 LEADERINFO 信息 (同樣參考 LearnerHandler

public void run() {
    try {
        // 省略

        /**
         * 阻塞等待計算新的 epoch 值
         */
        long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
            
        if (this.getVersion() < 0x10000) {
            // we are going to have to extrapolate the epoch information
            long epoch = ZxidUtils.getEpochFromZxid(zxid);
            ss = new StateSummary(epoch, zxid);
            // fake the message
            leader.waitForEpochAck(this.getSid(), ss);
        } else {
            byte ver[] = new byte[4];
            ByteBuffer.wrap(ver).putInt(0x10000);
            /**
             * 計算出新的 epoch 值后,leader 向 follower 發送 LEADERINFO 信息;包括新的 newEpoch
             */
            QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
            oa.writeRecord(newEpochPacket, "packet");
            bufferedOutput.flush();

               // 省略
        }
    }
    // 省略
}
protected long registerWithLeader(int pktType) throws IOException{
    // 省略

    /**
     * follower 向 leader 發送 FOLLOWERINFO 信息,包括 zxid,sid,protocol version
     */
    writePacket(qp, true);

    /**
     * follower 接收 leader 發送的 LEADERINFO 信息
     */
    readPacket(qp);

    /**
     * 解析 leader 發送的 new epoch 值
     */        
    final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
    if (qp.getType() == Leader.LEADERINFO) {
        // we are connected to a 1.0 server so accept the new epoch and read the next packet
        leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
        byte epochBytes[] = new byte[4];
        final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);

        /**
         * new epoch > current accepted epoch 則更新 acceptedEpoch 值
         */
        if (newEpoch > self.getAcceptedEpoch()) {
            wrappedEpochBytes.putInt((int)self.getCurrentEpoch());
            self.setAcceptedEpoch(newEpoch);
        } else if (newEpoch == self.getAcceptedEpoch()) {           
            wrappedEpochBytes.putInt(-1);
        } else {
            throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch());
        }

        /**
         * follower 向 leader 發送 ACKEPOCH 信息,包括 last zxid
         */
        QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
        writePacket(ackNewEpoch, true);
        return ZxidUtils.makeZxid(newEpoch, 0);
    } 
} 

從上述代碼可以看出在完成 newEpoch 值計算后的 leaderfollower 的交互過程:

leaderfollower 發送 LEADERINFO 信息,告知 follower 新的 epoch

follower 接收解析 LEADERINFO 信息,若 new epoch 值大于 current accepted epoch 值則更新 acceptedEpoch

followerleader 發送 ACKEPOCH 信息,反饋 leader 已收到新的 epoch 值,并附帶 follower 節點的 last zxid

數據同步
LearnerHandler 中 leader 在收到過半的 ACKEPOCH 信息之后將進入數據同步階段
public void run() {
        try {
            // 省略
            // peerLastZxid 為 follower 的 last zxid
            peerLastZxid = ss.getLastZxid();
            
            /* the default to send to the follower */
            int packetToSend = Leader.SNAP;
            long zxidToSend = 0;
            long leaderLastZxid = 0;
            /** the packets that the follower needs to get updates from **/
            long updates = peerLastZxid;
           
            ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();
            ReadLock rl = lock.readLock();
            try {
                rl.lock();        
                final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
                final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();

                LinkedList proposals = leader.zk.getZKDatabase().getCommittedLog();

                if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
                    /**
                     * follower 與 leader 的 zxid 相同說明 二者數據一致;同步方式為差量同步 DIFF,同步的zxid 為 peerLastZxid, 也就是不需要同步
                     */
                    packetToSend = Leader.DIFF;
                    zxidToSend = peerLastZxid;
                } else if (proposals.size() != 0) {
                    // peerLastZxid 介于 minCommittedLog ,maxCommittedLog 中間
                    if ((maxCommittedLog >= peerLastZxid)
                            && (minCommittedLog <= peerLastZxid)) {
                        /**
                         * 在遍歷 proposals 時,用來記錄上一個 proposal 的 zxid
                         */
                        long prevProposalZxid = minCommittedLog;

                        boolean firstPacket=true;
                        packetToSend = Leader.DIFF;
                        zxidToSend = maxCommittedLog;

                        for (Proposal propose: proposals) {
                            // 跳過 follower 已經存在的提案
                            if (propose.packet.getZxid() <= peerLastZxid) {
                                prevProposalZxid = propose.packet.getZxid();
                                continue;
                            } else {
                                if (firstPacket) {
                                    firstPacket = false;
                                    if (prevProposalZxid < peerLastZxid) {
                                        /**
                                         * 此時說明有部分 proposals 提案在 leader 節點上不存在,則需告訴 follower 丟棄這部分 proposals
                                         * 也就是告訴 follower 先執行回滾 TRUNC ,需要回滾到 prevProposalZxid 處,也就是 follower 需要丟棄 prevProposalZxid ~ peerLastZxid 范圍內的數據
                                         * 剩余的 proposals 則通過 DIFF 進行同步
                                         */
                                        packetToSend = Leader.TRUNC;                                        
                                        zxidToSend = prevProposalZxid;
                                        updates = zxidToSend;
                                    }
                                }

                                /**
                                 * 將剩余待 DIFF 同步的提案放入到隊列中,等待發送
                                 */
                                queuePacket(propose.packet);
                                /**
                                 * 每個提案后對應一個 COMMIT 報文
                                 */
                                QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
                                        null, null);
                                queuePacket(qcommit);
                            }
                        }
                    } else if (peerLastZxid > maxCommittedLog) {                    
                        /**
                         * follower 的 zxid 比 leader 大 ,則告訴 follower 執行 TRUNC 回滾
                         */
                        packetToSend = Leader.TRUNC;
                        zxidToSend = maxCommittedLog;
                        updates = zxidToSend;
                    } else {
                    }
                } 

            } finally {
                rl.unlock();
            }

             QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                    ZxidUtils.makeZxid(newEpoch, 0), null, null);
             if (getVersion() < 0x10000) {
                oa.writeRecord(newLeaderQP, "packet");
            } else {
                 // 數據同步完成之后會發送 NEWLEADER 信息
                queuedPackets.add(newLeaderQP);
            }
            bufferedOutput.flush();
            //Need to set the zxidToSend to the latest zxid
            if (packetToSend == Leader.SNAP) {
                zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
            }
            /**
             * 發送數據同步方式信息,告訴 follower 按什么方式進行數據同步
             */
            oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
            bufferedOutput.flush();
            
            /* if we are not truncating or sending a diff just send a snapshot */
            if (packetToSend == Leader.SNAP) {
                /**
                 * 如果是全量同步的話,則將 leader 本地數據序列化寫入 follower 的輸出流
                 */
                leader.zk.getZKDatabase().serializeSnapshot(oa);
                oa.writeString("BenWasHere", "signature");
            }
            bufferedOutput.flush();
            
            /**
             * 開啟個線程執行 packet 發送
             */
            sendPackets();
            
            /**
             * 接收 follower ack 響應
             */
            qp = new QuorumPacket();
            ia.readRecord(qp, "packet");

            /**
             * 阻塞等待過半的 follower ack
             */
            leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType());

            /**
             * leader 向 follower 發送 UPTODATE,告知其可對外提供服務
             */
            queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));

            // 省略
        } 
    }

從上述代碼可以看出 leaderfollower 在進行數據同步時會通過 peerLastZxidmaxCommittedLogminCommittedLog 兩個值比較最終決定數據同步方式。

DIFF(差異化同步)

followerpeerLastZxid 等于 leaderpeerLastZxid

此時說明 followerleader 數據一致,采用 DIFF 方式同步,也即是無需同步

followerpeerLastZxid 介于 maxCommittedLogminCommittedLog 兩者之間

此時說明 followerleader 數據存在差異,需對差異的部分進行同步;首先 leader 會向 follower 發送 DIFF 報文告知其同步方式,隨后會發送差異的提案及提案提交報文

交互流程如下:

    Leader                 Follower

      |          DIFF         |  
      | --------------------> |
      |        PROPOSAL       |  
      | --------------------> |  
      |         COMMIT        |  
      | --------------------> |
      |        PROPOSAL       |  
      | --------------------> |  
      |         COMMIT        |  
      | --------------------> |
         

示例: 假設 leader 節點的提案緩存隊列對應的 zxid 依次是:

 0x500000001, 0x500000002, 0x500000003, 0x500000004, 0x500000005

follower 節點的 peerLastZxid0x500000003,則需要將 0x5000000040x500000005 兩個提案進行同步;那么數據包發送過程如下表:

報文類型 ZXID
DIFF 0x500000005
PROPOSAL 0x500000004
COMMIT 0x500000004
PROPOSAL 0x500000005
COMMIT 0x500000005
TRUNC+DIFF(先回滾再差異化同步)
在上文 DIFF 差異化同步時會存在一個特殊場景就是 雖然 followerpeerLastZxid 介于 maxCommittedLogminCommittedLog 兩者之間,但是 followerpeerLastZxidleader 節點中不存在; 此時 leader 需告知 follower 先回滾到 peerLastZxid 的前一個 zxid, 回滾后再進行差異化同步。

交互流程如下:

    Leader                 Follower

      |         TRUNC         |  
      | --------------------> |
      |        PROPOSAL       |  
      | --------------------> |  
      |         COMMIT        |  
      | --------------------> |
      |        PROPOSAL       |  
      | --------------------> |  
      |         COMMIT        |  
      | --------------------> |
         

示例: 假設集群中三臺節點 A, B, C 某一時刻 A 為 Leader 選舉周期為 5, zxid 包括: (0x500000004, 0x500000005, 0x500000006); 假設某一時刻 leader A 節點在處理完事務為 0x500000007 的請求進行廣播時 leader A 節點服務器宕機導致 0x500000007 該事物沒有被同步出去;在集群進行下一輪選舉之后 B 節點成為新的 leader,選舉周期為 6 對外提供服務處理了新的事務請求包括 0x600000001, 0x600000002;

集群節點 ZXID 列表
A 0x500000004, 0x500000005, 0x500000006, 0x500000007
B 0x500000004, 0x500000005, 0x500000006, 0x600000001, 0x600000002
C 0x500000004, 0x500000005, 0x500000006, 0x600000001, 0x600000002

此時節點 A 在重啟加入集群后,在與 leader B 節點進行數據同步時會發現事務 0x500000007 在 leader 節點中并不存在,此時 leader 告知 A 需先回滾事務到 0x500000006,在差異同步事務 0x600000001,0x600000002;那么數據包發送過程如下表:

報文類型 ZXID
TRUNC 0x500000006
PROPOSAL 0x600000001
COMMIT 0x600000001
PROPOSAL 0x600000002
COMMIT 0x600000002
TRUNC(回滾同步)
followerpeerLastZxid 大于 leadermaxCommittedLog,則告知 follower 回滾至 maxCommittedLog; 該場景可以認為是 TRUNC+DIFF 的簡化模式

交互流程如下:

    Leader                 Follower

      |         TRUNC         |  
      | --------------------> |
         
SNAP(全量同步)
followerpeerLastZxid 小于 leaderminCommittedLog 或者 leader 節點上不存在提案緩存隊列時,將采用 SNAP 全量同步方式。 該模式下 leader 首先會向 follower 發送 SNAP 報文,隨后從內存數據庫中獲取全量數據序列化傳輸給 followerfollower 在接收全量數據后會進行反序列化加載到內存數據庫中。

交互流程如下:

    Leader                 Follower

      |         SNAP          |  
      | --------------------> |
      |         DATA          |  
      | --------------------> |
         

leader 在完成數據同步之后,會向 follower 發送 NEWLEADER 報文,在收到過半的 follower 響應的 ACK 之后此時說明過半的節點完成了數據同步,接下來 leader 會向 follower 發送 UPTODATE 報文告知 follower 節點可以對外提供服務了,此時 leader 會啟動 zk server 開始對外提供服務。

FOLLOWER 數據同步
下面我們在看下數據同步階段 FOLLOWER 是如何處理的,參考 Learner.syncWithLeader
protected void syncWithLeader(long newLeaderZxid) throws IOException, InterruptedException{
        QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
        QuorumPacket qp = new QuorumPacket();
        long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);

        /**
         * 接收 leader 發送的數據同步方式報文
         */
        readPacket(qp);
        
        synchronized (zk) {
            if (qp.getType() == Leader.DIFF) {
                
            }
            else if (qp.getType() == Leader.SNAP) {
                // 執行加載全量數據
            } else if (qp.getType() == Leader.TRUNC) {
                // 執行回滾
            }
            else {
            
            }
            
            outerLoop:
            while (self.isRunning()) {
                readPacket(qp);
                switch(qp.getType()) {
                case Leader.PROPOSAL:
                    // 處理提案
                    break;
                case Leader.COMMIT:
                    // commit proposal
                    break;
                case Leader.INFORM:
                    // 忽略
                    break;
                case Leader.UPTODATE:
                    // 設置 zk server
                    self.cnxnFactory.setZooKeeperServer(zk);
                    // 退出循環                
                    break outerLoop;
                case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery 
                    /**
                     * follower 響應 NEWLEADER ACK
                     */
                    writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
                    break;
                }
            }
        }
        ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
        writePacket(ack, true);
        // 啟動 zk server
        zk.startup();
        
    }

從上述代碼中可以看出 follower 在數據同步階段的處理流程如下:

follower 接收 leader 發送的數據同步方式(DIFF/TRUNC/SANP)報文并進行相應處理

follower 收到 leader 發送的 NEWLEADER 報文后,會向 leader 響應 ACK (leader 在收到過半的 ACK 消息之后會發送 UPTODATE)

follower 收到 leader 發送的 UPTODATE 報文后,說明此時可以對外提供服務,此時將啟動 zk server

小結

最后用一張圖總結下 zk 在完成選舉后數據同步的過程如下圖所示:

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/74515.html

相關文章

  • 高并發

    摘要:表示的是兩個,當其中任意一個計算完并發編程之是線程安全并且高效的,在并發編程中經常可見它的使用,在開始分析它的高并發實現機制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購,是兩個比較典型的互聯網高并發場景。 干貨:深度剖析分布式搜索引擎設計 分布式,高可用,和機器學習一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個名詞,今天我們首先來說說分布式。 探究...

    supernavy 評論0 收藏0
  • 高并發

    摘要:表示的是兩個,當其中任意一個計算完并發編程之是線程安全并且高效的,在并發編程中經常可見它的使用,在開始分析它的高并發實現機制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購,是兩個比較典型的互聯網高并發場景。 干貨:深度剖析分布式搜索引擎設計 分布式,高可用,和機器學習一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個名詞,今天我們首先來說說分布式。 探究...

    ddongjian0000 評論0 收藏0
  • 高并發

    摘要:表示的是兩個,當其中任意一個計算完并發編程之是線程安全并且高效的,在并發編程中經常可見它的使用,在開始分析它的高并發實現機制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購,是兩個比較典型的互聯網高并發場景。 干貨:深度剖析分布式搜索引擎設計 分布式,高可用,和機器學習一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個名詞,今天我們首先來說說分布式。 探究...

    wangdai 評論0 收藏0
  • 高并發 - 收藏集 - 掘金

    摘要:在中一般來說通過來創建所需要的線程池,如高并發原理初探后端掘金閱前熱身為了更加形象的說明同步異步阻塞非阻塞,我們以小明去買奶茶為例。 AbstractQueuedSynchronizer 超詳細原理解析 - 后端 - 掘金今天我們來研究學習一下AbstractQueuedSynchronizer類的相關原理,java.util.concurrent包中很多類都依賴于這個類所提供的隊列式...

    levius 評論0 收藏0
  • 高并發 - 收藏集 - 掘金

    摘要:在中一般來說通過來創建所需要的線程池,如高并發原理初探后端掘金閱前熱身為了更加形象的說明同步異步阻塞非阻塞,我們以小明去買奶茶為例。 AbstractQueuedSynchronizer 超詳細原理解析 - 后端 - 掘金今天我們來研究學習一下AbstractQueuedSynchronizer類的相關原理,java.util.concurrent包中很多類都依賴于這個類所提供的隊列式...

    fantix 評論0 收藏0

發表評論

0條評論

plus2047

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<