摘要:前言分布式一致性原理剖析系列將會(huì)對(duì)的分布式一致性原理進(jìn)行詳細(xì)的剖析,介紹其實(shí)現(xiàn)方式原理以及其存在的問(wèn)題等基于版本。使用額外的一致性組件維護(hù)。管理的全局組件,其保證數(shù)據(jù)的一致性。將這個(gè)加入自己的,同時(shí)向所有發(fā)送請(qǐng)求,要求將這個(gè)加入。
前言
“Elasticsearch分布式一致性原理剖析”系列將會(huì)對(duì)Elasticsearch的分布式一致性原理進(jìn)行詳細(xì)的剖析,介紹其實(shí)現(xiàn)方式、原理以及其存在的問(wèn)題等(基于6.2版本)。前兩篇文章介紹了ES中集群如何組成,master選舉算法,master更新meta的流程等,并分析了選舉、Meta更新中的一致性問(wèn)題。本文會(huì)分析ES中的數(shù)據(jù)流,包括其寫(xiě)入流程、算法模型PacificA、SequenceNumber與Checkpoint等,并比較ES的實(shí)現(xiàn)與標(biāo)準(zhǔn)PacificA算法的異同。目錄如下:
問(wèn)題背景
數(shù)據(jù)寫(xiě)入流程
PacificA算法
SequenceNumber、Checkpoint與故障恢復(fù)
ES與PacificA的比較
小結(jié)
問(wèn)題背景
用過(guò)ES的同學(xué)都知道,ES中每個(gè)Index會(huì)劃分為多個(gè)Shard,Shard分布在不同的Node上,以此來(lái)實(shí)現(xiàn)分布式的存儲(chǔ)和查詢,支撐大規(guī)模的數(shù)據(jù)集。對(duì)于每個(gè)Shard,又會(huì)有多個(gè)Shard的副本,其中一個(gè)為Primary,其余的一個(gè)或多個(gè)為Replica。數(shù)據(jù)在寫(xiě)入時(shí),會(huì)先寫(xiě)入Primary,由Primary將數(shù)據(jù)再同步給Replica。在讀取時(shí),為了提高讀取能力,Primary和Replica都會(huì)接受讀請(qǐng)求。
在這種模型下,我們能夠感受到ES具有這樣的一些特性,比如:
數(shù)據(jù)高可靠:數(shù)據(jù)具有多個(gè)副本。
服務(wù)高可用:Primary掛掉之后,可以從Replica中選出新的Primary提供服務(wù)。
讀能力擴(kuò)展:Primary和Replica都可以承擔(dān)讀請(qǐng)求。
故障恢復(fù)能力:Primary或Replica掛掉都會(huì)導(dǎo)致副本數(shù)不足,此時(shí)可以由新的Primary通過(guò)復(fù)制數(shù)據(jù)產(chǎn)生新的副本。
另外,我們也可以想到一些問(wèn)題,比如:
數(shù)據(jù)怎么從Primary復(fù)制到Replica?
一次寫(xiě)入要求所有副本都成功嗎?
Primary掛掉會(huì)丟數(shù)據(jù)嗎?
數(shù)據(jù)從Replica讀,總是能讀到最新數(shù)據(jù)嗎?
故障恢復(fù)時(shí),需要拷貝Shard下的全部數(shù)據(jù)嗎?
可以看到,對(duì)于ES中的數(shù)據(jù)一致性,雖然我們可以很容易的了解到其大概原理,但是對(duì)其細(xì)節(jié)我們還有很多的困惑。那么本文就從ES的寫(xiě)入流程,采用的一致性算法,SequenceId和Checkpoint的設(shè)計(jì)等方面來(lái)介紹ES如何工作,進(jìn)而回答上述這些問(wèn)題。需要注意的是,本文基于ES6.2版本進(jìn)行分析,可能很多內(nèi)容并不適用于ES之前的版本,比如2.X的版本等。
首先我們來(lái)看一下數(shù)據(jù)的寫(xiě)入流程,讀者也可以閱讀這篇文章來(lái)詳細(xì)了解:https://zhuanlan.zhihu.com/p/...。
Replication角度: Primary -> Replica
我們從大的角度來(lái)看,ES寫(xiě)入流程為先寫(xiě)入Primary,再并發(fā)寫(xiě)入Replica,最后應(yīng)答客戶端,流程如下:
檢查Active的Shard數(shù)。
String activeShardCountFailure = checkActiveShardCount();
寫(xiě)入Primary。
String activeShardCountFailure = checkActiveShardCount();
primaryResult = primary.perform(request);
并發(fā)的向所有Replicate發(fā)起寫(xiě)入請(qǐng)求
performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup.getRoutingTable());
等所有Replicate返回或者失敗后,返回給Client。
private void decPendingAndFinishIfNeeded() { assert pendingActions.get() > 0 : "pending action count goes below 0 for request [" + request + "]"; if (pendingActions.decrementAndGet() == 0) { finish(); } }
上述過(guò)程在ReplicationOperation類(lèi)的execute函數(shù)中,完整代碼如下:
public void execute() throws Exception { final String activeShardCountFailure = checkActiveShardCount(); final ShardRouting primaryRouting = primary.routingEntry(); final ShardId primaryId = primaryRouting.shardId(); if (activeShardCountFailure != null) { finishAsFailed(new UnavailableShardsException(primaryId, "{} Timeout: [{}], request: [{}]", activeShardCountFailure, request.timeout(), request)); return; } totalShards.incrementAndGet(); pendingActions.incrementAndGet(); // increase by 1 until we finish all primary coordination primaryResult = primary.perform(request); primary.updateLocalCheckpointForShard(primaryRouting.allocationId().getId(), primary.localCheckpoint()); final ReplicaRequest replicaRequest = primaryResult.replicaRequest(); if (replicaRequest != null) { if (logger.isTraceEnabled()) { logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request); } // we have to get the replication group after successfully indexing into the primary in order to honour recovery semantics. // we have to make sure that every operation indexed into the primary after recovery start will also be replicated // to the recovery target. If we used an old replication group, we may miss a recovery that has started since then. // we also have to make sure to get the global checkpoint before the replication group, to ensure that the global checkpoint // is valid for this replication group. If we would sample in the reverse, the global checkpoint might be based on a subset // of the sampled replication group, and advanced further than what the given replication group would allow it to. // This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint. final long globalCheckpoint = primary.globalCheckpoint(); final ReplicationGroup replicationGroup = primary.getReplicationGroup(); markUnavailableShardsAsStale(replicaRequest, replicationGroup.getInSyncAllocationIds(), replicationGroup.getRoutingTable()); performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup.getRoutingTable()); } successfulShards.incrementAndGet(); // mark primary as successful decPendingAndFinishIfNeeded(); }
下面我們針對(duì)這個(gè)流程,來(lái)分析幾個(gè)問(wèn)題:
1. 為什么第一步要檢查Active的Shard數(shù)?
ES中有一個(gè)參數(shù),叫做wait_for_active_shards,這個(gè)參數(shù)是Index的一個(gè)setting,也可以在請(qǐng)求中帶上這個(gè)參數(shù)。這個(gè)參數(shù)的含義是,在每次寫(xiě)入前,該shard至少具有的active副本數(shù)。假設(shè)我們有一個(gè)Index,其每個(gè)Shard有3個(gè)Replica,加上Primary則總共有4個(gè)副本。如果配置wait_for_active_shards為3,那么允許最多有一個(gè)Replica掛掉,如果有兩個(gè)Replica掛掉,則Active的副本數(shù)不足3,此時(shí)不允許寫(xiě)入。
這個(gè)參數(shù)默認(rèn)是1,即只要Primary在就可以寫(xiě)入,起不到什么作用。如果配置大于1,可以起到一種保護(hù)的作用,保證寫(xiě)入的數(shù)據(jù)具有更高的可靠性。但是這個(gè)參數(shù)只在寫(xiě)入前檢查,并不保證數(shù)據(jù)一定在至少這些個(gè)副本上寫(xiě)入成功,所以并不是嚴(yán)格保證了最少寫(xiě)入了多少個(gè)副本。關(guān)于這一點(diǎn),可參考以下官方文檔:
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html ...It is important to note that this setting greatly reduces the chances of the write operation not writing to the requisite number of shard copies, but it does not completely eliminate the possibility, because this check occurs before the write operation commences. Once the write operation is underway, it is still possible for replication to fail on any number of shard copies but still succeed on the primary. The _shards section of the write operation’s response reveals the number of shard copies on which replication succeeded/failed.
2. 寫(xiě)入Primary完成后,為何要等待所有Replica響應(yīng)(或連接失敗)后返回
在更早的ES版本,Primary和Replica之間是允許異步復(fù)制的,即寫(xiě)入Primary成功即可返回。但是這種模式下,如果Primary掛掉,就有丟數(shù)據(jù)的風(fēng)險(xiǎn),而且從Replica讀數(shù)據(jù)也很難保證能讀到最新的數(shù)據(jù)。所以后來(lái)ES就取消異步模式了,改成Primary等Replica返回后再返回給客戶端。
因?yàn)镻rimary要等所有Replica返回才能返回給客戶端,那么延遲就會(huì)受到最慢的Replica的影響,這確實(shí)是目前ES架構(gòu)的一個(gè)弊端。之前曾誤認(rèn)為這里是等wait_for_active_shards個(gè)副本寫(xiě)入成功即可返回,但是后來(lái)讀源碼發(fā)現(xiàn)是等所有Replica返回的。
https://github.com/elastic/elasticsearch/blob/master/docs/reference/docs/data-replication.asciidoc ... Once all replicas have successfully performed the operation and responded to the primary, the primary acknowledges the successful completion of the request to the client.
如果Replica寫(xiě)入失敗,ES會(huì)執(zhí)行一些重試邏輯等,但最終并不強(qiáng)求一定要在多少個(gè)節(jié)點(diǎn)寫(xiě)入成功。在返回的結(jié)果中,會(huì)包含數(shù)據(jù)在多少個(gè)shard中寫(xiě)入成功了,多少個(gè)失敗了:如果Replica寫(xiě)入失敗,ES會(huì)執(zhí)行一些重試邏輯等,但最終并不強(qiáng)求一定要在多少個(gè)節(jié)點(diǎn)寫(xiě)入成功。在返回的結(jié)果中,會(huì)包含數(shù)據(jù)在多少個(gè)shard中寫(xiě)入成功了,多少個(gè)失敗了:
如果Replica寫(xiě)入失敗,ES會(huì)執(zhí)行一些重試邏輯等,但最終并不強(qiáng)求一定要在多少個(gè)節(jié)點(diǎn)寫(xiě)入成功。在返回的結(jié)果中,會(huì)包含數(shù)據(jù)在多少個(gè)shard中寫(xiě)入成功了,多少個(gè)失敗了:
3. 如果某個(gè)Replica持續(xù)寫(xiě)失敗,用戶是否會(huì)經(jīng)常查到舊數(shù)據(jù)?
這個(gè)問(wèn)題是說(shuō),假如一個(gè)Replica持續(xù)寫(xiě)入失敗,那么這個(gè)Replica上的數(shù)據(jù)可能落后Primary很多。我們知道ES中Replica也是可以承擔(dān)讀請(qǐng)求的,那么用戶是否會(huì)讀到這個(gè)Replica上的舊數(shù)據(jù)呢?
答案是如果一個(gè)Replica寫(xiě)失敗了,Primary會(huì)將這個(gè)信息報(bào)告給Master,然后Master會(huì)在Meta中更新這個(gè)Index的InSyncAllocations配置,將這個(gè)Replica從中移除,移除后它就不再承擔(dān)讀請(qǐng)求。在Meta更新到各個(gè)Node之前,用戶可能還會(huì)讀到這個(gè)Replica的數(shù)據(jù),但是更新了Meta之后就不會(huì)了。所以這個(gè)方案并不是非常的嚴(yán)格,考慮到ES本身就是一個(gè)近實(shí)時(shí)系統(tǒng),數(shù)據(jù)寫(xiě)入后需要refresh才可見(jiàn),所以一般情況下,在短期內(nèi)讀到舊數(shù)據(jù)應(yīng)該也是可接受的。
ReplicationOperation.java,寫(xiě)入Replica失敗的OnFailure函數(shù): public void onFailure(Exception replicaException) { logger.trace( (org.apache.logging.log4j.util.Supplier>) () -> new ParameterizedMessage( "[{}] failure while performing [{}] on replica {}, request [{}]", shard.shardId(), opType, shard, replicaRequest), replicaException); if (TransportActions.isShardNotAvailableException(replicaException)) { decPendingAndFinishIfNeeded(); } else { RestStatus restStatus = ExceptionsHelper.status(replicaException); shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure( shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false)); String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard); replicasProxy.failShardIfNeeded(shard, message, replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded, ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded()); } } 調(diào)用failShardIfNeeded: public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, Runnable onSuccess, ConsumeronPrimaryDemoted, Consumer onIgnoredFailure) { logger.warn((org.apache.logging.log4j.util.Supplier>) () -> new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception); shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, message, exception, createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure)); } shardStateAction.remoteShardFailed向Master發(fā)送請(qǐng)求,執(zhí)行該Replica的ShardFailed邏輯,將Shard從InSyncAllocation中移除。 public void shardFailed(ShardRouting failedShard, UnassignedInfo unassignedInfo) { if (failedShard.active() && unassignedInfo.getReason() != UnassignedInfo.Reason.NODE_LEFT) { removeAllocationId(failedShard); if (failedShard.primary()) { Updates updates = changes(failedShard.shardId()); if (updates.firstFailedPrimary == null) { // more than one primary can be failed (because of batching, primary can be failed, replica promoted and then failed...) updates.firstFailedPrimary = failedShard; } } } if (failedShard.active() && failedShard.primary()) { increasePrimaryTerm(failedShard.shardId()); } }
Primary自身角度
從Primary自身的角度,一次寫(xiě)入請(qǐng)求會(huì)先寫(xiě)入Lucene,然后寫(xiě)入translog。具體流程可以看這篇文章:https://zhuanlan.zhihu.com/p/... 。
1. 為什么要寫(xiě)translog?
translog類(lèi)似于數(shù)據(jù)庫(kù)中的commitlog,或者binlog。只要translog寫(xiě)入成功并flush,那么這筆數(shù)據(jù)就落盤(pán)了,數(shù)據(jù)安全性有了保證,Segment就可以晚一點(diǎn)落盤(pán)。因?yàn)閠ranslog是append方式寫(xiě)入,寫(xiě)入性能也會(huì)比隨機(jī)寫(xiě)更高。
另一方面是,translog記錄了每一筆數(shù)據(jù)更改,以及數(shù)據(jù)更改的順序,所以translog也可以用于數(shù)據(jù)恢復(fù)。數(shù)據(jù)恢復(fù)包含兩方面,一方面是節(jié)點(diǎn)重啟后,從translog中恢復(fù)重啟前還未落盤(pán)的Segment數(shù)據(jù),另一方面是用于Primary和新的Replica之間的數(shù)據(jù)同步,即Replica逐步追上Primary數(shù)據(jù)的過(guò)程。
2. 為什么先寫(xiě)Lucene,再寫(xiě)translog?
寫(xiě)Lucene是寫(xiě)入內(nèi)存,寫(xiě)入后在內(nèi)存中refresh即可讀到,寫(xiě)translog是落盤(pán),為了數(shù)據(jù)持久化以及恢復(fù)。正常來(lái)講,分布式系統(tǒng)中是先寫(xiě)commitLog進(jìn)行數(shù)據(jù)持久化,再在內(nèi)存中apply這次更改,那么ES為什么要反其道而行之呢?主要原因大概是寫(xiě)入Lucene時(shí),Lucene會(huì)再對(duì)數(shù)據(jù)進(jìn)行一些檢查,有可能出現(xiàn)寫(xiě)入Lucene失敗的情況。如果先寫(xiě)translog,那么就要處理寫(xiě)入translog成功但是寫(xiě)入Lucene一直失敗的問(wèn)題,所以ES采用了先寫(xiě)Lucene的方式。
PacificA是微軟亞洲研究院提出的一種用于日志復(fù)制系統(tǒng)的分布式一致性算法,論文發(fā)表于2008年(PacificA paper)。ES官方明確提出了其Replication模型基于該算法:
https://github.com/elastic/elasticsearch/blob/master/docs/reference/docs/data-replication.asciidoc Elasticsearch’s data replication model is based on the primary-backup model and is described very well in the PacificA paper of Microsoft Research. That model is based on having a single copy from the replication group that acts as the primary shard. The other copies are called replica shards. The primary serves as the main entry point for all indexing operations. It is in charge of validating them and making sure they are correct. Once an index operation has been accepted by the primary, the primary is also responsible for replicating the operation to the other copies.
網(wǎng)上講解這個(gè)算法的文章較少,因此本文根據(jù)PacificA的論文,簡(jiǎn)單介紹一下這個(gè)算法。該算法具有以下幾個(gè)特點(diǎn):
強(qiáng)一致性。
單Primary向多Secondary的數(shù)據(jù)同步模式。
使用額外的一致性組件維護(hù)Configuration。
少數(shù)派Replica可用時(shí)仍可寫(xiě)入。
一些名詞
首先我們介紹一下算法中的一些名詞:
Replica Group:一個(gè)互為副本的數(shù)據(jù)集合叫做Replica Group,每個(gè)副本是一個(gè)Replica。一個(gè)Replica Group中只有一個(gè)副本是Primary,其余為Secondary。
Configuration:一個(gè)Replica Group的Configuration描述了這個(gè)Replica Group包含哪些副本,其中Primary是誰(shuí)等。
Configuration Version:Configuration的版本號(hào),每次Configuration發(fā)生變更時(shí)加1。
Configuration Manager: 管理Configuration的全局組件,其保證Configuration數(shù)據(jù)的一致性。Configuration變更會(huì)由某個(gè)Replica發(fā)起,帶著Version發(fā)送給Configuration Manager,Configuration Manager會(huì)檢查Version是否正確,如果不正確則拒絕更改。
Query & Update:對(duì)一個(gè)Replica Group的操作分為兩種,Query和Update,Query不會(huì)改變數(shù)據(jù),Update會(huì)更改數(shù)據(jù)。
Serial Number(sn):代表每個(gè)Update操作執(zhí)行的順序,每次Update操作加1,為連續(xù)的數(shù)字。
Prepared List:Update操作的準(zhǔn)備序列。
Committed List:Update操作的提交序列,提交序列中的操作一定不會(huì)丟失(除非全部副本掛掉)。在同一個(gè)Replica上,Committed List一定是Prepared List的前綴。
Primary Invariant
在PacificA算法中,要求采用某種錯(cuò)誤檢測(cè)機(jī)制來(lái)滿足以下不變式:
Primary Invariant: 任何時(shí)候,當(dāng)一個(gè)Replica認(rèn)為自己是Primary時(shí),Configuration Manager中維護(hù)的Configuration也認(rèn)為其是當(dāng)前的Primary。任何時(shí)候,最多只有一個(gè)Replica認(rèn)為自己是這個(gè)Replica Group的Primary。
Primary Invariant保證了當(dāng)一個(gè)節(jié)點(diǎn)認(rèn)為自己是Primary時(shí),其肯定是當(dāng)前的Primary。如果不能滿足Primary Invariant,那么Query請(qǐng)求就可能發(fā)送給Old Primary,讀到舊的數(shù)據(jù)。
怎么保證滿足Primary Invariant呢?論文給出的一種方法是通過(guò)Lease機(jī)制,這也是分布式系統(tǒng)中常用的一種方式。具體來(lái)說(shuō),Primary會(huì)定期獲取一個(gè)Lease,獲取之后認(rèn)為某段時(shí)間內(nèi)自己肯定是Primary,一旦超過(guò)這個(gè)時(shí)間還未獲取到新的Lease就退出Primary狀態(tài)。只要各個(gè)機(jī)器的CPU不出現(xiàn)較大的時(shí)鐘漂移,那么就能夠保證Lease機(jī)制的有效性。
論文中實(shí)現(xiàn)Lease機(jī)制的方式是,Primary定期向所有Secondary發(fā)送心跳來(lái)獲取Lease,而不是所有節(jié)點(diǎn)都向某個(gè)中心化組件獲取Lease。這樣的好處是分散了壓力,不會(huì)出現(xiàn)中心化組件故障而導(dǎo)致所有節(jié)點(diǎn)失去Lease的情況。
Query
Query流程比較簡(jiǎn)單,Query只能發(fā)送給Primary,Primary根據(jù)最新commit的數(shù)據(jù),返回對(duì)應(yīng)的值。由于算法要求滿足Primary Invariant,所以Query總是能讀到最新commit的數(shù)據(jù)。
Update
Update流程如下:
Primary分配一個(gè)Serial Number(簡(jiǎn)稱sn)給一個(gè)UpdateRequest。
Primary將這個(gè)UpdateRequest加入自己的Prepared List,同時(shí)向所有Secondary發(fā)送Prepare請(qǐng)求,要求將這個(gè)UpdateRequest加入Prepared List。
當(dāng)所有Replica都完成了Prepare,即所有Replica的Prepared List中都包含了該Update請(qǐng)求時(shí),Primary開(kāi)始Commit這個(gè)請(qǐng)求,即將這個(gè)UpdateRequest放入Committed List中,同時(shí)Apply這個(gè)Update。需要注意的是,同一個(gè)Replica上,Committed List永遠(yuǎn)是Prepared List的前綴,所以Primary實(shí)際上是提高Committed Point,把這個(gè)Update Request包含進(jìn)來(lái)。
返回客戶端,Update操作成功。
當(dāng)下一次Primary向Secondary發(fā)送請(qǐng)求時(shí),會(huì)帶上Primary當(dāng)前的Committed Point,此時(shí)Secondary才會(huì)提高自己的Committed Point。
從Update流程我們可以得出以下不變式:
Commited Invariant
我們把某一個(gè)Secondary的Committed List記為SecondaryCommittedList,其Prepared List記為SecondaryPreparedList,把Primary的Committed List記為PrimaryCommittedList。
Commited Invariant:SecondaryCommittedList一定是PrimaryCommittedList的前綴,PrimaryCommittedList一定是SecondaryPreparedList的前綴。
Reconfiguration:Secondary故障,Primary故障,新加節(jié)點(diǎn)
Secondary故障
當(dāng)一個(gè)Secondary故障時(shí),Primary向Configuration Manager發(fā)起Reconfiguration,將故障節(jié)點(diǎn)從Replica Group中刪除。一旦移除這個(gè)Replica,它就不屬于這個(gè)Replica Group了,所有請(qǐng)求都不會(huì)再發(fā)給它。
假設(shè)某個(gè)Primary和Secondary發(fā)生了網(wǎng)絡(luò)分區(qū),但是都可以連接Configuration Manager。這時(shí)候Primary會(huì)檢測(cè)到Secondary沒(méi)有響應(yīng)了,Secondary也會(huì)檢測(cè)到Primary沒(méi)有響應(yīng)。此時(shí)兩者都會(huì)試圖發(fā)起Reconfiguration,將對(duì)方從Replica Group中移除,這里的策略是First Win的原則,誰(shuí)先到Configuration Manager中更改成功,誰(shuí)就留在Replica Group里,而另外一個(gè)已經(jīng)不屬于Replica Group了,也就無(wú)法再更新Configuration了。由于Primary會(huì)向Secondary請(qǐng)求一個(gè)Lease,在Lease有效期內(nèi)Secondary不會(huì)執(zhí)行Reconfiguration,而Primary的探測(cè)間隔必然是小于Lease時(shí)間的,所以我認(rèn)為這種情況下總是傾向于Primary先進(jìn)行Reconfiguration,將Secondary剔除。
Primary故障
當(dāng)一個(gè)Primary故障時(shí),Secondary會(huì)收不到Primary的心跳,如果超過(guò)Lease的時(shí)間,那么Secondary就會(huì)發(fā)起Reconfiguration,將Primary剔除,這里也是First Win的原則,哪個(gè)Secondary先成功,就會(huì)變成Primary。
當(dāng)一個(gè)Secondary變成Primary后,需要先經(jīng)過(guò)一個(gè)叫做Reconciliation的階段才能提供服務(wù)
由于上述的Commited Invariant,所以原先的Primary的Committed List一定是新的Primary的Prepared List的前綴,那么我們將新的Primary的Prepared List中的內(nèi)容與當(dāng)前Replica Group中的其他節(jié)點(diǎn)對(duì)齊,相當(dāng)于把該節(jié)點(diǎn)上未Commit的記錄在所有節(jié)點(diǎn)上再Commit一次,那么就一定包含之前所有的Commit記錄。即以下不變式:
Reconfiguration Invariant:當(dāng)一個(gè)新的Primary在T時(shí)刻完成Reconciliation時(shí),那么T時(shí)刻之前任何節(jié)點(diǎn)(包括原Primary)的Commited List都是新Primary當(dāng)前Commited List的前綴。
Reconfiguration Invariant表明了已經(jīng)Commit的數(shù)據(jù)在Reconfiguration過(guò)程中不會(huì)丟。
新加節(jié)點(diǎn)
新加的節(jié)點(diǎn)需要先成為Secondary Candidate,這時(shí)候Primary就開(kāi)始向其發(fā)送Prepare請(qǐng)求,此時(shí)這個(gè)節(jié)點(diǎn)還會(huì)追之前未同步過(guò)來(lái)的記錄,一旦追平,就申請(qǐng)成為一個(gè)Secondary,然后Primary向Configuration Manager發(fā)起配置變更,將這個(gè)節(jié)點(diǎn)加入Replica Group。
還有一種情況時(shí),如果一個(gè)節(jié)點(diǎn)曾經(jīng)在Replica Group中,由于臨時(shí)發(fā)生故障被移除,現(xiàn)在需要重新加回來(lái)。此時(shí)這個(gè)節(jié)點(diǎn)上的Commited List中的數(shù)據(jù)肯定是已經(jīng)被Commit的了,但是Prepared List中的數(shù)據(jù)未必被Commit,所以應(yīng)該將未Commit的數(shù)據(jù)移除,從Committed Point開(kāi)始向Primary請(qǐng)求數(shù)據(jù)。
算法總結(jié)
PacificA是一個(gè)讀寫(xiě)都滿足強(qiáng)一致性的算法,它把數(shù)據(jù)的一致性與配置(Configuration)的一致性分開(kāi),使用額外的一致性組件(Configuration Manager)維護(hù)配置的一致性,在數(shù)據(jù)的可用副本數(shù)少于半數(shù)時(shí),仍可以寫(xiě)入新數(shù)據(jù)并保證強(qiáng)一致性。
ES在設(shè)計(jì)上參考了PacificA算法,其通過(guò)Master維護(hù)Index的Meta,類(lèi)似于論文中的Configuration Manager維護(hù)Configuration。其IndexMeta中的InSyncAllocationIds代表了當(dāng)前可用的Shard,類(lèi)似于論文中維護(hù)Replica Group。下一節(jié)我們會(huì)介紹ES中的SequenceNumber和Checkpoint,這兩個(gè)類(lèi)似于PacificA算法中的Serial Number和Committed Point,在這一節(jié)之后,會(huì)再有一節(jié)來(lái)比較ES的實(shí)現(xiàn)與PacificA的異同。
SequenceNumber、Checkpoint與故障恢復(fù)
上面介紹了ES的一致性算法模型PacificA,該算法很重要的一點(diǎn)是每個(gè)Update操作都會(huì)有一個(gè)對(duì)應(yīng)的Serial Number,表示執(zhí)行的順序。在之前的ES版本中,每個(gè)寫(xiě)入操作并沒(méi)有類(lèi)似Serial Number的東西,所以很多事情做不了。在15年的時(shí)候,ES官方開(kāi)始規(guī)劃給每個(gè)寫(xiě)操作加入SequenceNumber,并設(shè)想了很多應(yīng)用場(chǎng)景。具體信息可以參考以下兩個(gè)鏈接:
Add Sequence Numbers to write operations #10708
Sequence IDs: Coming Soon to an Elasticsearch Cluster Near You
下面我們簡(jiǎn)單介紹一下Sequence、Checkpoint是什么,以及其應(yīng)用場(chǎng)景。
Term和SequenceNumber
每個(gè)寫(xiě)操作都會(huì)分配兩個(gè)值,Term和SequenceNumber。Term在每次Primary變更時(shí)都會(huì)加1,類(lèi)似于PacificA論文中的Configuration Version。SequenceNumber在每次操作后加1,類(lèi)似于PacificA論文中的Serial Number。
由于寫(xiě)請(qǐng)求總是發(fā)給Primary,所以Term和SequenceNumber會(huì)由Primary分配,在向Replica發(fā)送同步請(qǐng)求時(shí),會(huì)帶上這兩個(gè)值。
LocalCheckpoint和GlobalCheckpoint
LocalCheckpoint代表本Shard中所有小于該值的請(qǐng)求都已經(jīng)處理完畢。
GlobalCheckpoint代表所有小于該值的請(qǐng)求在所有的Replica上都處理完畢。GlobalCheckpoint會(huì)由Primary進(jìn)行維護(hù),每個(gè)Replica會(huì)向Primary匯報(bào)自己的LocalCheckpoint,Primary根據(jù)這些信息來(lái)提升GlobalCheckpoint。
GlobalCheckpoint是一個(gè)全局的安全位置,代表其前面的請(qǐng)求都被所有Replica正確處理了,可以應(yīng)用在節(jié)點(diǎn)故障恢復(fù)后的數(shù)據(jù)回補(bǔ)。另一方面,GlobalCheckpoint也可以用于Translog的GC,因?yàn)橹暗牟僮饔涗浛梢圆槐4媪恕2贿^(guò)ES中Translog的GC策略是按照大小或者時(shí)間,好像并沒(méi)有使用GlobalCheckpoint。
快速故障恢復(fù)
當(dāng)一個(gè)Replica故障時(shí),ES會(huì)將其移除,當(dāng)故障超過(guò)一定時(shí)間,ES會(huì)分配一個(gè)新的Replica到新的Node上,此時(shí)需要全量同步數(shù)據(jù)。但是如果之前故障的Replica回來(lái)了,就可以只回補(bǔ)故障之后的數(shù)據(jù),追平后加回來(lái)即可,實(shí)現(xiàn)快速故障恢復(fù)。實(shí)現(xiàn)快速故障恢復(fù)的條件有兩個(gè),一個(gè)是能夠保存故障期間所有的操作以及其順序,另一個(gè)是能夠知道從哪個(gè)點(diǎn)開(kāi)始同步數(shù)據(jù)。第一個(gè)條件可以通過(guò)保存一定時(shí)間的Translog實(shí)現(xiàn),第二個(gè)條件可以通過(guò)Checkpoint實(shí)現(xiàn),所以就能夠?qū)崿F(xiàn)快速的故障恢復(fù)。這是SequenceNumber和Checkpoint的第一個(gè)重要應(yīng)用場(chǎng)景。
ES與PacificA的比較
相同點(diǎn)
Meta一致性和Data一致性分開(kāi)處理:PacificA中通過(guò)Configuration Manager維護(hù)Configuration的一致性,ES中通過(guò)Master維護(hù)Meta的一致性。
維護(hù)同步中的副本集合:PacificA中維護(hù)Replica Group,ES中維護(hù)InSyncAllocationIds。
SequenceNumber:在PacificA和ES中,寫(xiě)操作都具有SequenceNumber,記錄操作順序。
不同點(diǎn)
不同點(diǎn)主要體現(xiàn)在ES雖然遵循PacificA,但是目前其實(shí)現(xiàn)還有很多地方不滿足算法要求,所以不能保證嚴(yán)格的強(qiáng)一致性。主要有以下幾點(diǎn):
Meta一致性:上一篇中分析了ES中Meta一致性的問(wèn)題,可以看到ES并不能完全保證Meta一致性,因此也必然無(wú)法嚴(yán)格保證Data的一致性。
Prepare階段:PacificA中有Prepare階段,保證數(shù)據(jù)在所有節(jié)點(diǎn)Prepare成功后才能Commit,保證Commit的數(shù)據(jù)不丟,ES中沒(méi)有這個(gè)階段,數(shù)據(jù)會(huì)直接寫(xiě)入。
讀一致性:ES中所有InSync的Replica都可讀,提高了讀能力,但是可能讀到舊數(shù)據(jù)。另一方面是即使只能讀Primary,ES也需要Lease機(jī)制等避免讀到Old Primary。因?yàn)镋S本身是近實(shí)時(shí)系統(tǒng),所以讀一致性要求可能并不嚴(yán)格。
小結(jié)
本文分析了ES中數(shù)據(jù)流的一致性問(wèn)題,可以看到ES最近幾年在這一塊有很多進(jìn)展,但也存在許多問(wèn)題。本文是Elasticsearch分布式一致性原理剖析的最后一篇文章,該系列文章是對(duì)ES的一個(gè)調(diào)研分析總結(jié),逐步分析了ES中的節(jié)點(diǎn)發(fā)現(xiàn)、Master選舉、Meta一致性、Data一致性等,對(duì)能夠讀完該系列文章的同學(xué)說(shuō)一聲感謝,期待與大家的交流。
詳情請(qǐng)閱讀原文
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/68935.html
摘要:前言分布式一致性原理剖析系列將會(huì)對(duì)的分布式一致性原理進(jìn)行詳細(xì)的剖析,介紹其實(shí)現(xiàn)方式原理以及其存在的問(wèn)題等基于版本。使用額外的一致性組件維護(hù)。管理的全局組件,其保證數(shù)據(jù)的一致性。將這個(gè)加入自己的,同時(shí)向所有發(fā)送請(qǐng)求,要求將這個(gè)加入。 前言Elasticsearch分布式一致性原理剖析系列將會(huì)對(duì)Elasticsearch的分布式一致性原理進(jìn)行詳細(xì)的剖析,介紹其實(shí)現(xiàn)方式、原理以及其存在的問(wèn)題...
摘要:前言分布式一致性原理剖析系列將會(huì)對(duì)的分布式一致性原理進(jìn)行詳細(xì)的剖析,介紹其實(shí)現(xiàn)方式原理以及其存在的問(wèn)題等基于版本。中需要持久化的包括當(dāng)前版本號(hào),每次更新加。收集不到足夠的,于是本次發(fā)布失敗,同時(shí)退出狀態(tài)。 前言Elasticsearch分布式一致性原理剖析系列將會(huì)對(duì)Elasticsearch的分布式一致性原理進(jìn)行詳細(xì)的剖析,介紹其實(shí)現(xiàn)方式、原理以及其存在的問(wèn)題等(基于6.2版本)。前一...
閱讀 1993·2021-11-24 10:45
閱讀 1849·2021-10-09 09:43
閱讀 1291·2021-09-22 15:38
閱讀 1219·2021-08-18 10:19
閱讀 2837·2019-08-30 15:55
閱讀 3056·2019-08-30 12:45
閱讀 2960·2019-08-30 11:25
閱讀 356·2019-08-29 11:30