摘要:只有數據同步完成之后集群才具備對外提供服務的能力。當節點在選舉后角色確認為后將會進入狀態,源碼如下在節點狀態變更為之后會創建實例,并觸發過程。
在上一篇對 zookeeper 選舉實現分析之后,我們知道 zookeeper 集群在選舉結束之后,leader 節點將進入 LEADING 狀態,follower 節點將進入 FOLLOWING 狀態;此時集群中節點將進行數據同步操作,以保證數據一致。 只有數據同步完成之后 zookeeper 集群才具備對外提供服務的能力。
LEADING當節點在選舉后角色確認為 leader 后將會進入 LEADING 狀態,源碼如下:
public void run() { try { /* * Main loop */ while (running) { switch (getPeerState()) { case LEADING: LOG.info("LEADING"); try { setLeader(makeLeader(logFactory)); leader.lead(); setLeader(null); } catch (Exception e) { LOG.warn("Unexpected exception",e); } finally { if (leader != null) { leader.shutdown("Forcing shutdown"); setLeader(null); } setPeerState(ServerState.LOOKING); } break; } } } finally { } }
QuorumPeer 在節點狀態變更為 LEADING 之后會創建 leader 實例,并觸發 lead 過程。
void lead() throws IOException, InterruptedException { try { // 省略 /** * 開啟線程用于接收 follower 的連接請求 */ cnxAcceptor = new LearnerCnxAcceptor(); cnxAcceptor.start(); readyToStart = true; /** * 阻塞等待計算新的 epoch 值,并設置 zxid */ long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch()); zk.setZxid(ZxidUtils.makeZxid(epoch, 0)); /** * 阻塞等待接收過半的 follower 節點發送的 ACKEPOCH 信息; 此時說明已經確定了本輪選舉后 epoch 值 */ waitForEpochAck(self.getId(), leaderStateSummary); self.setCurrentEpoch(epoch); try { /** * 阻塞等待 超過半數的節點 follower 發送了 NEWLEADER ACK 信息;此時說明過半的 follower 節點已經完成數據同步 */ waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT); } catch (InterruptedException e) { // 省略 } /** * 啟動 zk server,此時集群可以對外正式提供服務 */ startZkServer(); // 省略 }
從 lead 方法的實現可得知,leader 與 follower 在數據同步過程中會執行如下過程:
接收 follower 連接
計算新的 epoch 值
通知統一 epoch 值
數據同步
啟動 zk server 對外提供服務
FOLLOWING下面在看下 follower 節點進入 FOLLOWING 狀態后的操作:
public void run() { try { /* * Main loop */ while (running) { switch (getPeerState()) { case LOOKING: // 省略 case OBSERVING: // 省略 case FOLLOWING: try { LOG.info("FOLLOWING"); setFollower(makeFollower(logFactory)); follower.followLeader(); } catch (Exception e) { LOG.warn("Unexpected exception",e); } finally { follower.shutdown(); setFollower(null); setPeerState(ServerState.LOOKING); } break; } } } finally { } }
QuorumPeer 在節點狀態變更為 FOLLOWING 之后會創建 follower 實例,并觸發 followLeader 過程。
void followLeader() throws InterruptedException { // 省略 try { QuorumServer leaderServer = findLeader(); try { /** * follower 與 leader 建立連接 */ connectToLeader(leaderServer.addr, leaderServer.hostname); /** * follower 向 leader 提交節點信息用于計算新的 epoch 值 */ long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO); /** * follower 與 leader 數據同步 */ syncWithLeader(newEpochZxid); // 省略 } catch (Exception e) { // 省略 } } finally { // 省略 } }
從 followLeader 方法的實現可得知,follower 與 leader 在數據同步過程中會執行如下過程:
請求連接 leader
提交節點信息計算新的 epoch 值
數據同步
下面我們看下在各個環節的實現細節;
Leader Follower 建立通信protected QuorumServer findLeader() { QuorumServer leaderServer = null; // Find the leader by id Vote current = self.getCurrentVote(); for (QuorumServer s : self.getView().values()) { if (s.id == current.getId()) { // Ensure we have the leader"s correct IP address before // attempting to connect. s.recreateSocketAddresses(); leaderServer = s; break; } } if (leaderServer == null) { LOG.warn("Couldn"t find the leader with id = " + current.getId()); } return leaderServer; }
protected void connectToLeader(InetSocketAddress addr, String hostname) throws IOException, ConnectException, InterruptedException { sock = new Socket(); sock.setSoTimeout(self.tickTime * self.initLimit); for (int tries = 0; tries < 5; tries++) { try { sock.connect(addr, self.tickTime * self.syncLimit); sock.setTcpNoDelay(nodelay); break; } catch (IOException e) { if (tries == 4) { LOG.error("Unexpected exception",e); throw e; } else { LOG.warn("Unexpected exception, tries="+tries+ ", connecting to " + addr,e); sock = new Socket(); sock.setSoTimeout(self.tickTime * self.initLimit); } } Thread.sleep(1000); } self.authLearner.authenticate(sock, hostname); leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream( sock.getInputStream())); bufferedOutput = new BufferedOutputStream(sock.getOutputStream()); leaderOs = BinaryOutputArchive.getArchive(bufferedOutput); }
follower 會通過選舉后的投票信息確認 leader 節點地址,并發起連接(總共有 5 次嘗試連接的機會,若連接不通則重新進入選舉過程)
class LearnerCnxAcceptor extends ZooKeeperThread{ private volatile boolean stop = false; public LearnerCnxAcceptor() { super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress()); } @Override public void run() { try { while (!stop) { try{ /** * 接收 follower 的連接,并開啟 LearnerHandler 線程用于處理二者之間的通信 */ Socket s = ss.accept(); s.setSoTimeout(self.tickTime * self.initLimit); s.setTcpNoDelay(nodelay); BufferedInputStream is = new BufferedInputStream( s.getInputStream()); LearnerHandler fh = new LearnerHandler(s, is, Leader.this); fh.start(); } catch (SocketException e) { // 省略 } catch (SaslException e){ LOG.error("Exception while connecting to quorum learner", e); } } } catch (Exception e) { LOG.warn("Exception while accepting follower", e); } } }
從 LearnerCnxAcceptor 實現可以看出 leader 節點在為每個 follower 節點連接建立之后都會為之分配一個 LearnerHandler 線程用于處理二者之間的通信。
計算新的 epoch 值follower 在與 leader 建立連接之后,會發出 FOLLOWERINFO 信息
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
protected long registerWithLeader(int pktType) throws IOException{ /** * 發送 follower info 信息,包括 last zxid 和 sid */ long lastLoggedZxid = self.getLastLoggedZxid(); QuorumPacket qp = new QuorumPacket(); qp.setType(pktType); qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0)); /* * Add sid to payload */ LearnerInfo li = new LearnerInfo(self.getId(), 0x10000); ByteArrayOutputStream bsid = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid); boa.writeRecord(li, "LearnerInfo"); qp.setData(bsid.toByteArray()); /** * follower 向 leader 發送 FOLLOWERINFO 信息,包括 zxid,sid,protocol version */ writePacket(qp, true); // 省略 }
接下來我們看下 leader 在接收到 FOLLOWERINFO 信息之后做什么(參考 LearnerHandler)
public void run() { try { // 省略 /** * leader 接收 follower 發送的 FOLLOWERINFO 信息,包括 follower 節點的 zxid,sid,protocol version * @see Learner.registerWithleader() */ QuorumPacket qp = new QuorumPacket(); ia.readRecord(qp, "packet"); byte learnerInfoData[] = qp.getData(); if (learnerInfoData != null) { if (learnerInfoData.length == 8) { // 省略 } else { /** * 高版本的 learnerInfoData 包括 long 類型的 sid, int 類型的 protocol version 占用 12 字節 */ LearnerInfo li = new LearnerInfo(); ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li); this.sid = li.getServerid(); this.version = li.getProtocolVersion(); } } /** * 通過 follower 發送的 zxid,解析出 foloower 節點的 epoch 值 */ long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); long peerLastZxid; StateSummary ss = null; long zxid = qp.getZxid(); /** * 阻塞等待計算新的 epoch 值 */ long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch); // 省略 }
從上述代碼可知,leader 在接收到 follower 發送的 FOLLOWERINFO 信息之后,會解析出 follower 節點的 acceptedEpoch 值并參與到新的 epoch 值計算中。 (具體計算邏輯參考方法 getEpochToPropose)
public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException { synchronized(connectingFollowers) { if (!waitingForNewEpoch) { return epoch; } // epoch 用來記錄計算后的選舉周期值 // follower 或 leader 的 acceptedEpoch 值與 epoch 比較;若前者大則將其加一 if (lastAcceptedEpoch >= epoch) { epoch = lastAcceptedEpoch+1; } // connectingFollowers 用來記錄與 leader 已連接的 follower connectingFollowers.add(sid); QuorumVerifier verifier = self.getQuorumVerifier(); // 判斷是否已計算出新的 epoch 值的條件是 leader 已經參與了 epoch 值計算,以及超過一半的節點參與了計算 if (connectingFollowers.contains(self.getId()) && verifier.containsQuorum(connectingFollowers)) { // 將 waitingForNewEpoch 設置為 false 說明不需要等待計算新的 epoch 值了 waitingForNewEpoch = false; // 設置 leader 的 acceptedEpoch 值 self.setAcceptedEpoch(epoch); // 喚醒 connectingFollowers wait 的線程 connectingFollowers.notifyAll(); } else { long start = Time.currentElapsedTime(); long cur = start; long end = start + self.getInitLimit()*self.getTickTime(); while(waitingForNewEpoch && cur < end) { // 若未完成新的 epoch 值計算則阻塞等待 connectingFollowers.wait(end - cur); cur = Time.currentElapsedTime(); } if (waitingForNewEpoch) { throw new InterruptedException("Timeout while waiting for epoch from quorum"); } } return epoch; } }
從方法 getEpochToPropose 可知 leader 會收集集群中過半的 follower acceptedEpoch 信息后,選出一個最大值然后加 1 就是 newEpoch 值; 在此過程中 leader 會進入阻塞狀態直到過半的 follower 參與到計算才會進入下一階段。
通知新的 epoch 值leader 在計算出新的 newEpoch 值后,會進入下一階段發送 LEADERINFO 信息 (同樣參考 LearnerHandler)
public void run() { try { // 省略 /** * 阻塞等待計算新的 epoch 值 */ long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch); if (this.getVersion() < 0x10000) { // we are going to have to extrapolate the epoch information long epoch = ZxidUtils.getEpochFromZxid(zxid); ss = new StateSummary(epoch, zxid); // fake the message leader.waitForEpochAck(this.getSid(), ss); } else { byte ver[] = new byte[4]; ByteBuffer.wrap(ver).putInt(0x10000); /** * 計算出新的 epoch 值后,leader 向 follower 發送 LEADERINFO 信息;包括新的 newEpoch */ QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null); oa.writeRecord(newEpochPacket, "packet"); bufferedOutput.flush(); // 省略 } } // 省略 }
protected long registerWithLeader(int pktType) throws IOException{ // 省略 /** * follower 向 leader 發送 FOLLOWERINFO 信息,包括 zxid,sid,protocol version */ writePacket(qp, true); /** * follower 接收 leader 發送的 LEADERINFO 信息 */ readPacket(qp); /** * 解析 leader 發送的 new epoch 值 */ final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); if (qp.getType() == Leader.LEADERINFO) { // we are connected to a 1.0 server so accept the new epoch and read the next packet leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt(); byte epochBytes[] = new byte[4]; final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes); /** * new epoch > current accepted epoch 則更新 acceptedEpoch 值 */ if (newEpoch > self.getAcceptedEpoch()) { wrappedEpochBytes.putInt((int)self.getCurrentEpoch()); self.setAcceptedEpoch(newEpoch); } else if (newEpoch == self.getAcceptedEpoch()) { wrappedEpochBytes.putInt(-1); } else { throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch()); } /** * follower 向 leader 發送 ACKEPOCH 信息,包括 last zxid */ QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null); writePacket(ackNewEpoch, true); return ZxidUtils.makeZxid(newEpoch, 0); } }
從上述代碼可以看出在完成 newEpoch 值計算后的 leader 與 follower 的交互過程:
leader 向 follower 發送 LEADERINFO 信息,告知 follower 新的 epoch 值
follower 接收解析 LEADERINFO 信息,若 new epoch 值大于 current accepted epoch 值則更新 acceptedEpoch
follower 向 leader 發送 ACKEPOCH 信息,反饋 leader 已收到新的 epoch 值,并附帶 follower 節點的 last zxid
數據同步LearnerHandler 中 leader 在收到過半的 ACKEPOCH 信息之后將進入數據同步階段
public void run() { try { // 省略 // peerLastZxid 為 follower 的 last zxid peerLastZxid = ss.getLastZxid(); /* the default to send to the follower */ int packetToSend = Leader.SNAP; long zxidToSend = 0; long leaderLastZxid = 0; /** the packets that the follower needs to get updates from **/ long updates = peerLastZxid; ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock(); ReadLock rl = lock.readLock(); try { rl.lock(); final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog(); final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog(); LinkedListproposals = leader.zk.getZKDatabase().getCommittedLog(); if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) { /** * follower 與 leader 的 zxid 相同說明 二者數據一致;同步方式為差量同步 DIFF,同步的zxid 為 peerLastZxid, 也就是不需要同步 */ packetToSend = Leader.DIFF; zxidToSend = peerLastZxid; } else if (proposals.size() != 0) { // peerLastZxid 介于 minCommittedLog ,maxCommittedLog 中間 if ((maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid)) { /** * 在遍歷 proposals 時,用來記錄上一個 proposal 的 zxid */ long prevProposalZxid = minCommittedLog; boolean firstPacket=true; packetToSend = Leader.DIFF; zxidToSend = maxCommittedLog; for (Proposal propose: proposals) { // 跳過 follower 已經存在的提案 if (propose.packet.getZxid() <= peerLastZxid) { prevProposalZxid = propose.packet.getZxid(); continue; } else { if (firstPacket) { firstPacket = false; if (prevProposalZxid < peerLastZxid) { /** * 此時說明有部分 proposals 提案在 leader 節點上不存在,則需告訴 follower 丟棄這部分 proposals * 也就是告訴 follower 先執行回滾 TRUNC ,需要回滾到 prevProposalZxid 處,也就是 follower 需要丟棄 prevProposalZxid ~ peerLastZxid 范圍內的數據 * 剩余的 proposals 則通過 DIFF 進行同步 */ packetToSend = Leader.TRUNC; zxidToSend = prevProposalZxid; updates = zxidToSend; } } /** * 將剩余待 DIFF 同步的提案放入到隊列中,等待發送 */ queuePacket(propose.packet); /** * 每個提案后對應一個 COMMIT 報文 */ QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(), null, null); queuePacket(qcommit); } } } else if (peerLastZxid > maxCommittedLog) { /** * follower 的 zxid 比 leader 大 ,則告訴 follower 執行 TRUNC 回滾 */ packetToSend = Leader.TRUNC; zxidToSend = maxCommittedLog; updates = zxidToSend; } else { } } } finally { rl.unlock(); } QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, ZxidUtils.makeZxid(newEpoch, 0), null, null); if (getVersion() < 0x10000) { oa.writeRecord(newLeaderQP, "packet"); } else { // 數據同步完成之后會發送 NEWLEADER 信息 queuedPackets.add(newLeaderQP); } bufferedOutput.flush(); //Need to set the zxidToSend to the latest zxid if (packetToSend == Leader.SNAP) { zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid(); } /** * 發送數據同步方式信息,告訴 follower 按什么方式進行數據同步 */ oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet"); bufferedOutput.flush(); /* if we are not truncating or sending a diff just send a snapshot */ if (packetToSend == Leader.SNAP) { /** * 如果是全量同步的話,則將 leader 本地數據序列化寫入 follower 的輸出流 */ leader.zk.getZKDatabase().serializeSnapshot(oa); oa.writeString("BenWasHere", "signature"); } bufferedOutput.flush(); /** * 開啟個線程執行 packet 發送 */ sendPackets(); /** * 接收 follower ack 響應 */ qp = new QuorumPacket(); ia.readRecord(qp, "packet"); /** * 阻塞等待過半的 follower ack */ leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType()); /** * leader 向 follower 發送 UPTODATE,告知其可對外提供服務 */ queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null)); // 省略 } }
從上述代碼可以看出 leader 和 follower 在進行數據同步時會通過 peerLastZxid 與 maxCommittedLog, minCommittedLog 兩個值比較最終決定數據同步方式。
DIFF(差異化同步)follower 的 peerLastZxid 等于 leader 的 peerLastZxid
此時說明 follower 與 leader 數據一致,采用 DIFF 方式同步,也即是無需同步
follower 的 peerLastZxid 介于 maxCommittedLog, minCommittedLog 兩者之間
此時說明 follower 與 leader 數據存在差異,需對差異的部分進行同步;首先 leader 會向 follower 發送 DIFF 報文告知其同步方式,隨后會發送差異的提案及提案提交報文
交互流程如下:
Leader Follower | DIFF | | --------------------> | | PROPOSAL | | --------------------> | | COMMIT | | --------------------> | | PROPOSAL | | --------------------> | | COMMIT | | --------------------> |
示例: 假設 leader 節點的提案緩存隊列對應的 zxid 依次是:
0x500000001, 0x500000002, 0x500000003, 0x500000004, 0x500000005
而 follower 節點的 peerLastZxid 為 0x500000003,則需要將 0x500000004, 0x500000005 兩個提案進行同步;那么數據包發送過程如下表:
報文類型 | ZXID |
---|---|
DIFF | 0x500000005 |
PROPOSAL | 0x500000004 |
COMMIT | 0x500000004 |
PROPOSAL | 0x500000005 |
COMMIT | 0x500000005 |
在上文 DIFF 差異化同步時會存在一個特殊場景就是 雖然 follower 的 peerLastZxid 介于 maxCommittedLog, minCommittedLog 兩者之間,但是 follower 的 peerLastZxid 在 leader 節點中不存在; 此時 leader 需告知 follower 先回滾到 peerLastZxid 的前一個 zxid, 回滾后再進行差異化同步。
交互流程如下:
Leader Follower | TRUNC | | --------------------> | | PROPOSAL | | --------------------> | | COMMIT | | --------------------> | | PROPOSAL | | --------------------> | | COMMIT | | --------------------> |
示例: 假設集群中三臺節點 A, B, C 某一時刻 A 為 Leader 選舉周期為 5, zxid 包括: (0x500000004, 0x500000005, 0x500000006); 假設某一時刻 leader A 節點在處理完事務為 0x500000007 的請求進行廣播時 leader A 節點服務器宕機導致 0x500000007 該事物沒有被同步出去;在集群進行下一輪選舉之后 B 節點成為新的 leader,選舉周期為 6 對外提供服務處理了新的事務請求包括 0x600000001, 0x600000002;
集群節點 | ZXID 列表 |
---|---|
A | 0x500000004, 0x500000005, 0x500000006, 0x500000007 |
B | 0x500000004, 0x500000005, 0x500000006, 0x600000001, 0x600000002 |
C | 0x500000004, 0x500000005, 0x500000006, 0x600000001, 0x600000002 |
此時節點 A 在重啟加入集群后,在與 leader B 節點進行數據同步時會發現事務 0x500000007 在 leader 節點中并不存在,此時 leader 告知 A 需先回滾事務到 0x500000006,在差異同步事務 0x600000001,0x600000002;那么數據包發送過程如下表:
報文類型 | ZXID |
---|---|
TRUNC | 0x500000006 |
PROPOSAL | 0x600000001 |
COMMIT | 0x600000001 |
PROPOSAL | 0x600000002 |
COMMIT | 0x600000002 |
若 follower 的 peerLastZxid 大于 leader 的 maxCommittedLog,則告知 follower 回滾至 maxCommittedLog; 該場景可以認為是 TRUNC+DIFF 的簡化模式
交互流程如下:
Leader Follower | TRUNC | | --------------------> |SNAP(全量同步)
若 follower 的 peerLastZxid 小于 leader 的 minCommittedLog 或者 leader 節點上不存在提案緩存隊列時,將采用 SNAP 全量同步方式。 該模式下 leader 首先會向 follower 發送 SNAP 報文,隨后從內存數據庫中獲取全量數據序列化傳輸給 follower, follower 在接收全量數據后會進行反序列化加載到內存數據庫中。
交互流程如下:
Leader Follower | SNAP | | --------------------> | | DATA | | --------------------> |
leader 在完成數據同步之后,會向 follower 發送 NEWLEADER 報文,在收到過半的 follower 響應的 ACK 之后此時說明過半的節點完成了數據同步,接下來 leader 會向 follower 發送 UPTODATE 報文告知 follower 節點可以對外提供服務了,此時 leader 會啟動 zk server 開始對外提供服務。
FOLLOWER 數據同步下面我們在看下數據同步階段 FOLLOWER 是如何處理的,參考 Learner.syncWithLeader
protected void syncWithLeader(long newLeaderZxid) throws IOException, InterruptedException{ QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null); QuorumPacket qp = new QuorumPacket(); long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid); /** * 接收 leader 發送的數據同步方式報文 */ readPacket(qp); synchronized (zk) { if (qp.getType() == Leader.DIFF) { } else if (qp.getType() == Leader.SNAP) { // 執行加載全量數據 } else if (qp.getType() == Leader.TRUNC) { // 執行回滾 } else { } outerLoop: while (self.isRunning()) { readPacket(qp); switch(qp.getType()) { case Leader.PROPOSAL: // 處理提案 break; case Leader.COMMIT: // commit proposal break; case Leader.INFORM: // 忽略 break; case Leader.UPTODATE: // 設置 zk server self.cnxnFactory.setZooKeeperServer(zk); // 退出循環 break outerLoop; case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery /** * follower 響應 NEWLEADER ACK */ writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); break; } } } ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0)); writePacket(ack, true); // 啟動 zk server zk.startup(); }
從上述代碼中可以看出 follower 在數據同步階段的處理流程如下:
follower 接收 leader 發送的數據同步方式(DIFF/TRUNC/SANP)報文并進行相應處理
當 follower 收到 leader 發送的 NEWLEADER 報文后,會向 leader 響應 ACK (leader 在收到過半的 ACK 消息之后會發送 UPTODATE)
當 follower 收到 leader 發送的 UPTODATE 報文后,說明此時可以對外提供服務,此時將啟動 zk server
小結最后用一張圖總結下 zk 在完成選舉后數據同步的過程如下圖所示:
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/74515.html
摘要:表示的是兩個,當其中任意一個計算完并發編程之是線程安全并且高效的,在并發編程中經常可見它的使用,在開始分析它的高并發實現機制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購,是兩個比較典型的互聯網高并發場景。 干貨:深度剖析分布式搜索引擎設計 分布式,高可用,和機器學習一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個名詞,今天我們首先來說說分布式。 探究...
摘要:在中一般來說通過來創建所需要的線程池,如高并發原理初探后端掘金閱前熱身為了更加形象的說明同步異步阻塞非阻塞,我們以小明去買奶茶為例。 AbstractQueuedSynchronizer 超詳細原理解析 - 后端 - 掘金今天我們來研究學習一下AbstractQueuedSynchronizer類的相關原理,java.util.concurrent包中很多類都依賴于這個類所提供的隊列式...
摘要:在中一般來說通過來創建所需要的線程池,如高并發原理初探后端掘金閱前熱身為了更加形象的說明同步異步阻塞非阻塞,我們以小明去買奶茶為例。 AbstractQueuedSynchronizer 超詳細原理解析 - 后端 - 掘金今天我們來研究學習一下AbstractQueuedSynchronizer類的相關原理,java.util.concurrent包中很多類都依賴于這個類所提供的隊列式...
閱讀 1325·2023-04-26 00:10
閱讀 2428·2021-09-22 15:38
閱讀 3746·2021-09-22 15:13
閱讀 3503·2019-08-30 13:11
閱讀 646·2019-08-30 11:01
閱讀 3028·2019-08-29 14:20
閱讀 3208·2019-08-29 13:27
閱讀 1726·2019-08-29 11:33