国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

開源一個kafka增強:okmq-1.0.0

PAMPANG / 1317人閱讀

摘要:只有兩個基礎組件同時死亡,才會受到嚴重影響。的意外死亡,造成生產端發送失敗。后臺會有一個線程進行這些失敗消息的遍歷和重新投遞。二阻塞業務正常進行。死亡,或者多帶帶死亡,消息最終都會被發出,僅當與同時死亡,消息才會發送失敗,并記錄在日志文件里。

本工具的核心思想就是:賭。只有兩個基礎組件同時死亡,才會受到嚴重影響。哦,斷電除外。

mq是個好東西,我們都在用。這也決定了mq應該是高高高可用的。某團就因為這個組件,出了好幾次生產事故,呵呵。

大部分業務系統,要求的消息語義都是at least once,即都會有重復消息,但保證不會丟。即使這樣,依然有很多問題:

一、mq可用性無法保證。 mq的意外死亡,造成生產端發送失敗。很多消息要通過扒取日志進行回放,成本高耗時長。

二、mq阻塞業務正常進行。 mq卡頓或者網絡問題,會造成業務線程卡在mq的發送方法上,正常業務進行不下去,造成災難性的后果。

三、消息延遲。 mq死了就用不著說了,消息還沒投胎就已死亡。消息延遲主要是客戶端消費能力不強,或者是消費通道單一造成的。

使用組合存儲來保證消息的可靠投遞,就是okmq

注意:okmq注重的是可靠性。對于順序性、事務等其他要素,不予考慮。當然,速度是必須的。
設計想法

我即使用兩套redis來模擬一些mq操作,都會比現有的一些解決方案要強。但這肯定不是我們需要的,因為redis的堆積能力太有限,內存占用率直線上升的感覺并不太好。

但我們可以用redis來作為額外的發送確認機制。這個想法,在《使用多線程增加kafka消費能力》一文中曾經提到過,現在到了實現的時候了。

首先看下使用Api
OkmqKafkaProducer producer = new ProducerBuilder()
.defaultSerializer()
.eanbleHa("redis")
.any("okmq.redis.mode", "single")
.any("okmq.redis.endpoint", "127.0.0.1:6379")
.any("okmq.redis.poolConfig.maxTotal", 100)
.servers("localhost:9092")
.clientID("okMQProducerTest")
.build();

Packet packet = new Packet();
packet.setTopic("okmq-test-topic");
packet.setContent("i will send you a msg");
producer.sendAsync(packet, null);
producer.shutdown();
以redis為例


我們按照數字標號來介紹:

1、 在消息發送到kafka之前,首先入庫redis。由于后續回調需要用到一個唯一表示,我們在packet包里添加了一個uuid。

2、 調用底層的api,進行真正的消息投遞。

3、 通過監聽kafka的回調,刪除redis中對應的key。在這里可以得到某條消息確切的的ack時間。那么長時間沒有刪除的,就算是投遞失敗的消息。

4、 后臺會有一個線程進行這些失敗消息的遍歷和重新投遞。我們叫做recovery。最復雜的也就是這一部分。對于redis來說,會首先爭搶一個持續5min的鎖,然后遍歷相關hashkey。

所以,對于以上代碼,redis發出以下命令:

1559206423.395597 [0 127.0.0.1:62858] "HEXISTS" "okmq:indexhash" "okmq:5197354"
1559206423.396670 [0 127.0.0.1:62858] "HSET" "okmq:indexhash" "okmq:5197354" ""
1559206423.397300 [0 127.0.0.1:62858] "HSET" "okmq:5197354" "okmq::2b9b33fd-95fd-4cd6-8815-4c572f13f76e" "{"content":"i will send you a msg104736623015238","topic":"okmq-test-topic","identify":"2b9b33fd-95fd-4cd6-8815-4c572f13f76e","timestamp":1559206423318}"
1559206423.676212 [0 127.0.0.1:62858] "HDEL" "okmq:5197354" "okmq::2b9b33fd-95fd-4cd6-8815-4c572f13f76e"
1559206428.327788 [0 127.0.0.1:62861] "SET" "okmq:recovery:lock" "01fb85a9-0670-40c3-8386-b2b7178d4faf" "px" "300000"
1559206428.337930 [0 127.0.0.1:62858] "HGETALL" "okmq:indexhash"
1559206428.341365 [0 127.0.0.1:62858] "HSCAN" "okmq:5197354" "0"
1559206428.342446 [0 127.0.0.1:62858] "HDEL" "okmq:indexhash" "okmq:5197354"
1559206428.342788 [0 127.0.0.1:62861] "GET" "okmq:recovery:lock"
1559206428.343119 [0 127.0.0.1:62861] "DEL" "okmq:recovery:lock"
以上問題解答 所以對于以上的三個問題,回答如下:

一、mq可用性無法保證。

為什么要要通過事后進行恢復呢?我把recovery機制帶著不是更好么?通過對未收到ack的消息進行遍歷,可以把這個過程做成自動化。

二、mq阻塞業務正常進行。

通過設置kafka的MAX_BLOCK_MS_CONFIG
參數,其實是可以不阻塞業務的,但會丟失消息。我可以使用其他存儲來保證這些丟失的消息重新發送。

三、消息延遲。

mq死掉了,依然有其他備用通道進行正常服務。也有的團隊采用雙寫mq雙消費的方式來保證這個過程,也是被逼急了:)。如果kafka死掉了,業務會切換到備用通道進行消費。

擴展自己的HA

如果你不想用redis,比如你先要用hbase,那也是很簡單的。
但需要實現一個HA接口。

public interface HA {
    void close();

    void configure(Properties properties);

    void preSend(Packet packet) throws HaException;

    void postSend(Packet packet) throws HaException;

    void doRecovery(AbstractProducer producer) throws HaException;
}

使用之前,還需要注冊一下你的插件。

AbstractProducer.register("log", "com.sayhiai.arch.okmq.api.producer.ha.Ha2SimpleLog");
重要參數
okmq.ha.recoveryPeriod 恢復線程檢測周期,默認5秒

okmq.redis.mode redis的集群模式,可選:single、sentinel、cluster
okmq.redis.endpoint 地址,多個地址以,分隔
okmq.redis.connectionTimeout 連接超時
okmq.redis.soTimeout socket超時
okmq.redis.lockPx 分布式鎖的持有時間,可默認,5min
okmq.redis.splitMillis 間隔時間,redis換一個key進行運算,默認5min
okmq.redis.poolConfig.* 兼容jedis的所有參數
1.0.0 版本功能

1、進行了生產端的高可用抽象,實現了kafka的樣例。

2、增加了SimpleLog的ping、pong日志實現。

3、增加了Redis的生產端備用通道。包含single、cluster、sentinel三種模式。

4、可以自定義其他備用通道。

5、兼容kakfa的所有參數設置。

規劃 2.0.0

1、實現ActiveMQ的集成。

2、實現消費者的備用通道集成。

3、增加嵌入式kv存儲的生產者集成。

4、更精細的控制系統的行為。

5、加入開關和預熱,避免新啟動mq即被壓垮。

6、redis分片機制,大型系統專用。

3.0.0

1、監控功能添加。

2、rest接口添加。

使用限制

當你把參數ha設置為true,表明你已經收到以下的使用限制。反之,系統反應于原生無異。

使用限制:
本工具僅適用于非順序性、非事務性的普通消息投遞,且客戶端已經做了冪等。一些訂單系統、消息通知等業務,非常適合。如果你需要其他特性,請跳出此頁面。

kafka死亡,或者redis多帶帶死亡,消息最終都會被發出,僅當kafka與redis同時死亡,消息才會發送失敗,并記錄在日志文件里。

正常情況下,redis的使用容量極少極少。異常情況下,redis的容量有限,會迅速占滿。redis的剩余時間就是你的StopWatch,你必須在這個時間內恢復你的消息系統,一定要頂住哇。

End

系統目前處于1.0.0版本,正在線上小規模試用。工具小眾,但適用于大部分應用場景。如果你正在尋求這樣的解決方案,歡迎一塊完善代碼。

github地址:

https://github.com/sayhiai/okmq

也歡迎關注《小姐姐味道》微信公眾號,進行交流。

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/74816.html

相關文章

  • Confluent的25億美元估值可能在開源動蕩中提供肯定。

    摘要:的億美元估值可能在開源中提供肯定,是一家基于的開源軟件提供商,在系列融資輪中籌集了億美元,估值為億美元。在月最新的期間,在公共預覽中為管理流媒體。在兩周后通過宣布其平臺組件的許可證更改做出了響應。該出版物推測如下,與超分頻器持有所有的卡。Confluent的25億美元估值可能在開源TurbulencetweetConfluent中提供肯定,Confluent是一家基于Apache Kafka...

    enda 評論0 收藏0

發表評論

0條評論

PAMPANG

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<