摘要:所以消息可以重復(fù)的放入不同的隊(duì)列中。而是對(duì)于消息來說的,在其發(fā)送消息到交換器時(shí),需指定。與發(fā)布訂閱模式的相同點(diǎn)是可以將消息重復(fù)發(fā)送。它需要處理低延遲的傳遞,用于支持傳統(tǒng)的消息傳遞系統(tǒng)用例。
理解概念的一個(gè)方法
之前說過學(xué)習(xí)一個(gè)新的東西,最核心的就是掌握概念。而如何掌握概念呢?我的其中一個(gè)方法就是對(duì)比,把相似且模糊不清的兩個(gè)概念進(jìn)行對(duì)比,這樣就理解更快。
RabbitMQ模式RabbitMQ有以下模式:
1.工作隊(duì)列(Worke Queues)
發(fā)消息和收消息都是直接通過隊(duì)列。在耗時(shí)比較多的任務(wù),我們把任務(wù)放入隊(duì)列里,然后每個(gè)工作者去獲取任務(wù)然后處理。所以這個(gè)工作隊(duì)列,也稱為任務(wù)隊(duì)列(Task Queues)。這樣就將耗資源的任務(wù)從產(chǎn)生任務(wù)的應(yīng)用上解耦出來。
這個(gè)模式最主要的特征是:每個(gè)任務(wù)只會(huì)分發(fā)到一個(gè)工作者中。
2.發(fā)布/訂閱(Publish/Subscribe)
這個(gè)發(fā)布/訂閱和觀察者模式很像,但不是同一個(gè)東西。具體可看看發(fā)布/訂閱和觀察者區(qū)別。
在這里,RabbitMQ引入了交換器(Exchange)的概念,生產(chǎn)者不直接與隊(duì)列交互,而是通過交換器去與隊(duì)列進(jìn)行交互(或者叫綁定)。也就說生產(chǎn)者只和交換器交互。引入交換器這概念后,這消息中間件可以玩的花樣就多了。發(fā)布/訂閱(Publish/Subscribe)就是其中的一個(gè)。這里使用到的就是fanout的交換器。
這個(gè)模式最主要的特征是:類似于廣播(broadcast),同個(gè)消息可以發(fā)送到不同的隊(duì)列中去,而且這fanout交換器也不關(guān)系隊(duì)列有哪些,只要隊(duì)列和fanout交換器有綁定就發(fā)送,這樣就可以將消息重復(fù)發(fā)送到不同的隊(duì)列上。
與工作隊(duì)列模式的區(qū)別是:發(fā)布/訂閱的概念叫消息,而不是任務(wù)。所以消息可以重復(fù)的放入不同的隊(duì)列中。
3.路由(Routing)
路由模式也是引入交換器概念后,消息中間件玩的一個(gè)花樣。這里用到的交換器叫direct。
在這模式里,得新增兩個(gè)概念,分別是binding key和routing key, binding key是對(duì)于隊(duì)列來說的,在其與direct交換器綁定時(shí)指定binding key。而routing key是對(duì)于消息來說的,在其發(fā)送消息到direct交換器時(shí),需指定routing key。這樣routing key能夠和binding key匹配得上的(就是值相等),direct交換器就會(huì)將消息發(fā)送到對(duì)應(yīng)binding key的隊(duì)列上。
這個(gè)模式最主要的特征是:控制消息的精度更高,可以指定哪些消息發(fā)送到哪些隊(duì)列里。
與發(fā)布/訂閱模式的區(qū)別是:區(qū)別是發(fā)布/訂閱是廣播,將消息發(fā)送到任何綁定交換器的隊(duì)列上,所以沒能力選擇消息,而路由是需binding key和routing key匹配上,消息才能發(fā)送到對(duì)應(yīng)binding key的隊(duì)列上,從而有能力去選擇消息。
與發(fā)布/訂閱模式的相同點(diǎn)是:可以將消息重復(fù)發(fā)送。
注:隊(duì)列可以綁定多個(gè)routing key
4.主題(Topics)
當(dāng)然,主題模式也是引入交換器概念后,消息中間件玩的一個(gè)花樣。這里用到的交換器叫topic。
這里用到的也是binding key和routing key,但不一樣的是,routing_key不能指定明確的key。而是這個(gè)key需要帶有點(diǎn)“.”,如 "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。而在這模式下,binding key的指定可以更廣泛些,其結(jié)構(gòu)是這樣的".orange." 、 "..rabbit" 和"lazy.#"。其中*(星號(hào))是可以代表一個(gè)單詞,#(井號(hào))是可以代表零個(gè)或多個(gè)單詞。也跟路由類似的,只要這樣routing key能夠和binding key匹配得上的(這里可以不用值相等,模式匹配上即可),topic交換器就會(huì)將消息發(fā)送到對(duì)應(yīng)binding key的隊(duì)列上。
如Q1隊(duì)列的binding key是".orange.",而 Q2是"..rabbit"和"lazy.#"。如果消息的routing key是 "quick.orange.rabbit" 則此消息會(huì)被發(fā)送到Q1和Q2隊(duì)列上。routing key是"quick.orange.fox"的消息只會(huì)發(fā)送到Q1隊(duì)列上。routing key是"lazy.pink.rabbit" 的消息只會(huì)發(fā)送到Q2隊(duì)列一次,routing key是 "quick.brown.fox" 的消息沒有匹配任何的binding key則此消息丟棄。
注:隊(duì)列可以綁定多個(gè)routing key
5.遠(yuǎn)程過程調(diào)用RPC(Remote Procedure Call)
RPC可以遠(yuǎn)程調(diào)用函數(shù),等待服務(wù)器返回結(jié)果。
RPC的一個(gè)備注:RPC雖然用得很廣泛,然而它也有不足之處,就是開發(fā)人員無法清晰的知道自己調(diào)用的這個(gè)函數(shù)到底是本地函數(shù)還是很慢的RPC。這種困惑很容易導(dǎo)致出一個(gè)不可預(yù)測的系統(tǒng)和增加沒必要的復(fù)雜性導(dǎo)致難以定位問題。如果不用簡單的程序,誤用RPC還可能寫出很維護(hù)的意大利面條式的代碼。。
對(duì)于這個(gè)問題,有三個(gè)建議保證函數(shù)是很容易被辨別出是本地函數(shù)還是遠(yuǎn)程函數(shù)。
文檔化,清晰地記錄組件間的依賴。
處理網(wǎng)絡(luò)帶來的異常,如超時(shí)等。
當(dāng)出現(xiàn)用RPC是否必要時(shí),如果可以的話,你最好用異步管道(asynchronous pipeline)的形式,而不是使用阻塞形式的RPC。
。
RabbitMQ可以用于構(gòu)建RPC系統(tǒng)。一個(gè)客戶端和一個(gè)可擴(kuò)展的RPC服務(wù)器。不過此功能不太常用,所以就不留篇幅來講解。大概原理就是可以新增消息的屬性,從而將請(qǐng)求和響應(yīng)的消息給匹配上。
觀察者模式和發(fā)布/訂閱模式的區(qū)別觀察者模式
觀察者模式的定義:對(duì)象間的一種一對(duì)多的組合關(guān)系,以便一個(gè)對(duì)象的狀態(tài)發(fā)生變化時(shí),所有依賴于它的對(duì)象都得到通知。
舉個(gè)例子
假設(shè)你正在找一份軟件工程師的工作,對(duì)“香蕉公司”很感興趣。所以你聯(lián)系了他們的HR,給了他你的聯(lián)系電話。他保證如果有任何職位空缺都會(huì)通知你。這里還有幾個(gè)候選人也你一樣很感興趣。所以職位空缺大家都會(huì)知道,如果你回應(yīng)了他們的通知,他們就會(huì)聯(lián)系你面試。
該模式必須包含兩個(gè)角色:觀察者和觀察對(duì)象,香蕉公司就是被觀察者Subject,你就是Observers(還有和你一樣的候選人),當(dāng)被觀察者狀態(tài)發(fā)送變化(比如職位空缺)就會(huì)通知(notify)觀察者,前提是Observers注冊(cè)到Subject里,也就是香蕉公司的HR得有你的電話號(hào)碼。
發(fā)布/訂閱模式
在觀察者模式中的Subject就像一個(gè)發(fā)布者(Publisher),而觀察者(Observer)完全可以看作一個(gè)訂閱者(Subscriber)。subject通知觀察者時(shí),就像一個(gè)發(fā)布者通知他的訂閱者。這也就是為什么很多書和文章使用“發(fā)布-訂閱”概念來解釋觀察者設(shè)計(jì)模式。但是這里還有另外一個(gè)流行的模式叫做發(fā)布-訂閱設(shè)計(jì)模式。它的概念和觀察者模式非常類似。最大的區(qū)別是:
在發(fā)布-訂閱模式,消息的發(fā)送方,叫做發(fā)布者(publishers),消息不會(huì)直接發(fā)送給特定的接收者(訂閱者)。
意思就是發(fā)布者和訂閱者不知道對(duì)方的存在。需要一個(gè)第三方組件,叫做消息中間件,它將訂閱者和發(fā)布者串聯(lián)起來,它過濾和分配所有輸入的消息。換句話說,發(fā)布/訂閱模式用來處理不同系統(tǒng)組件的信息交流,即使這些組件不知道對(duì)方的存在。
我們?cè)O(shè)計(jì)kafka,是希望它能成為統(tǒng)一的平臺(tái)來處理大公司可能擁有的所有實(shí)時(shí)數(shù)據(jù)流。要做到這一點(diǎn),我們必須考慮相當(dāng)廣的用例(use case)。
它需要擁有高吞吐量來支持大容量事件流,如實(shí)時(shí)日志聚合(real-time log aggregation)。
它需要優(yōu)雅地處理大量的數(shù)據(jù)備份,用于支持離線系統(tǒng)的周期性數(shù)據(jù)負(fù)載。
它需要處理低延遲的傳遞,用于支持傳統(tǒng)的消息傳遞系統(tǒng)用例。
我們想它是分區(qū)、分布式、實(shí)時(shí)處理信息流,以創(chuàng)建新的信息流和傳輸信息流。這些動(dòng)機(jī)造就了kafka的分區(qū)和消費(fèi)者模型。
最后有可能數(shù)據(jù)流被輸入到其他數(shù)據(jù)系統(tǒng)中,而這些系統(tǒng)需要對(duì)外提供服務(wù),所以kafka需要有能力保證容錯(cuò)性,哪怕存在有機(jī)器宕機(jī)。
為了支持上述這些,我們?cè)O(shè)計(jì)了一些獨(dú)特元素,更類似于數(shù)據(jù)庫日志,而不是傳統(tǒng)的消息傳遞系統(tǒng)。
我們將在下面部分中概述設(shè)計(jì)中的一些元素。
持久化(Persistence) 別害怕文件系統(tǒng)kafka重度依賴文件系統(tǒng),用文件系統(tǒng)來存儲(chǔ)和緩存消息。人們都由這感覺“硬盤很慢”,以致于大家懷疑一個(gè)持久化架構(gòu)是否能具有競爭力的性能。實(shí)際上硬盤它很快也很慢,這取決于我們?cè)趺慈ナ褂盟?。一個(gè)合理的硬盤架構(gòu)通常可以和網(wǎng)絡(luò)一樣快。(看來作者的網(wǎng)速都很快)。
硬盤性能的關(guān)鍵是,磁盤驅(qū)動(dòng)器的吞吐量與過去十年的硬盤搜索的延遲有所不同。因此在6×7200rpm SATA RAID-5陣列的JBOD配置上的線性寫的性能大約為600MB/秒,但隨機(jī)寫入的性能僅為100k/秒,即超過6000倍的差別。這些線性讀寫是所有使用模式中最可預(yù)測的,并且由操作系統(tǒng)進(jìn)行了大量優(yōu)化?,F(xiàn)代操作系統(tǒng)都提供了預(yù)讀取(read-ahead)和后寫(write-behind)操作的技術(shù),這些支持多次讀取到一個(gè)大塊中和合并小的邏輯寫形成一個(gè)大的物理寫。這問題更深入的討論可以在這找到 ACM Queue article,他們確實(shí)發(fā)現(xiàn)順序硬盤讀寫在某些情況下比隨機(jī)內(nèi)存訪問還快。
為了彌補(bǔ)這些性能差異,現(xiàn)代操作系統(tǒng)越來越著重使用主存來做磁盤緩存。現(xiàn)代操作系統(tǒng)很樂意將空余內(nèi)存轉(zhuǎn)移到磁盤緩存中,但這需要承受在內(nèi)存被回收時(shí)帶來的一點(diǎn)點(diǎn)的性能損失。所有硬盤讀寫都通過這統(tǒng)一的緩存(磁盤緩存)。如果沒有直接IO,這特性并沒有那么容易被拋棄。因此即使一個(gè)進(jìn)場維護(hù)自己數(shù)據(jù)緩存時(shí),這些數(shù)據(jù)將會(huì)在OS的頁緩存里復(fù)制兩份,兩次高效地存儲(chǔ)所有東西。
此外,我們是在JVM基礎(chǔ)上建立的,任何一位有花時(shí)間去研究Java內(nèi)存的使用,都會(huì)知道以下兩件事情:
1.對(duì)象的內(nèi)存開銷非常高,通常會(huì)使要存儲(chǔ)的數(shù)據(jù)的大小增大一倍(甚至更多)。
2.隨著堆內(nèi)存的增加,Java垃圾收集會(huì)變得越來越繁瑣和緩慢。
也正是使用文件系統(tǒng)和依賴頁緩存(pagecache)帶來的結(jié)果優(yōu)于維護(hù)一個(gè)內(nèi)存中的緩存(in-memory cache)或是其他結(jié)構(gòu),通過對(duì)所有空閑內(nèi)存進(jìn)行自動(dòng)訪問,我們至少可以將可用緩存加倍,并且還可以繼續(xù)加倍,通過存儲(chǔ)緊湊的字節(jié)結(jié)構(gòu)而不是單個(gè)對(duì)象。這樣做的話可以在32GB的機(jī)器上使用28-30GB緩存,而不用擔(dān)心GC問題。而且,即使服務(wù)重啟,這些數(shù)據(jù)也保持熱度,對(duì)比起來,進(jìn)程內(nèi)存中的緩存在重啟后需要重建(對(duì)于10GB的緩存可能需要10分鐘),否則它需要從一個(gè)完全冷的緩存開始(這可能意味更糟糕的初始化性能)。這也極大地簡化了代碼,因?yàn)樵诰彺婧臀募到y(tǒng)之間保持一致性的所有邏輯現(xiàn)在都在操作系統(tǒng)中,這比一次性在進(jìn)程內(nèi)嘗試更有效、更正確。如果您的磁盤使用傾向于線性讀取,那么預(yù)讀取將有效地預(yù)操作這些緩存。
這表明了一個(gè)非常簡單的設(shè)計(jì):在我們耗盡空間的時(shí)候,與其保持盡可能多的內(nèi)存并將其全部清空到文件系統(tǒng),不如反過來,數(shù)據(jù)都是被立即寫入到文件系統(tǒng)上的持久日志中,而不必刷新到磁盤。實(shí)際上,這僅僅意味著它被轉(zhuǎn)移到內(nèi)核的頁緩存中。
以頁緩存為核心的設(shè)計(jì),在這里文章里有被描述,此文章是Varnish的設(shè)計(jì)。
在消息傳遞系統(tǒng)里的持久化數(shù)據(jù)結(jié)構(gòu)通常是一個(gè)消費(fèi)者隊(duì)列關(guān)聯(lián)著一棵BTree或者其他通用的隨機(jī)訪問數(shù)據(jù)結(jié)構(gòu)來維護(hù)消息的元數(shù)據(jù)。BTree是一個(gè)萬能的數(shù)據(jù)結(jié)構(gòu),可以在消息傳遞系統(tǒng)中支持各種事務(wù)和非事務(wù)性的語義。但它帶來相當(dāng)高的成本:BTree操作是O(log N)。通常O(log N)本質(zhì)上被認(rèn)為是等于常量時(shí)間,但對(duì)于硬盤操作則并不是這樣。磁盤尋軌達(dá)到10ms,并且每個(gè)磁盤一次只能執(zhí)行一次尋軌,所以并行性是有限的。因此,即使是少量的磁盤尋軌也會(huì)導(dǎo)致很高的開銷。由于存儲(chǔ)系統(tǒng)將非常快的緩存操作與非常慢的物理磁盤操作混合在一起,因此當(dāng)在緩存固定時(shí),數(shù)據(jù)增加時(shí),樹結(jié)構(gòu)的性能通常是超線性的。數(shù)據(jù)加倍則會(huì)使速度慢兩倍以上。
直觀上,一個(gè)持久的隊(duì)列可以建立在簡單的讀取和追加的形式,這通常也是日志解決方案使用的。這結(jié)構(gòu)有這樣的好處,所有操作都是O(1),并且讀操作不會(huì)阻塞寫和讀的操作。這是具有明顯的優(yōu)勢,是因?yàn)樾阅芡耆c數(shù)據(jù)量大小解耦了,一個(gè)服務(wù)現(xiàn)在可以充分利用那些大量的,且便宜,低轉(zhuǎn)速的SATA驅(qū)動(dòng)器。雖然硬盤的尋軌性能差,但它們的大型讀和寫的性能還是可以接受的,而且還是三分之一的價(jià)格就有三倍的容量。
在沒有任何性能懲罰的情況下訪問幾乎無限的磁盤空間意味著我們可以提供一些在消息傳遞系統(tǒng)中不常見的特性。例如,在kafka中,我們可以在相對(duì)較長的時(shí)間內(nèi)保留消息(比如一個(gè)星期),而不是每次消費(fèi)完就刪除消息。這將給消費(fèi)者帶來很大的靈活性。
我們?cè)谛史矫娓冻龃罅康呐?。我們最初用例中的一個(gè)是處理網(wǎng)站活動(dòng)數(shù)據(jù),這可以是非常大量的數(shù)據(jù):每個(gè)頁面的訪問都會(huì)產(chǎn)生許多寫操作。此外,我們假設(shè)每條消息至少被一個(gè)消費(fèi)者讀?。ㄍǔJ呛芏嘞M(fèi)者),因此我們努力讓消費(fèi)盡可能的便宜。
我們還發(fā)現(xiàn),經(jīng)歷過構(gòu)建和運(yùn)行多個(gè)類似的系統(tǒng),有效的多租戶業(yè)務(wù)的關(guān)鍵是效率。
我們?cè)谇懊嬲鹿?jié)討論過硬盤的效率。一旦消除了糟糕的磁盤訪問模式,在這種類型的系統(tǒng)中有兩個(gè)常見的低效原因:太多小的I/O操作和過度的字節(jié)復(fù)制。
這小IO問題發(fā)生在客戶端和服務(wù)器之間,和服務(wù)器自身的持久化操作中。
為了避免這種情況,我們的協(xié)議是圍繞一個(gè)“消息集(message set)”抽象構(gòu)建的,該抽象可以自然地將消息分組在一起。這允許網(wǎng)絡(luò)請(qǐng)求將消息分組,并分?jǐn)偩W(wǎng)絡(luò)往返的開銷,而不是一次發(fā)送一條消息。服務(wù)器依次將大量的消息追加到其日志中,而消費(fèi)者一次獲取大量的線性塊。
這個(gè)簡單的優(yōu)化產(chǎn)生數(shù)量級(jí)的加速。批處理導(dǎo)致了更大的網(wǎng)絡(luò)數(shù)據(jù)包、更大的順序磁盤操作、連續(xù)的內(nèi)存塊等等,所有這些都使得Kafka可以將隨機(jī)消息寫入的流變成 線性的寫 流給消費(fèi)者。
另一個(gè)低效率的是字節(jié)復(fù)制。在低消息率下,這不是一個(gè)問題,但在負(fù)載下的影響是顯著的。為了避免這種情況,我們采用了一種標(biāo)準(zhǔn)化的二進(jìn)制消息格式,由生產(chǎn)者、代理和消費(fèi)者共享(因此數(shù)據(jù)塊可以在不進(jìn)行修改的情況下傳輸)。
broker維護(hù)的消息日志本身就是一個(gè)文件目錄,每個(gè)文件都由一個(gè)以生產(chǎn)者和消費(fèi)者使用的相同格式寫入磁盤的消息集的序列填充。保持這種通用格式可以優(yōu)化最重要的操作:持久日志塊的網(wǎng)絡(luò)傳輸?,F(xiàn)代unix操作系統(tǒng)為將數(shù)據(jù)從頁緩存?zhèn)鬏數(shù)教捉幼痔峁┝烁叨葍?yōu)化的代碼路徑;在Linux中,這是通過sendfile的系統(tǒng)調(diào)用完成的。
要了解sendfile的作用,首先最重要先理解將數(shù)據(jù)從文件傳輸?shù)教捉幼值墓矓?shù)據(jù)路徑:
1.操作系統(tǒng)從磁盤讀取數(shù)據(jù)到內(nèi)核空間的頁緩存。
2.應(yīng)用程序?qū)?shù)據(jù)從內(nèi)核空間讀取到用戶空間緩沖區(qū)中。
3.應(yīng)用程序?qū)?shù)據(jù)返回到內(nèi)核空間,并將其寫入套接字緩沖區(qū)。
4.操作系統(tǒng)將數(shù)據(jù)從套接字緩沖區(qū)復(fù)制到通過網(wǎng)絡(luò)發(fā)送的NIC緩沖區(qū)。
有4次復(fù)制,兩次系統(tǒng)內(nèi)核調(diào)用,這樣的效率當(dāng)然就低下。使用sendfile,通過允許操作系統(tǒng)直接將數(shù)據(jù)從頁緩存發(fā)送到網(wǎng)絡(luò),避免了重復(fù)復(fù)制。因此在這個(gè)優(yōu)化的路徑中,只需要最后的復(fù)制,一次從磁盤復(fù)制到NIC緩沖區(qū)即可?!憧截?zero-copy)
我們期望一個(gè)常見的用例是在一個(gè)主題上有多個(gè)使用者。使用上述的零拷貝優(yōu)化,數(shù)據(jù)被完全復(fù)制到頁緩存中,并在每次讀取時(shí)重復(fù)使用,而不是存儲(chǔ)在內(nèi)存中并在每次讀取時(shí)將其復(fù)制到用戶空間。這就允許以接近網(wǎng)絡(luò)連接的極限的速率來讀取消息。
頁緩存和sendfile的組合意味著,在一個(gè)Kafka集群上,在有消費(fèi)者的機(jī)子上,您將看到磁盤上沒有任何讀取活動(dòng),因?yàn)樗鼈儗⑼耆珡木彺嬷刑峁?shù)據(jù)。
更多Java支持的sendfile和零拷貝,請(qǐng)點(diǎn)擊這里。
在某性情況下,事實(shí)上真正的瓶頸不是CPU也不是硬盤,而是網(wǎng)絡(luò)帶寬。對(duì)于需要在廣域網(wǎng)上的數(shù)據(jù)中心之間發(fā)送消息的數(shù)據(jù)管道來說,尤其如此。當(dāng)然,用戶自己可以壓縮消息而不需要kafka的支持。但這可能導(dǎo)致非常差的壓縮比,特別是當(dāng)消息的冗余字段很多(如JSON里的字段名和網(wǎng)站日志里的user agent或公共字符串)。高效的壓縮需要多個(gè)消息壓縮在一起,而不是每個(gè)消息獨(dú)立壓縮。
Kafka用高效的批處理格式支持這一點(diǎn)。可以將一批消息聚合到一起壓縮,并以這種形式發(fā)送到服務(wù)器。這批消息將以壓縮的形式寫入,并且將在日志中保持壓縮,并且只會(huì)被使用者解壓。
Kafka支持GZIP、Snappy和LZ4壓縮協(xié)議。關(guān)于壓縮的更多細(xì)節(jié)可以在這里找到。
生產(chǎn)者直接發(fā)送數(shù)據(jù)到broker,不需要任何的中間路由層,而接受的broker是該分區(qū)的leader。為了幫助生產(chǎn)者實(shí)現(xiàn)這一點(diǎn),所有Kafka節(jié)點(diǎn)都可以回答關(guān)于哪些是可用服務(wù)器的元數(shù)據(jù)的請(qǐng)求,以及在任何給定的時(shí)間內(nèi),某個(gè)主題的分區(qū)的leader是否允許生產(chǎn)者適當(dāng)?shù)匕l(fā)送它的請(qǐng)求。
由客戶端控制它想往哪個(gè)分區(qū)生產(chǎn)消息。這可以隨機(jī)地進(jìn)行,實(shí)現(xiàn)一種隨機(jī)的負(fù)載平衡,或者可以通過一些語義分區(qū)函數(shù)來實(shí)現(xiàn)。我們提供了語義分區(qū)的接口,允許用戶指定一個(gè)分區(qū)的key,并使用這個(gè)key來做hash到一個(gè)分區(qū)(如果需要的話,也是可以復(fù)寫這分區(qū)功能的)。例如,我們選擇user的id作為可用,則所以該用戶的信息都會(huì)發(fā)送到同樣的分區(qū)。這反過來又會(huì)讓消費(fèi)者對(duì)他們的消費(fèi)產(chǎn)生局部性的假設(shè)。這種明確設(shè)計(jì)的分區(qū),允許消費(fèi)者自己本地的處理。
批處理是效率的主要驅(qū)動(dòng)因素之一,為了能夠批處理,kafka的生產(chǎn)者會(huì)嘗試在內(nèi)存中積累數(shù)據(jù),然后在一起在一個(gè)請(qǐng)求中以大批量的形式發(fā)送出去。批處理這個(gè)可以設(shè)置按固定的消息數(shù)量或按特定的延遲(64k或10ms)。這允許累積更多字節(jié)的發(fā)送出去,這樣只是在服務(wù)器上做少量的大IO操作。這種緩沖是可配置的,這樣提供了一種機(jī)制來以額外的延遲來提高吞吐量。
具體的配置)和生產(chǎn)者的api可以在這文檔中找到。
kafka消費(fèi)者的工作方式是,向其想消費(fèi)的分區(qū)的leader發(fā)送“fetch”請(qǐng)求。在每個(gè)請(qǐng)求中消費(fèi)者指定日志的偏移量,然后接受回一大塊從偏移量開始的日志。因此,消費(fèi)者對(duì)position有重要的控制權(quán),如果需要,可以重置position來重新消費(fèi)數(shù)據(jù)。
Push和pull我們首先考慮的一個(gè)問題是,消費(fèi)者應(yīng)該是從broker拉取消息,還是應(yīng)該是broker把消息推送給消費(fèi)者。在這方面,kafka遵循了一種更傳統(tǒng)的設(shè)計(jì),大多數(shù)消息傳遞系統(tǒng)也會(huì)用的,那就是數(shù)據(jù)是從生產(chǎn)者push到broker,消費(fèi)者是從broker拉取數(shù)據(jù)。一些日志集中系統(tǒng),如Scribe和Apache Flume,遵循一個(gè)非常不同的,基于推送的路徑,將數(shù)據(jù)被推到下游。這兩種方法都由利弊,在基于推送的系統(tǒng),由于是broker得控制數(shù)據(jù)傳輸?shù)乃俾剩煌M(fèi)者可能要不同的速率。然而消費(fèi)者一般的目的都是讓消費(fèi)者自己能夠以最大的速度進(jìn)行消費(fèi),但在基于push的系統(tǒng),當(dāng)消費(fèi)速率低于生產(chǎn)效率時(shí),消費(fèi)者就不知道該怎么辦好了(本質(zhì)上就是一種拒絕服務(wù)攻擊(DOS))。一個(gè)基于pull的系統(tǒng)就擁有很好的熟悉,消費(fèi)者可以簡單的調(diào)控速率。
基于pull的系統(tǒng)的另一個(gè)優(yōu)點(diǎn)是,它可以對(duì)發(fā)送給消費(fèi)者的數(shù)據(jù)進(jìn)行聚合的批處理。基于推送的系統(tǒng)必須選擇立即發(fā)送請(qǐng)求或積累更多數(shù)據(jù),然后在不知道下游用戶是否能夠立即處理它的情況下發(fā)送它。如果對(duì)低延遲進(jìn)行調(diào)優(yōu),這將導(dǎo)致僅在傳輸結(jié)束時(shí)發(fā)送一條消息,最終將被緩沖,這是浪費(fèi)?;趐ull的設(shè)計(jì)解決了這個(gè)問題,因?yàn)橛脩艨偸窃谌罩镜漠?dāng)前位置(或者是一些可配置的最大大小)之后提取所有可用的消息。因此,我們可以在不引入不必要的延遲的情況下獲得最佳的批處理。
基于pull的系統(tǒng)的缺點(diǎn)是,如果broker沒數(shù)據(jù),則消費(fèi)者可能會(huì)不停的輪訓(xùn)。為了避免這一點(diǎn),我們?cè)趐ull請(qǐng)求上提供了參數(shù),允許消費(fèi)者在“長輪訓(xùn)”中阻塞,直到數(shù)據(jù)達(dá)到(并且可以選擇等待,直到一定數(shù)量的自己可以,確保傳輸?shù)拇笮。?
你可能詳細(xì)其他可能的設(shè)計(jì),如只有pull,點(diǎn)到點(diǎn)。生產(chǎn)者會(huì)將本地的日志寫到本地日志中,而broker則會(huì)從這些日志中拉取數(shù)據(jù)。通常還會(huì)提出類似的“存儲(chǔ)轉(zhuǎn)發(fā)(store-and-forward)”生產(chǎn)者。這很有趣,但是我們覺得不太適合我們的目標(biāo)用例:它有成千上萬的生產(chǎn)者。我們?cè)诖笠?guī)模上運(yùn)行持久數(shù)據(jù)系統(tǒng)的經(jīng)驗(yàn)使我們覺得,在許多應(yīng)用程序中涉及到數(shù)千個(gè)磁盤,實(shí)際上并不會(huì)使事情變得更可靠,而且操作起來也會(huì)是一場噩夢。在實(shí)踐中,我們發(fā)現(xiàn),我們可以在不需要生產(chǎn)者持久化的情況下,以大規(guī)模的SLAs來運(yùn)行管道。
消費(fèi)者的Position(Consumer Position)令人驚訝的是,跟蹤所使用的內(nèi)容是消息傳遞系統(tǒng)的關(guān)鍵性能點(diǎn)之一。
很多消息傳遞系統(tǒng)在broker中保存了關(guān)于什么消息是被消費(fèi)了的元數(shù)據(jù)。也就是說,當(dāng)消息傳遞給消費(fèi)者時(shí),broker要么立即記錄信息到本地,要么就是等待消費(fèi)者的確認(rèn)。這是一個(gè)相當(dāng)直觀的選擇,而且對(duì)于一臺(tái)機(jī)器服務(wù)器來說,很清楚地知道這些消息的狀態(tài)。由于許多消息傳遞系統(tǒng)中用于存儲(chǔ)的數(shù)據(jù)結(jié)構(gòu)都很糟糕,因此這(記錄消息狀態(tài))也是一個(gè)實(shí)用的選擇——因?yàn)閎roker知道什么是已經(jīng)被消費(fèi)的,所以可以立即刪除它,保持?jǐn)?shù)據(jù)的大小。
讓broker和消費(fèi)者就已經(jīng)消費(fèi)的東西達(dá)成一致,這可不是小問題。如果一條消息發(fā)送到網(wǎng)絡(luò)上,broker就把它置為已消費(fèi),但消費(fèi)者可能處理這條消息失敗了(或許是消費(fèi)者掛了,也或許是請(qǐng)求超時(shí)等),這條消息就會(huì)丟失了。為了解決這個(gè)問題,很多消息傳遞系統(tǒng)增加了確認(rèn)機(jī)制。當(dāng)消息被發(fā)送時(shí),是被標(biāo)志為已發(fā)送,而不是已消費(fèi);這是broker等待消費(fèi)者發(fā)來特定的確認(rèn)信息,則將消息置為已消費(fèi)。這個(gè)策略雖然解決了消息丟失的問題,但卻帶來了新的問題。第一,如果消費(fèi)者在發(fā)送確認(rèn)信息之前,在處理完消息之后,消費(fèi)者掛了,則會(huì)導(dǎo)致此消息會(huì)被處理兩次。第二個(gè)問題是關(guān)于性能,broker必須保存每個(gè)消息的不同狀態(tài)(首先先鎖住消息以致于不會(huì)讓它發(fā)送第二次,其次標(biāo)志位已消費(fèi)從而可以刪除它)。還有些棘手的問題要處理。如消息被發(fā)送出去,但其確認(rèn)信息一直沒返回。
kafka處理則不一樣。我們的主題被分為一個(gè)有序分區(qū)的集合,且每個(gè)分區(qū)在任何給定的時(shí)間內(nèi)只會(huì)被訂閱它的消費(fèi)者組中的一個(gè)消費(fèi)者給使用。這意味著每個(gè)分區(qū)中的消費(fèi)者的position僅僅是一個(gè)整數(shù),這是下一次消費(fèi)時(shí),消息的偏移量。這使?fàn)顟B(tài)(記錄是否被消費(fèi))非常小,每個(gè)分區(qū)只有一個(gè)數(shù)字。這個(gè)狀態(tài)可以被定期檢查。這樣確認(rèn)一條消息是否被消費(fèi)的成本就很低。
這樣還附加了一個(gè)好處。消費(fèi)者可以重置其最先的position從而重新消費(fèi)數(shù)據(jù)。這雖然違反了隊(duì)列的公共契約,但它卻變成關(guān)鍵功能給許多消費(fèi)者。例如,如果消費(fèi)者代碼有一個(gè)bug,并且在一些消息被消費(fèi)后才被發(fā)現(xiàn),那么當(dāng)bug被修復(fù)后,消費(fèi)者就可以重新使用這些消息。
離線數(shù)據(jù)加載(Offline Data Load)可擴(kuò)展持久化允許只有周期性地使用批量數(shù)據(jù)的消費(fèi)者的可能性,比如定期將批量數(shù)據(jù)加載到離線系統(tǒng)(如Hadoop或關(guān)系數(shù)據(jù)倉庫)。
消息傳遞語義(Message Delivery Semantics)現(xiàn)在我們已經(jīng)了解了些生產(chǎn)者和消費(fèi)者是怎么工作的,接下來我們說下kafka提供給生產(chǎn)者和消費(fèi)者的語義保證。很明顯這里提供了以下幾種消息傳遞保證機(jī)制:
至多一次(At most once),這樣消息可能會(huì)丟失,但永遠(yuǎn)不會(huì)重新傳遞。
至少一次(At least once),這樣消息不可能會(huì)丟失,但可能會(huì)重新傳遞。
有且僅有一次(Exactly once),這是大家想要的,每個(gè)消息會(huì)被傳遞一次,而且也僅僅只有一次。
值得注意的是,這可以歸結(jié)為兩個(gè)問題:發(fā)布消息的持久化保證,以及在消費(fèi)消息時(shí)的保證。
很多系統(tǒng)聲稱提供“有且僅有一次”的傳遞語義,但閱讀這些細(xì)節(jié)時(shí),會(huì)發(fā)現(xiàn)其中大部分都是誤導(dǎo)(他們不理解消費(fèi)者或生產(chǎn)者可能掛掉的情況,那些有多個(gè)消費(fèi)者處理的情況,或者是那些被寫入磁盤的數(shù)據(jù)可能丟失的情況)。
kafka的語義很直接。在發(fā)布消息時(shí),我們將消息“提交”到log中。一旦發(fā)布的消息被提交,只要有一個(gè)broker復(fù)制這個(gè)消息被寫入活動(dòng)分區(qū),它就不會(huì)丟失。提交的消息的定義、活動(dòng)分區(qū)以及我們?cè)噲D處理的失敗的類型的描述將在下一節(jié)(副本)中詳細(xì)描述?,F(xiàn)在我們假設(shè)在完美的情況下,現(xiàn)在讓我們假設(shè)一個(gè)完美的、無損的broker,和嘗試?yán)斫鈱?duì)生產(chǎn)者和消費(fèi)者的保證。如果一個(gè)生產(chǎn)者試圖發(fā)布消息并經(jīng)歷一個(gè)網(wǎng)絡(luò)錯(cuò)誤,那么就不能確定該錯(cuò)誤發(fā)生在消息提交之前還是之后。這類似于插入到一個(gè)數(shù)據(jù)庫表的自動(dòng)生成的主鍵的語義。
在0.11.0.0版本之前,如果一個(gè)生產(chǎn)者沒有收到一個(gè)消息已經(jīng)提交的響應(yīng),那么它幾乎沒有選擇,只能重新發(fā)送消息。這提供了“至少一次”的傳遞語義,因?yàn)槿绻颊?qǐng)求實(shí)際上成功了,那么在重新發(fā)送期間,消息可能再次被寫入到日志中。從0.11.0.0開始,Kafka生產(chǎn)者也支持一個(gè)冪傳遞的選項(xiàng),該選項(xiàng)保證重新發(fā)送不會(huì)導(dǎo)致日志中有這重復(fù)的消息。為了實(shí)現(xiàn)這一目標(biāo),broker為每個(gè)生產(chǎn)者分配一個(gè)ID,并使用由生產(chǎn)者發(fā)送消息時(shí)一起把序列號(hào)發(fā)送到broker,這樣broker就可以根據(jù)序列和id來處理重復(fù)的消息。同樣,從0.11.0.0開始,生產(chǎn)者支持使用類似于事務(wù)的語義向多個(gè)主題分區(qū)發(fā)送消息:即所有消息都已成功寫入或都失敗寫入。這種情況的主要應(yīng)用場景是在Kafka主題之間進(jìn)行“有且僅有一次”的處理(如下所述)。
并非所有的用例都需要這樣強(qiáng)的保證。對(duì)于延遲敏感的使用,我們?cè)试S生產(chǎn)者指定它需要的持久化級(jí)別。如果生產(chǎn)者指定要等待消息被提交要在10ms完成。則生產(chǎn)者可以指定它異步地執(zhí)行發(fā)送,或者等待直到leader(但不一定是follower)得到消息。
現(xiàn)在我們描述下消費(fèi)者視角下的語義。所有的副本都有相同的日志和相同的偏移量。消費(fèi)者控制它在這個(gè)日志中的position。如果消費(fèi)者從未崩潰,它可以將這個(gè)position存儲(chǔ)在內(nèi)存中,但是如果消費(fèi)者崩潰了,我們希望這個(gè)主題的分區(qū)來接替這個(gè)position的處理,那么新的進(jìn)程將需要選擇一個(gè)合適的position來開始處理。
消費(fèi)者讀取消息時(shí),有幾個(gè)處理消息和更新其位置的選項(xiàng)。
第二種是它先讀取消息,然后將position保存到日志中,最后是處理消息。在這種情況下,在保存其position之后,在保存處理消息產(chǎn)生的輸出之前,消費(fèi)者進(jìn)程可能會(huì)崩潰。在這種情況下,接手處理的過程將從保存的position開始,即使在此position之前的一些消息未被處理。這是對(duì)應(yīng)著“至多一次”的語義,失敗的消息可能不被處理。
第二種是它先讀取消息,然后處理消息,最后保存position到日志中。在這種情況下,在處理消息后,消費(fèi)者進(jìn)程可能會(huì)崩潰,但是在它保存它的position之前崩潰的。在這種情況下,當(dāng)新進(jìn)程接手了它接收到的最初幾條消息時(shí),或許這幾條消息就已經(jīng)被處理過了。在消費(fèi)者崩潰的情況下,這相當(dāng)于“至少一次”的語義。在許多情況下,消息有主鍵,因此更新是冪等的(接收相同的消息兩次,只是用另一個(gè)副本重寫了一個(gè)記錄)。
那“有且僅有一次”的語義怎樣(或者是說你到底想要什么)?從kafka主題中獲取消息處理后發(fā)布到其他主題(如一個(gè)Kafka Streams應(yīng)用),我們可以利用上面提到的版本0.11.0.0里的新事務(wù)生產(chǎn)者的功能。消費(fèi)者的position被當(dāng)做一個(gè)消息存儲(chǔ)在一個(gè)主題,因此我們可以在與接收處理數(shù)據(jù)的輸出主題相同的事務(wù)中寫入kafka的偏移量。 如果事務(wù)被中止,消費(fèi)者的position將恢復(fù)到原來的值,而輸出主題的生成數(shù)據(jù)將不會(huì)被其他消費(fèi)者看到,這取決于他們的“隔離級(jí)別”。在默認(rèn)的“read_uncommitted”隔離級(jí)別中,所有消息對(duì)消費(fèi)者都是可見的,即使它們是被中止的事務(wù)的一部分,但是在“read_committed”中,使用者只會(huì)從提交的事務(wù)中返回消息(以及任何不屬于事務(wù)的消息)。
當(dāng)寫入外部系統(tǒng)時(shí),限制是在需要協(xié)調(diào)消費(fèi)者的position和實(shí)際存儲(chǔ)的輸出。實(shí)現(xiàn)這一目標(biāo)的經(jīng)典方法是在存儲(chǔ)消費(fèi)者position和存儲(chǔ)消費(fèi)者輸出之間引入兩階段提交。但這可以更簡單地處理,并且通常通過讓消費(fèi)者將其偏移量存儲(chǔ)在與輸出相同的位置。這樣做比較好,因?yàn)橄M(fèi)者可能想要寫入的輸出系統(tǒng)都不支持兩階段提交。作為一個(gè)例子,考慮一個(gè)Kafka Connect連接器,它在HDFS中填充數(shù)據(jù),以及它讀取的數(shù)據(jù)的偏移量,從而保證數(shù)據(jù)和偏移量都得到了更新,或者兩者都不更新。對(duì)于需要這些更強(qiáng)語義的其他許多數(shù)據(jù)系統(tǒng),我們遵循類似的模式是為了那些需要強(qiáng)一致性語義的系統(tǒng),還為了這些消息沒有主鍵來允許刪除重復(fù)數(shù)據(jù)。
因此kafka為了kafka Streams,高效地支持“有且僅有一次”的傳遞,并且在Kafka主題之間傳輸和處理數(shù)據(jù)時(shí),通??梢允褂檬聞?wù)生產(chǎn)者/消費(fèi)者提供“有且僅有一次”的傳遞。對(duì)于其他目標(biāo)系統(tǒng)的“有且僅有一次”的傳遞一般需要協(xié)調(diào),但kafka提供了偏移量,它可以實(shí)現(xiàn)這要求(參見Kafka Connect)。否則,缺省情況下Kafka保證“至少一次”傳遞,并且允許用戶禁止生產(chǎn)者的重試或消費(fèi)者在處理數(shù)據(jù)之前提交position,從而實(shí)現(xiàn)“至多一次”的專遞。
副本(Replication)Kafka通過一個(gè)可配置的服務(wù)器數(shù)量對(duì)每個(gè)主題的分區(qū)進(jìn)行復(fù)制日志(你您可以按主題設(shè)置此副本因子(replication factor))。這允許在集群中的服務(wù)器發(fā)生故障時(shí)自動(dòng)恢復(fù),因此當(dāng)在出現(xiàn)故障時(shí)仍然可以使用消息。
其他消息傳遞系統(tǒng)提供了副本相關(guān)的特性,但,我們認(rèn)為,這似乎是一種策略而已,并沒有大量的使用,而且還有個(gè)很大的缺點(diǎn):slave是未被用上的,吞吐量受到嚴(yán)重的影響,恢復(fù)還需要繁瑣的人工配置,等等。kafka默認(rèn)是使用了副本功能,實(shí)際上那些副本因子設(shè)置為1的主題,我們也會(huì)當(dāng)做是使用副本功能的主題。
副本的最小單元是主題的分區(qū)。在沒有失敗的情況下,kafka的每個(gè)分區(qū)都是有一個(gè)leader,其follower可以為零個(gè)或多個(gè)。包括leader在內(nèi)的副本數(shù)量就是副本因子。所有讀和寫都是通過leader分區(qū)。通常情況,分區(qū)的數(shù)據(jù)量是多個(gè)broker,leader的數(shù)量時(shí)平均分配當(dāng)每個(gè)broker。follower的日志和leader的日志是完全相同的——它們都具有相同的偏移量和相同順序的消息(當(dāng)然,在任何給定的時(shí)刻,在日志的末尾可能會(huì)有一些還未同步到的消息)。
follower也跟kafka的普通消費(fèi)者一樣從leader消費(fèi)消息。follower從leader拉消息時(shí),有個(gè)很好的特性,那就時(shí)可以讓follower很容易地批量把日志應(yīng)用到其(follower)日志中。
跟很多 分布式系統(tǒng)處理自動(dòng)恢復(fù) 一樣,對(duì)于節(jié)點(diǎn)是否“存活(alive)”需要有一個(gè)明確的定義。對(duì)于kafka,節(jié)點(diǎn)存活有以下兩個(gè)條件:
1.節(jié)點(diǎn)必須維護(hù)它與ZooKeeper的session(通過ZooKeeper的心跳機(jī)制)
2.如果是slave,就必須復(fù)制leader,而且不能落后太遠(yuǎn)。
滿足上述兩個(gè)條件的節(jié)點(diǎn),我們更愿意叫“已同步(in sync)”而不是模糊不清的“存活”或“失敗”。leader保持跟蹤這些“已同步”的節(jié)點(diǎn)。如果follower掛了,或者卡住了,或者落后太遠(yuǎn)了,leader會(huì)講起從已同步的副本名單中移除。是有e replica.lag.time.max.ms這配置去控制卡住多長時(shí)間和落后多少副本數(shù)量。
在分布式系統(tǒng)術(shù)語中,我們只嘗試處理一個(gè)“失敗/恢復(fù)”模型,即節(jié)點(diǎn)突然停止工作,然后恢復(fù)(可能不知道它們已經(jīng)死亡)。kafka沒有處理所謂的“拜占庭式”的失敗,即節(jié)點(diǎn)產(chǎn)生任意或惡意的響應(yīng)(可能是由于某些錯(cuò)誤)。
現(xiàn)在,我們可以更精確地定義一個(gè)消息的提交,當(dāng)所有副本都同步到分區(qū),分區(qū)并且應(yīng)用到其日志中時(shí),就會(huì)被認(rèn)為是提交的。只有提交的消息才會(huì)分發(fā)給消費(fèi)者。這就意味著消費(fèi)者不用擔(dān)心當(dāng)leader崩潰時(shí),消息會(huì)丟失。另一方面,生產(chǎn)者可以選擇等待消息提交或不提交,這取決與它們對(duì)延遲和持久化之間的權(quán)衡。生產(chǎn)者可以使用acks這配置來控制這權(quán)衡。注意,這“最小數(shù)據(jù)量(minimun number)”同步副本的數(shù)量設(shè)置,是指當(dāng)消息都同步到所有副本后,kafka再去檢查時(shí),檢查的最小數(shù)量。如果生產(chǎn)者對(duì)確認(rèn)要求不太嚴(yán)格,則消息一發(fā)布就可以被使用了,即使同步副本數(shù)量還沒達(dá)到最小值。(這最小值可以低到只有一個(gè),那就是leader)。
kafka保證消息不會(huì)丟,只要任何時(shí)候至少有一個(gè)已同步的副本存在。
kafka可以在節(jié)點(diǎn)故障的情況下可用。但存在網(wǎng)絡(luò)分區(qū)時(shí),就可能無法使用了。
分區(qū)就是一個(gè)副本日志。副本日志是分布式數(shù)據(jù)系統(tǒng)的最基本的原語(primitvie)之一,而且有很多種實(shí)現(xiàn)方式。其他系統(tǒng)可以使用副本日志作為一種原語,用于在狀態(tài)機(jī)形式的分布式系統(tǒng)。
對(duì)于一系列值的順序達(dá)成一致的過程(通常編號(hào)為0、1、2、…),副本日志就是將其模型化。有很多方法可以實(shí)現(xiàn)這一點(diǎn),但最簡單和最快的是leader來選擇序值。只有l(wèi)eader還存活,所喲follower都只需要復(fù)制值即可,順序由leader決定。
當(dāng)然,如果leader不掛,那我們沒必要要follower。當(dāng)leader崩潰時(shí),我從follower中選擇出新的leader。但follower自己可能落后或崩潰,所以我們必須保證我們選擇的是最新的follower。日志復(fù)制算法必須這最基本的保證時(shí),如果我們告訴客戶端消息已經(jīng)提交了,而此時(shí)leader掛了,我們選擇的新leader也必須包含剛剛那個(gè)已經(jīng)提交了的消息。這就產(chǎn)生了一個(gè)權(quán)衡:如果leader等待過多的follower確認(rèn)消息,This yields a tradeoff: if the leader waits for more followers to acknowledge a message before declaring it committed then there will be more potentially electable leaders.
如果你指定確認(rèn)的數(shù)量和日志(與leader對(duì)比過的)的數(shù)量,這樣就保證有重疊性,那么這就叫法定人數(shù)(Quorums)。
這種權(quán)衡最常見的方法是,在提交決策和leader選舉中使用大多數(shù)投票。這不是kafka做的,但讓我們?nèi)ヌ剿魉?,了解它的利弊。假設(shè)我們有2f+1個(gè)副本。如果f+1節(jié)點(diǎn)收到消息,沒有超過f個(gè)節(jié)點(diǎn)失敗,則leader就保證所有消息都被提交,我們選擇新leader時(shí)也一樣。這是因?yàn)槲覀冊(cè)谌我夤?jié)點(diǎn)上選擇f+1個(gè)節(jié)點(diǎn),這f+1里必須至少有一個(gè)節(jié)點(diǎn)包含所有已提交消息的副本。副本最完整的結(jié)點(diǎn)將會(huì)被選中為新leader。這里還有很多算法細(xì)節(jié)需要處理(如明確定義日志的完整性,leader崩潰時(shí)怎么保證一致性,修改集群中的服務(wù)器),這些我們先暫時(shí)忽略。
多數(shù)投票方法有個(gè)非常好的特性:延遲僅僅取決于多臺(tái)最快的服務(wù)器。也就是說,如果副本因子時(shí)3,那么延遲由最快的一個(gè)slave決定,而不是最慢的slave(leader一個(gè)、最快的slave一個(gè),這就達(dá)到法定人數(shù)了)。
這個(gè)家族有很多算法,包括ZooKeeper的Zab,Raft和Viewstamped副本算法。我們知道的,更接近kafka的用的算法的學(xué)術(shù)出版是來自微軟的PacificA。
多數(shù)投票的不足之處就是,它不需要很多失敗的節(jié)點(diǎn),就可以讓你選擇不到leader。為了容忍一個(gè)節(jié)點(diǎn)失敗,則需要3個(gè)節(jié)點(diǎn),容忍2個(gè),則需要5個(gè)節(jié)點(diǎn)。在我們經(jīng)驗(yàn)里,以為只要?jiǎng)倓偤脡蛉哂嗟母北?,就能容忍一個(gè)節(jié)點(diǎn)的失敗,但這是不實(shí)際的,在5倍硬盤空間(5個(gè)硬盤,每個(gè)硬盤占1/5吞吐)情況下,每次都要寫5次,這對(duì)于大量數(shù)據(jù)的問題時(shí)不切實(shí)際的。這也是為什么法定人數(shù)算法比較常用在集群的配置文件如ZooKeeper,而很少用在原數(shù)據(jù)存儲(chǔ)上。例如在HDFS的namenode的高可用是建立在多數(shù)人投票,但這成本很高的算法不會(huì)用在它的數(shù)據(jù)存儲(chǔ)上。
Kafka使用了一個(gè)稍微不太一樣的方法去選擇法定人數(shù)。kafka動(dòng)態(tài)的維護(hù)一個(gè)ISR(in-sync replicas)集合,集合里面的節(jié)點(diǎn)都是已同步。只有這集合里面的人才適合選舉為leader。只有所有ISR都收到寫入分區(qū),則這分區(qū)的寫入就會(huì)被認(rèn)為已提交。這ISR保存在ZooKeeper。對(duì)于kafka的使用模型來說,這是一個(gè)重要的因素,那里有許多分區(qū),并且確保leader的平衡很重要。ISR模型和f+1副本,一個(gè)kafka主題可以容忍f個(gè)失?。偣簿蚮+1個(gè)節(jié)點(diǎn))。
我們想處理更多的用例,所以這個(gè)權(quán)衡我們覺得是合理的。在實(shí)際情況,對(duì)于容忍f個(gè)節(jié)點(diǎn)失敗,多數(shù)投票和ISR方法都是需要通用數(shù)量的副本確認(rèn)(比如,容忍1個(gè)節(jié)點(diǎn)失敗,多數(shù)投票方法則需要3個(gè)副本和1個(gè)確認(rèn),ISR方法需要2個(gè)副本和1個(gè)確認(rèn))。確認(rèn)提交而不需要由最慢的節(jié)點(diǎn)來確認(rèn)這是多數(shù)投票方法的好處。但我們覺得這是可以通過由客戶端選擇是否阻塞消息提交,以及控制副本因子(降低)而增加吞吐量和磁盤空間來優(yōu)化這個(gè)問題(這問題就是與多數(shù)投票對(duì)比)。
另一個(gè)重要的設(shè)計(jì)是,kafka不要求崩潰節(jié)點(diǎn)在所有數(shù)據(jù)完整的情況下恢復(fù)。在這個(gè)空間中,副本算法依賴于“穩(wěn)定存儲(chǔ)”的存在并不少見,這種“穩(wěn)定存儲(chǔ)”在任何故障恢復(fù)場景中都不能丟失,要保證一致性。這有兩個(gè)主要問題。首先,硬盤故障是我們?cè)诔志没瘮?shù)據(jù)系統(tǒng)的實(shí)際操作中最常見的問題,問題發(fā)生后,通常也不會(huì)完整地保留數(shù)據(jù)。其次,即使這不是一個(gè)問題,我們也不希望在每次寫入時(shí)都需要使用fsync,因?yàn)檫@樣會(huì)減少兩到三個(gè)數(shù)量級(jí)的性能。我們?cè)试S一個(gè)副本重新加入ISR的協(xié)議,這協(xié)議確保在重新加入之前,它必須完全重新同步,即使它在崩潰中丟失了未刷新的數(shù)據(jù)。
注意,Kafka對(duì)數(shù)據(jù)不丟失的保證是基于至少一個(gè)保持同步的副本。如果一個(gè)分區(qū)的副本都丟失了,則無法保證數(shù)據(jù)不丟失。
然而在實(shí)際情況下的系統(tǒng)當(dāng)所有副本掛之后必須做一些合理的事情。如果很不辛遇到這種情況,意識(shí)到后面會(huì)發(fā)生什么這是很重要??赡軙?huì)出現(xiàn)以下兩種情況:
1.等待ISR里的所有節(jié)點(diǎn)恢復(fù),并選擇出新的leader(希望這leader還保存著所有的數(shù)據(jù))。
2.選擇第一個(gè)副本(不需要是ISR里面的)恢復(fù),作為leader。
以下是可用性和持久化的權(quán)衡。一、如果我們等待所有ISR副本恢復(fù),則我們會(huì)等很長的時(shí)間。。二、如果副本的數(shù)據(jù)都丟了,則永遠(yuǎn)無法恢復(fù)。最后一個(gè)就是,如果一個(gè)沒有同步的副本恢復(fù),我們?cè)试S它為leader,則認(rèn)為它的日志是最新的,哪怕它沒有包含所有已提交的消息。在0.11.0.0版本里默認(rèn)的選擇第一個(gè)權(quán)衡,用等待來換取數(shù)據(jù)的一致性。這個(gè)是可以配置的,如果啟動(dòng)時(shí)間比一致性重要,則修改這個(gè) unclean.leader.election.enable。
這個(gè)困惑不僅僅kafka有。它存在與任何基于法定人數(shù)算法的場景。例如,在多數(shù)投票的場景,如果你是去大多數(shù)服務(wù)器,在剩余的服務(wù)器,你就必須在兩者選其中一個(gè),不是失去100%的數(shù)據(jù)就是丟失數(shù)據(jù)的一致性。
生產(chǎn)者生成消息時(shí),可以選擇0個(gè),1個(gè)或者全部副本確認(rèn)。注意這里的“全部副本確認(rèn)”不能保證所有被分配副本的結(jié)點(diǎn)都能收到消息。默認(rèn)的,當(dāng)acks=all時(shí),只要所有當(dāng)前所有ISR都收到消息,則可以確認(rèn)消息。例如,一個(gè)主題被設(shè)置為兩個(gè)副本和一個(gè)失?。ㄖ挥惺O乱粋€(gè)ISR),然后所有acks=all的寫入都會(huì)是成功的。如果剩余的副本也失敗,這樣消息就會(huì)被丟失。盡管這確保了分區(qū)的最大可用性,但是這種行為可能不適合某些喜歡持久化而不是可用性的用戶。因此,我們提供了兩種頂級(jí)的配置,可用于更傾向于消息持久化而不是可用性:
1.關(guān)閉不清晰的leader選舉——如果所有副本變得不可用,直到最近的leader變得可用,所有分區(qū)才可以變得可以用。這有效地避免了消息丟失的風(fēng)險(xiǎn)。請(qǐng)參閱上一節(jié)不清晰的Leader選舉。
2.指定最小的ISR數(shù)量——只有高過這最小數(shù)量,消息才會(huì)被確認(rèn),這是為了避免在寫入一個(gè)副本時(shí),而且副本掛了,導(dǎo)致消息丟失的風(fēng)險(xiǎn)。這個(gè)設(shè)置僅僅在生產(chǎn)者使用acks=all生效或保證消息在這數(shù)量以上的ISR確認(rèn)。這個(gè)設(shè)置提供了一致性和高可用的權(quán)衡。ISR最小數(shù)量設(shè)置高一點(diǎn),這樣更好的保證一致性。然而這樣會(huì)減少可用性,因?yàn)樵贗SR沒滿足這數(shù)量時(shí),分區(qū)是不可用的。
上訴討論副本,也僅僅是一份日志,也就是主題的一個(gè)分區(qū)。然而kafka是管理成千上萬的分區(qū)。我們?cè)噲D以循環(huán)(round-robin)方式在集群中平衡分區(qū),以避免在大數(shù)據(jù)量的主題的所有分區(qū)都在少量節(jié)點(diǎn)上。同樣地,我們?cè)噲D平衡leader,使每個(gè)節(jié)點(diǎn)都是其一定份額分區(qū)的leader。
對(duì)ledaer選舉過程進(jìn)行優(yōu)化也很重要,因?yàn)檫@是服務(wù)不可用的窗口期。一個(gè)簡單的leader選舉會(huì)在一個(gè)節(jié)點(diǎn)失敗后,在該節(jié)點(diǎn)內(nèi)所有分區(qū),每個(gè)分區(qū)都會(huì)舉行一次選舉。相反,我們選擇一個(gè)broker作為“controller”。這controller檢測broker層次的失敗,負(fù)責(zé)修改受故障影響的分區(qū)的leader。其結(jié)果是,我們能夠?qū)⒃S多需要的leadr變更批量處理,這使得選舉過程在大量的分區(qū)上變得更加便宜和快速。如果controller失敗了,其中一個(gè)存活的節(jié)點(diǎn)會(huì)變成新的controller。
日志壓縮保證kafka在每個(gè)分區(qū),對(duì)于每個(gè)key,至少保存其最近的一條消息。這解決了那些需要當(dāng)應(yīng)用或系統(tǒng)崩潰后,重啟時(shí)需重新加載數(shù)據(jù)的場景。
到目前位置,我們只討論了簡單的數(shù)據(jù)保存方法,那就是當(dāng)舊日志數(shù)據(jù)超過一定時(shí)間或達(dá)到一定大小的時(shí)候會(huì)被刪除。這個(gè)適用于每條相對(duì)獨(dú)立的消息,如臨時(shí)事件。然而,還有一類很重要的數(shù)據(jù),那就是根據(jù)key修改數(shù)據(jù),一種可變的數(shù)據(jù)(例如在數(shù)據(jù)庫表數(shù)據(jù)的變更那樣)。
我們討論一個(gè)具體的例子。一個(gè)主題包含了用戶emial信息,每次用戶更新他們的email信息,我們都會(huì)發(fā)送消息到topic,是根據(jù)他們的userid做主鍵。以下是我們發(fā)送的消息,userid是123,每條信息都對(duì)應(yīng)著一次的email信息修改(省略號(hào)是省略其他userid的消息)。
123 => bill@microsoft.com . . . 123 => bill@gatesfoundation.org . . . 123 => bill@gmail.com
日志壓縮給了我們更細(xì)顆粒度保留數(shù)據(jù)機(jī)制,這樣我們就可以保證只保留每一個(gè)key最后的一次變更(如123 => bill@gmail.com)。這樣我們保證了日志里都包含了所有key的最后一個(gè)值的快照。這就意味著下游的消費(fèi)者可以重建狀態(tài)而不需要保存所有的更變?nèi)罩尽?
讓我們一些日志壓縮有用的場景,然后我們?cè)诳纯词窃趺幢皇褂蒙稀?
1.數(shù)據(jù)庫變更訂閱(Database chagne subscription)。我們很常見到一份數(shù)據(jù)集會(huì)存在多種數(shù)據(jù)系統(tǒng)里,而且這系統(tǒng)里有一個(gè)類似數(shù)據(jù)庫那樣的(如RDBMS或新潮的key-value系統(tǒng))。舉個(gè)例子,你有一個(gè)數(shù)據(jù)庫、一個(gè)緩存、一個(gè)搜索集群和一個(gè)Hadoop集群。這樣每次數(shù)據(jù)庫的修改,都得映射到那緩存、那搜索集群和最后在Hadoop里。在這個(gè)場景里,你只是需要實(shí)時(shí)最新更新的日志。但如果需要重新加載進(jìn)緩存或恢復(fù)宕機(jī)的搜索節(jié)點(diǎn),就可能需要完整的數(shù)據(jù)集。
2.事件源(Event sourcing)。這是一種應(yīng)用設(shè)計(jì)風(fēng)格,它將查詢和應(yīng)用設(shè)計(jì)結(jié)合在一起,并使用日志作為程序的主要存儲(chǔ)。
3.高可用日志(Journaling for high-availability)。一個(gè)本地計(jì)算的進(jìn)程可以通過變更日志來做到容錯(cuò),這樣另一個(gè)進(jìn)程就能重新加載這些變更繼續(xù)處理。一個(gè)具體的例子就是流式查詢系統(tǒng),如計(jì)數(shù)、匯總和其他“分組”操作。實(shí)時(shí)流式處理框架Samza就是使用這功能達(dá)到目的的。
在上述場景中,主要處理實(shí)時(shí)的變更,偶爾需要重新加載或重新處理時(shí),能做的就只有重新加載所有數(shù)據(jù)。日志壓縮提供了這兩個(gè)功能,處理實(shí)時(shí)數(shù)據(jù)變更,和重新加載數(shù)據(jù)。這種使用日志的風(fēng)格,詳情可參看點(diǎn)擊。
這思路很簡單。如果我們保存無窮無盡的日志,保存上述場景中每個(gè)變更日志,而且還是一開始就獲取每個(gè)系統(tǒng)的狀態(tài)。使用這個(gè)完整的日志,我們就可以恢復(fù)到任何一個(gè)時(shí)間點(diǎn)的狀態(tài)。但這種完整日志的假設(shè)時(shí)不切實(shí)際的,因?yàn)閷?duì)于那些每一行記錄都在變更多次的系統(tǒng),即使數(shù)據(jù)很小,日志也會(huì)無限的增長下去。那我們就簡單的丟棄舊日志,雖然可以限制空間的增長,但也無法重建狀態(tài)——因?yàn)榕f日志被丟棄,可能一部分記錄的狀態(tài)無法重建。
相對(duì)于粗粒度的基于時(shí)間的數(shù)據(jù)保留策略,日志壓縮的策略是一種更細(xì)顆粒度,基于每一條記錄保存。這個(gè)想法是,有選擇性的刪除那些有多個(gè)變更記錄的同樣的key。這樣的日志就保證每個(gè)key都至少有一個(gè)最新的狀態(tài)。
數(shù)據(jù)保留策略可以為每個(gè)主題設(shè)置,所以一個(gè)集群里有些主題的保存策略可以設(shè)置為大小和時(shí)間來保存數(shù)據(jù),有主題也可以通過壓縮保留。
這個(gè)功能的靈感是來自于LinkedIn里最古老且最成功的基礎(chǔ)架構(gòu)——一個(gè)被稱為Databus的數(shù)據(jù)庫變更日志緩存系統(tǒng)。
跟大多數(shù)日志結(jié)構(gòu)存儲(chǔ)系統(tǒng)不一樣的時(shí),Kafka是為了訂閱而設(shè)計(jì)的,組織數(shù)據(jù)的形式也是為了更快的線性讀取和寫入。跟Databus不一樣之處是,kafka作為真實(shí)源(source-of-truth)存儲(chǔ),即使上游數(shù)據(jù)源不具備可重用性的情況下,它還是挺有用的。
不管是傳統(tǒng)的RDBMS還是分布式的NoSQL存儲(chǔ)在數(shù)據(jù)庫中的數(shù)據(jù)總是會(huì)更新的,相同key的新記錄更新數(shù)據(jù)的方式簡單來說有兩種:
1.直接更新(找到數(shù)據(jù)庫中的已有位置以最新的值替換舊的值)。
2.追加記錄(保留舊的值,查詢時(shí)再合并,或者也有一個(gè)后臺(tái)線程會(huì)定期合并)。
采用追加記錄的做法可以在節(jié)點(diǎn)崩潰時(shí)用于恢復(fù)數(shù)據(jù),還有一個(gè)好處是寫性能很高,因?yàn)槭蔷€性寫。
以下是各個(gè)數(shù)據(jù)系統(tǒng)的更新數(shù)據(jù)方式:
數(shù)據(jù)系統(tǒng) | 更新數(shù)據(jù)追加到哪里 | 數(shù)據(jù)文件 | 是否需要壓縮 |
---|---|---|---|
ZooKeeper | log | snapshot | 不要,因?yàn)閿?shù)據(jù)量不大 |
Redis | aof | rdb | 不需要,因?yàn)槭莾?nèi)存數(shù)據(jù)庫 |
Cassandra | commit log | data.db | 需要,數(shù)據(jù)存在本地文件 |
HBase | commit log | HFile | 需要,數(shù)據(jù)存在HDFS |
Kafka | commit log | commit log | 需要,數(shù)據(jù)存在分區(qū)中的Segment里 |
這里有個(gè)更高層次的圖,展示kafka日志的邏輯存儲(chǔ)結(jié)構(gòu),框框的每個(gè)數(shù)字都是一條消息的偏移量(offset):
日志的頭部(Log Head)就是傳統(tǒng)的kafka日志。日志的尾部(Log Tail)則是被壓縮過的日志。Log Head是很密集的,偏移量時(shí)連續(xù)的,保留了所有的消息。值得注意的是在Log Tail的消息雖然被壓縮,但依然保留它一開始被寫入時(shí)的偏移量,這個(gè)偏移量是永遠(yuǎn)不會(huì)被改變。而且這壓縮日志里的偏移量,在日志里依然時(shí)有效的。所以,時(shí)無法區(qū)分下一個(gè)更高的偏移量是什么,比如說,上面的例子,36、 37、 38都是屬于同一個(gè)位置。
以上說的都是數(shù)據(jù)更新時(shí)的日志壓縮,當(dāng)然日志壓縮也支持刪除。當(dāng)發(fā)送某個(gè)Key的最新版本的消息的內(nèi)容為null,這個(gè)Key將被刪除(某種程度上也算是更新,如上面的例子就是把email信息置為null)。這個(gè)消息也稱刪除標(biāo)志(delete marker),這個(gè)刪除標(biāo)志會(huì)把之前跟這key相同的消息刪掉。但這刪除標(biāo)志比較特殊,特殊之處是它是過一段時(shí)間才被刪除,從而騰出磁盤空間。而數(shù)據(jù)刪除的時(shí)間點(diǎn)會(huì)被標(biāo)志為“刪除保留點(diǎn)(delte retention point)”,也就是如上圖所示,這個(gè)圖展示也很特別,你看看兩個(gè)是point而不是pointer,也不是指向某個(gè)消息,而是消息與消息之間。說明它是個(gè)時(shí)間點(diǎn),而不是指向某個(gè)消息的指針pointer。
壓縮時(shí)通過后臺(tái)定期復(fù)制日志段(log segment)完成的。清除時(shí)并不會(huì)阻塞讀操作,而且還可以配置不超過一定的IO,從而避免影響消費(fèi)者和生產(chǎn)者。壓縮日志段的過程如下:
日志壓縮提供了什么保證?(What guarantees does log compaction provide?)日志壓縮保證:
1.任何消費(fèi)者只要是讀取日志的頭部的,都可以看到所有消息,頭部的消息不會(huì)被刪除。這些消息都是有連續(xù)的偏移量。Topic的min.compaction.lag.ms參數(shù)可用于保證在指定時(shí)間內(nèi)該消息的存在,而不會(huì)被壓縮。這提供了消息呆在頭部(未被壓縮)的時(shí)間的底線。
2.依然保持則消息的有序性。壓縮永遠(yuǎn)不會(huì)重新給消息排序,而僅僅是刪除其部分而已。
3.消息的偏移量永遠(yuǎn)不會(huì)改變。它永遠(yuǎn)標(biāo)志著消息所在的位置。
4.任何從日志最開始的地方開始處理都會(huì)至少看到每個(gè)key的最終狀態(tài)。另外,只要消費(fèi)者在delete.retention.ms(默認(rèn)是24小時(shí))這時(shí)間內(nèi)達(dá)到日志的頭部,則將會(huì)看到所有刪除記錄的刪除標(biāo)志。也就是說:由于刪除標(biāo)志的移除和讀取是同時(shí)發(fā)生,所以如果錯(cuò)過delete.retention.ms這時(shí)間,消費(fèi)者會(huì)錯(cuò)過刪除標(biāo)志。
日志壓縮通過日志清除器(log cleaner)執(zhí)行,后臺(tái)線程池復(fù)制日志段,移除那些存在于Log Head中的記錄。每個(gè)壓縮線程工作如下:
1.選擇Log Head中相對(duì)比Log Tail的比例高的日志。
2.創(chuàng)建Log Head中每個(gè)Key對(duì)應(yīng)的最后一個(gè)偏移量的日志摘要。
3.從頭到尾的開始復(fù)制,在復(fù)制過程中刪除相同key的日志。新的、干凈的日志段將立刻被交換(swap)到日志里,所以只需一個(gè)額外的日志段大小的硬盤空間就可以(不需要全部日志的空間)。
4.Log Head的日志摘要實(shí)際上是一個(gè)空間緊湊的哈希表。每個(gè)實(shí)體只需要24個(gè)字節(jié)空間。所以8G的cleaner空間,可以處理大概366G的Log Head(假設(shè)每個(gè)消息大小為1k)。
Kafka是默認(rèn)啟用日志清除器,是個(gè)線程池。如果要開啟指定主題的清理功能,你可以在日志里添加以下屬性:
log.cleanup.policy=compact
這個(gè)可以在創(chuàng)建主題時(shí)指定或修改主題時(shí)指定。
日志清除器可以設(shè)置多少消息在Log Head而不被刪除。這個(gè)啟用是通過設(shè)置壓縮時(shí)間段:
log.cleaner.min.compaction.lag.ms
如果不設(shè)置,則默認(rèn)是除了最后一個(gè)segment之外,其余日志段都會(huì)被壓縮,即最后一個(gè)日志段不會(huì)被壓縮。任何已激活的日志段都不會(huì)被壓縮,就算消息的時(shí)間已經(jīng)超過了上面配置的時(shí)間,這里的激活,是指有在消費(fèi)。
配額(Quotas)Kafka集群有能力強(qiáng)制性地要求控制broker中客戶端使用的資源。以下是兩類客戶的quotas:
1.網(wǎng)絡(luò)帶寬quotas,具體到字節(jié)(從0.9版本開始)。
2.請(qǐng)求速率quotas,具體到CPU的利用率(網(wǎng)絡(luò)和IO的比值)。
生產(chǎn)者和消費(fèi)者有可能生成/消費(fèi)大量的數(shù)據(jù)或請(qǐng)求速率非常高,以致于占滿了broker的資源,導(dǎo)致網(wǎng)絡(luò)飽和broker拒絕給其他客戶端服務(wù)。使用quotas就能避免這個(gè)問題,在多租戶集群上尤為重要,因?yàn)橐徊糠值唾|(zhì)量的客戶可能會(huì)降低高質(zhì)量客戶的用戶體驗(yàn)。實(shí)際上,可以對(duì)API進(jìn)行這樣的限制。
客戶組(Client groups)Kafka客戶標(biāo)識(shí)是用戶主體(user principal),用于代表用戶在這安全的集群上的權(quán)限。在無鑒權(quán)的時(shí)候,broker通過可配置的PrincipalBuilder來提供用戶主體,用來分組。由客戶端應(yīng)用選擇client-id作為客戶的邏輯分組。元組(user,client-id)則定義了一個(gè)安全邏輯組,共享user principal和chient-id。
quotas可以被應(yīng)用到元組(user,client-id),user或client-id組。對(duì)于一個(gè)連接,匹配上的quota將會(huì)應(yīng)用到此連接上。例如(user="test-user",client-id="test-client")擁有生產(chǎn)者quota是10MB/s,這個(gè)10MB的帶寬將會(huì)被user是“test-user”并且client-id是"test-client"的生產(chǎn)者進(jìn)行共用。
quota可以按(user,client-id)配置,也可以按user組配置,也可以按client-id組配置。默認(rèn)quota可以被任何級(jí)別的quota給覆蓋。這個(gè)機(jī)制類似于每個(gè)Topic可以覆蓋自己的。ZooKeeper的/config/users的quota可以覆蓋user和(user,client-id)的quota。/config/clients下的則可以覆蓋client-id的quota。這些ZooKeeper的覆蓋會(huì)即可在所以broker中生效,這樣我們就不需要修改配置時(shí)重啟服務(wù)器。詳情請(qǐng)點(diǎn)擊。
quota配置的優(yōu)先級(jí)如下:
1./config/users//clients/ 2./config/users/ /clients/ 3./config/users/ 4./config/users/ /clients/ 5./config/users/ /clients/ 6./config/users/ 7./config/clients/ 8./config/clients/
broker的(quota.producer.default, quota.consumer.default)屬性來給每個(gè)client-id組設(shè)置默認(rèn)的網(wǎng)絡(luò)帶寬。但后面的版本會(huì)刪除這些屬性。
client-id組的默認(rèn)quota可以在ZooKeeper中配置。
網(wǎng)絡(luò)帶寬quota,具體到字節(jié),而且是有組里的客戶一起共享。默認(rèn)的,每個(gè)獨(dú)立的客戶組都有一個(gè)固定的網(wǎng)絡(luò)帶寬的quota。這quota配置在每個(gè)broker。
請(qǐng)求速率配額(Request Rate Quotas)請(qǐng)求速率quota,具體到時(shí)間的百分比,時(shí)間是在quota窗口里每個(gè)broker的處理請(qǐng)求的IO線程和網(wǎng)絡(luò)線程。 n%的quota代表一個(gè)線程的n%,所以quota總數(shù)是((num.io.threads+num.network.threads)×100)%。每個(gè)客戶組在一個(gè)quota窗口中最多使用n%的IO線程和網(wǎng)絡(luò)線程。由于分配給IO和網(wǎng)絡(luò)的線程數(shù)是根據(jù)broker主機(jī)的cpu個(gè)數(shù),則每個(gè)請(qǐng)求速率quota代表著CPU的百分比。
實(shí)施(Enforcement)默認(rèn)情況下,每個(gè)唯一的客戶組都會(huì)有一個(gè)集群配置好的固定的quota。這個(gè)quota是定義在每個(gè)broker上。我們決定由每個(gè)broer定義這些quota,而不是由集群為每個(gè)client統(tǒng)一設(shè)置一個(gè)quota的原因,是因?yàn)闉榱朔奖愎蚕韖uota的設(shè)置。
如果Broker檢測到超過quota了,會(huì)怎么處理?在我們的解決方案中,我們是選擇降低速率,而不是直接返回錯(cuò)誤。broker會(huì)去計(jì)算處理這問題的延遲時(shí)間,這段時(shí)間則不會(huì)立刻響應(yīng)客戶端。這種超過quota的處理,對(duì)于客戶端來說是透明的??蛻舳瞬恍枰鲱~外的操作。實(shí)際上,客戶端額外的動(dòng)作,如果操作不好,還會(huì)加劇超過quota的問題。
字節(jié)率和線程利用率都會(huì)在多個(gè)小窗口中監(jiān)測(一秒鐘有30個(gè)窗口),以便快速準(zhǔn)確的糾正quota違規(guī)行為。
客戶端字節(jié)率在多個(gè)小窗口(例如每個(gè)1秒的30個(gè)窗口)上進(jìn)行測量,以便快速檢測和糾正配額違規(guī)。 通常,大的測量窗口(例如,每30秒10個(gè)窗口)會(huì)導(dǎo)致大量的流量,然后是長時(shí)間的延遲,這對(duì)用戶體驗(yàn)方面并不好。
參考和翻譯:
RabbitMQ官方 https://www.rabbitmq.com
Kafka官方 http://kafka.apache.org/docum...
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/69265.html
閱讀 2733·2023-04-25 14:15
閱讀 2685·2021-11-04 16:11
閱讀 3385·2021-10-14 09:42
閱讀 434·2019-08-30 15:52
閱讀 2819·2019-08-30 14:03
閱讀 3536·2019-08-30 13:00
閱讀 2105·2019-08-26 11:40
閱讀 3301·2019-08-26 10:25