摘要:只有兩個基礎組件同時死亡,才會受到嚴重影響。的意外死亡,造成生產端發送失敗。后臺會有一個線程進行這些失敗消息的遍歷和重新投遞。二阻塞業務正常進行。死亡,或者多帶帶死亡,消息最終都會被發出,僅當與同時死亡,消息才會發送失敗,并記錄在日志文件里。
本工具的核心思想就是:賭。只有兩個基礎組件同時死亡,才會受到嚴重影響。哦,斷電除外。
mq是個好東西,我們都在用。這也決定了mq應該是高高高可用的。某團就因為這個組件,出了好幾次生產事故,呵呵。
大部分業務系統,要求的消息語義都是at least once,即都會有重復消息,但保證不會丟。即使這樣,依然有很多問題:
一、mq可用性無法保證。 mq的意外死亡,造成生產端發送失敗。很多消息要通過扒取日志進行回放,成本高耗時長。
二、mq阻塞業務正常進行。 mq卡頓或者網絡問題,會造成業務線程卡在mq的發送方法上,正常業務進行不下去,造成災難性的后果。
三、消息延遲。 mq死了就用不著說了,消息還沒投胎就已死亡。消息延遲主要是客戶端消費能力不強,或者是消費通道單一造成的。
使用組合存儲來保證消息的可靠投遞,就是okmq。
注意:okmq注重的是可靠性。對于順序性、事務等其他要素,不予考慮。當然,速度是必須的。設計想法
我即使用兩套redis來模擬一些mq操作,都會比現有的一些解決方案要強。但這肯定不是我們需要的,因為redis的堆積能力太有限,內存占用率直線上升的感覺并不太好。
但我們可以用redis來作為額外的發送確認機制。這個想法,在《使用多線程增加kafka消費能力》一文中曾經提到過,現在到了實現的時候了。
首先看下使用ApiOkmqKafkaProducer producer = new ProducerBuilder() .defaultSerializer() .eanbleHa("redis") .any("okmq.redis.mode", "single") .any("okmq.redis.endpoint", "127.0.0.1:6379") .any("okmq.redis.poolConfig.maxTotal", 100) .servers("localhost:9092") .clientID("okMQProducerTest") .build(); Packet packet = new Packet(); packet.setTopic("okmq-test-topic"); packet.setContent("i will send you a msg"); producer.sendAsync(packet, null); producer.shutdown();以redis為例
我們按照數字標號來介紹:
1、 在消息發送到kafka之前,首先入庫redis。由于后續回調需要用到一個唯一表示,我們在packet包里添加了一個uuid。
2、 調用底層的api,進行真正的消息投遞。
3、 通過監聽kafka的回調,刪除redis中對應的key。在這里可以得到某條消息確切的的ack時間。那么長時間沒有刪除的,就算是投遞失敗的消息。
4、 后臺會有一個線程進行這些失敗消息的遍歷和重新投遞。我們叫做recovery。最復雜的也就是這一部分。對于redis來說,會首先爭搶一個持續5min的鎖,然后遍歷相關hashkey。
所以,對于以上代碼,redis發出以下命令:
1559206423.395597 [0 127.0.0.1:62858] "HEXISTS" "okmq:indexhash" "okmq:5197354" 1559206423.396670 [0 127.0.0.1:62858] "HSET" "okmq:indexhash" "okmq:5197354" "" 1559206423.397300 [0 127.0.0.1:62858] "HSET" "okmq:5197354" "okmq::2b9b33fd-95fd-4cd6-8815-4c572f13f76e" "{"content":"i will send you a msg104736623015238","topic":"okmq-test-topic","identify":"2b9b33fd-95fd-4cd6-8815-4c572f13f76e","timestamp":1559206423318}" 1559206423.676212 [0 127.0.0.1:62858] "HDEL" "okmq:5197354" "okmq::2b9b33fd-95fd-4cd6-8815-4c572f13f76e" 1559206428.327788 [0 127.0.0.1:62861] "SET" "okmq:recovery:lock" "01fb85a9-0670-40c3-8386-b2b7178d4faf" "px" "300000" 1559206428.337930 [0 127.0.0.1:62858] "HGETALL" "okmq:indexhash" 1559206428.341365 [0 127.0.0.1:62858] "HSCAN" "okmq:5197354" "0" 1559206428.342446 [0 127.0.0.1:62858] "HDEL" "okmq:indexhash" "okmq:5197354" 1559206428.342788 [0 127.0.0.1:62861] "GET" "okmq:recovery:lock" 1559206428.343119 [0 127.0.0.1:62861] "DEL" "okmq:recovery:lock"以上問題解答 所以對于以上的三個問題,回答如下:
一、mq可用性無法保證。
為什么要要通過事后進行恢復呢?我把recovery機制帶著不是更好么?通過對未收到ack的消息進行遍歷,可以把這個過程做成自動化。
二、mq阻塞業務正常進行。
通過設置kafka的MAX_BLOCK_MS_CONFIG
參數,其實是可以不阻塞業務的,但會丟失消息。我可以使用其他存儲來保證這些丟失的消息重新發送。
三、消息延遲。
mq死掉了,依然有其他備用通道進行正常服務。也有的團隊采用雙寫mq雙消費的方式來保證這個過程,也是被逼急了:)。如果kafka死掉了,業務會切換到備用通道進行消費。
擴展自己的HA如果你不想用redis,比如你先要用hbase,那也是很簡單的。
但需要實現一個HA接口。
public interface HA { void close(); void configure(Properties properties); void preSend(Packet packet) throws HaException; void postSend(Packet packet) throws HaException; void doRecovery(AbstractProducer producer) throws HaException; }
使用之前,還需要注冊一下你的插件。
AbstractProducer.register("log", "com.sayhiai.arch.okmq.api.producer.ha.Ha2SimpleLog");重要參數
okmq.ha.recoveryPeriod 恢復線程檢測周期,默認5秒 okmq.redis.mode redis的集群模式,可選:single、sentinel、cluster okmq.redis.endpoint 地址,多個地址以,分隔 okmq.redis.connectionTimeout 連接超時 okmq.redis.soTimeout socket超時 okmq.redis.lockPx 分布式鎖的持有時間,可默認,5min okmq.redis.splitMillis 間隔時間,redis換一個key進行運算,默認5min okmq.redis.poolConfig.* 兼容jedis的所有參數1.0.0 版本功能
1、進行了生產端的高可用抽象,實現了kafka的樣例。
2、增加了SimpleLog的ping、pong日志實現。
3、增加了Redis的生產端備用通道。包含single、cluster、sentinel三種模式。
4、可以自定義其他備用通道。
5、兼容kakfa的所有參數設置。
規劃 2.0.01、實現ActiveMQ的集成。
2、實現消費者的備用通道集成。
3、增加嵌入式kv存儲的生產者集成。
4、更精細的控制系統的行為。
5、加入開關和預熱,避免新啟動mq即被壓垮。
6、redis分片機制,大型系統專用。
3.0.01、監控功能添加。
2、rest接口添加。
使用限制當你把參數ha設置為true,表明你已經收到以下的使用限制。反之,系統反應于原生無異。
使用限制:
本工具僅適用于非順序性、非事務性的普通消息投遞,且客戶端已經做了冪等。一些訂單系統、消息通知等業務,非常適合。如果你需要其他特性,請跳出此頁面。
kafka死亡,或者redis多帶帶死亡,消息最終都會被發出,僅當kafka與redis同時死亡,消息才會發送失敗,并記錄在日志文件里。
正常情況下,redis的使用容量極少極少。異常情況下,redis的容量有限,會迅速占滿。redis的剩余時間就是你的StopWatch,你必須在這個時間內恢復你的消息系統,一定要頂住哇。
End系統目前處于1.0.0版本,正在線上小規模試用。工具小眾,但適用于大部分應用場景。如果你正在尋求這樣的解決方案,歡迎一塊完善代碼。
github地址:
https://github.com/sayhiai/okmq
也歡迎關注《小姐姐味道》微信公眾號,進行交流。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/74816.html
摘要:的億美元估值可能在開源中提供肯定,是一家基于的開源軟件提供商,在系列融資輪中籌集了億美元,估值為億美元。在月最新的期間,在公共預覽中為管理流媒體。在兩周后通過宣布其平臺組件的許可證更改做出了響應。該出版物推測如下,與超分頻器持有所有的卡。Confluent的25億美元估值可能在開源TurbulencetweetConfluent中提供肯定,Confluent是一家基于Apache Kafka...
閱讀 2380·2019-08-30 15:56
閱讀 1038·2019-08-30 15:55
閱讀 3202·2019-08-30 15:44
閱讀 933·2019-08-30 10:53
閱讀 1887·2019-08-29 16:33
閱讀 2468·2019-08-29 16:13
閱讀 719·2019-08-29 12:41
閱讀 874·2019-08-26 13:56