摘要:通過增加分區數量,能夠通過部署多個消費者增加并行消費能力。然后使用了飽和策略,使得多線程處理不過來的時候,能夠阻塞在的消費線程上。多線程是為了增加效率,等是為了增加可靠性。
前提:本例適合那些沒有順序要求的消息主題。
kafka通過一系列優化,寫入和讀取速度能夠達到數萬條/秒。通過增加分區數量,能夠通過部署多個消費者增加并行消費能力。但還是有很多情況下,某些業務的執行速度實在是太慢,這個時候我們就要用到多線程去消費,提高應用機器的利用率,而不是一味的給kafka增加壓力。
使用Spring創建一個kafka消費者是非常簡單的。我們選擇的方式是繼承kafka的ShutdownableThread,然后實現它的doWork方法即可。
參考:https://github.com/apache/kaf...
多線程消費某個分區的數據即然是使用多線程,我們就需要新建一個線程池。
我們創建了一個最大容量為20的線程池,其中有兩個參數需要注意一下。(參考《JAVA多線程使用場景和注意事項簡版》)。
我們使用了了零容量的SynchronousQueue,一進一出,避免隊列里緩沖數據,這樣在系統異常關閉時,就能排除因為阻塞隊列丟消息的可能。
然后使用了CallerRunsPolicy飽和策略,使得多線程處理不過來的時候,能夠阻塞在kafka的消費線程上。
然后,我們將真正處理業務的邏輯放在任務中多線程執行,每次執行完畢,我們都手工的commit一次ack,表明這條消息我已經處理了。由于是線程池認領了這些任務,順序性是無法保證的,可能有些任務沒有執行完畢,后面的任務就已經把它的offset給提交了。o.O
不過這暫時不重要,首先讓它并行化運行就好。
可惜的是,當我們運行程序,直接拋出了異常,無法進行下去。
程序直接說了:
KafkaConsumer is not safe for multi-threaded access
顯然,kafka的消費端不是線程安全的,它拒絕你這么調用它的api。kafka的初衷是好的,想要避免一些并發環境的問題,但我確實需要使用多線程處理。
kafka消費者通過比較調用者的線程id來判斷是否是由外部線程發起請求。
long threadId = Thread.currentThread().getId(); if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access"); refcount.incrementAndGet();
}
得,只能將commitSync函數放在線程外面了,先提交ack、再執行任務。
加入管道我們獲取的消息,可能在真正被執行之前,會進行一些過濾,比如一些空值或者特定條件的判斷。雖然可以直接放在消費者線程里運行,但顯的特別的亂,可以加入一個生產者消費者模型(你可以認為這是畫蛇添足)。這里采用的是阻塞隊列依然是SynchronousQueue,它充當了管道的功能。
我們把任務放入管道后,立馬commit。如果線程池已經滿了,將一直阻塞在消費者線程里,直到有空缺。然后,我們多帶帶啟動了一個線程,用來接收這些數據,然后提交到這部分的代碼看起來大概這樣。
應用能夠啟動了,消費速度賊快。
參數配置kafka的參數非常的多,我們比較關心的有以下幾個參數。
max.poll.records調用一次poll,返回的最大條數。這個值設置的大,那么處理的就慢,很容易超出max.poll.interval.ms的值(默認5分鐘),造成消費者的離線。在耗時非常大的消費中,是需要特別注意的。
enable.auto.commit是否開啟自動提交(offset)如果開啟,consumer已經消費的offset信息將會間歇性的提交到kafka中(持久保存)
當開啟offset自動提交時,提交請求的時間頻率由參數`
auto.commit.interval.ms`控制。
如果broker端反饋的數據量不足時(fetch.min.bytes),fetch請求等待的最長時間。如果數據量滿足需要,則立即返回。
session.timeout.msconsumer會話超時時長,如果在此時間內,server尚未接收到consumer任何請求(包括心跳檢測),那么server將會判定此consumer離線。此值越大,server等待consumer失效、rebalance時間就越長。
heartbeat.interval.msconsumer協調器與kafka集群之間,心跳檢測的時間間隔。kafka集群通過心跳判斷consumer會話的活性,以判斷consumer是否在線,如果離線則會把此consumer注冊的partition分配(assign)給相同group的其他consumer。此值必須小于“session.timeout.ms”,即會話過期時間應該比心跳檢測間隔要大,通常為session.timeout.ms的三分之一,否則心跳檢測就失去意義。
在本例中,我們的參數簡單的設置如下,主要調整了每次獲取的條數和檢測時間。其他的都是默認。
仔細的同學可能會看到,我們的代碼依然不是完全安全的。這是由于我們提前提交了ack導致的。程序正常運行下,這無傷大雅。但在應用異常關閉的時候,那些正在執行中的消息,很可能會丟失,對于一致性要求非常高的應用,我們要從兩個手段上進行保證。
使用關閉鉤子第一種就是考慮kill -15的情況。這種方式比較簡單,只要覆蓋ShutdownableThread的shutdown方法即可,應用將有機會執行線程池中的任務,確保消費完畢再關閉應用。
@Override public void shutdown() { super.shutdown(); executor.shutdown(); }使用日志處理
應用oom,或者直接kill -9了,事情就變得麻煩起來。
維護一個多帶帶的日志文件(或者本地db),在commit之前寫入一條日志,然后在真正執行完畢之后寫入一條對應的日志。當系統啟動時,讀取這些日志文件,獲取沒有執行成功的任務,重新執行。
想要效率,還想要可靠,是得下點苦力氣的。
借助redis處理這種方式與日志方式類似,但由于redis的效率很高(可達數萬),而且方便,是優于日志方式的。
可以使用Hash結構,提交任務的同時寫入Redis,任務執行完畢刪掉這個值,那么剩下的就是出現問題的消息。
在系統啟動時,首先檢測一下redis中是否有異常數據。如果有,首先處理這些數據,然后正常消費。
多線程是為了增加效率,redis等是為了增加可靠性。業務代碼是非常好編寫的,搞懂了邏輯就搞定了大部分;業務代碼有時候又是困難的,你要編寫大量輔助功能增加它的效率、照顧它的邊界。
以程序員的角度來說,最有競爭力的代碼都是為了照顧小概率發生的邊界異常。
kafka在吞吐量和可靠性方面,有各種的權衡,很多都是魚和熊掌的關系。不必糾結于它本身,我們可以借助外部的工具,獲取更大的收益。在這種情況下,redis當機與應用同時當機的概率還是比較小的。5個9的消息保證是可以做到的,剩下的那點不完美問題消息,你為什么不從日志里找呢?
擴展閱讀:
1、JAVA多線程使用場景和注意事項簡版
2、Kafka基礎知識索引
3、360度測試:KAFKA會丟數據么?其高可用是否滿足需求?
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/73894.html
摘要:只有兩個基礎組件同時死亡,才會受到嚴重影響。的意外死亡,造成生產端發送失敗。后臺會有一個線程進行這些失敗消息的遍歷和重新投遞。二阻塞業務正常進行。死亡,或者單獨死亡,消息最終都會被發出,僅當與同時死亡,消息才會發送失敗,并記錄在日志文件里。 本工具的核心思想就是:賭。只有兩個基礎組件同時死亡,才會受到嚴重影響。哦,斷電除外。 mq是個好東西,我們都在用。這也決定了mq應該是高高高可用的...
摘要:相關概念協議高級消息隊列協議是一個標準開放的應用層的消息中間件協議。可以用命令與不同,不是線程安全的。手動提交執行相關邏輯提交注意點將寫成單例模式,有助于減少端占用的資源。自身是線程安全的類,只要封裝得當就能最恰當的發揮好的作用。 本文使用的Kafka版本0.11 先思考些問題: 我想分析一下用戶行為(pageviews),以便我能設計出更好的廣告位 我想對用戶的搜索關鍵詞進行統計,...
閱讀 3525·2023-04-26 00:16
閱讀 1361·2021-11-25 09:43
閱讀 3824·2021-11-23 09:51
閱讀 2964·2021-09-24 09:55
閱讀 713·2021-09-22 15:45
閱讀 1387·2021-07-30 15:30
閱讀 3064·2019-08-30 14:04
閱讀 2237·2019-08-26 13:46