国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

Kafka學習筆記

aikin / 2218人閱讀

摘要:學習筆記使用一個叫的文學家的名字用來命名的。引入,正式升級為分布式流處理平臺。主要還是針對組成員數量減少的情況。當所有成員都退出組后,消費者組狀態變更為。自動定期刪除過期位移的條件就是,組要處于狀態。減少下游系統一次性消費的消息總數。

Kafka 學習筆記

Kafka使用一個叫Franz Kafka的文學家的名字用來命名的。

Kafka是一款開源的消息引擎系統。也是一個分布式流處理平臺。

Kafka同時支持點對點模型以及發布/訂閱模型。

為什么要使用Kakfa?四個字:削峰填谷!

Kafka 術語

Record:消息,指Kafka處理對象

Topic:主題,用來承載消息的容器

Partition:分區,一個有序不變的消息隊列,一個主題下可以有多個分區

Offset:消息位移,表示分區中每條信息的位置,是一個單調遞增不變的值

Replica,副本,數據冗余。

領導者副本:對外提供服務,與客戶端進行交互

追隨者副本:不能與外界進行交互,只是被動地追隨領導者副本

Producer:生產者,向主題發布新消息的應用程序

Consumer:消費者,向主題訂閱新消息的應用程序

Consumer Offset:消費者位移,表示消費者消費進度

Consumer Group:消費者組,多個消費者實例共同組成的一個組,同時消費多個分區來實現高吞吐。

Rebalance:重平衡,消費者組內某個消費者實例掛掉后,其他消費者實例自動重新分配訂閱主題分區的過程。它是Kafka消費者端實現高可用的重要手段。

Kafka 種類

Apache Kafka: 也稱社區版Kafka,迭代速度快,社區響應度高,使用它可以讓你有更高的把控度;缺陷在于僅僅提供基礎核心組件,缺失一些高級特性

Confluent Kafka: 優勢在于集成了很多高級特性且由Kafka原班人馬打造,質量保證;缺陷在于國內相關資料不全,普及率較低,沒有太多可參考的范例。

CDH/HDP Kafka: 優勢在于操作簡單,節省運維成本;缺陷在于把控度低,演進速度慢

Kafka 版本號 一個題外話

Kafka新版本客戶端代碼開始完全由java語言編寫,于是有些人開始“JAVA VS SCALA”的大討論。并從語言特性上分析為什么社區擯棄Scala轉而投向Java的懷抱。

其實事情沒有那么復雜,僅僅是因為社區來了一批Java程序猿,而以前老的scala程序猿隱退了罷了。

版本演進

Kafka總共演進了7個大版本

0.7版本: 上古版本,一旦有人向你推薦這個版本,懟他。

0.8版本: 開始引入副本機制,另外老版本需要制定zookeeper地址而不是Broker地址。在0.8.2.0版本社區引入了新版本Producer API,即指定Broker地址的Producer。

0.9版本: 重量級的大版本更迭。增加了基礎的安全認證/權限功能,引入了Kafka Connect,新版本Producer API穩定。

0.10.0.0: 里程碑的大版本。該版本又有兩個小版本,0.10.1和0.10.2。引入Kafka streams,正式升級為分布式流處理平臺。0.10.2.2 新Consumer API穩定。

0.11.0.0: 目前最主流的版本之一。引入兩個重量級功能變更:一個是提供冪等性Producer API以及事務 API, 另一個是對Kafka消息格式做了重構。

1.0和2.0: 如果你是Kafka Stream用戶,至少選擇2.0.0版本吧。

最后還有個建議,不論你使用的是哪個版本,都請盡量保持服務端版本和客戶端版本一致,否則你將損失很多Kafka為你提供的性能優化收益。

江湖經驗:不要輕易成為新版本的小白鼠。

集群部署

磁盤容量舉例:

假設公司有個業務需要每天向Kafka集群發送 1 億條信息。每條消息保存兩份來防止數據丟失。消息默認保存兩周時間。并假設消息的平均大小是1KB。問你的Kafka集群需要為這個業務預留多少磁盤空間?

總大小:1億 1KB 2備份 * 14 ~= 2800G
加上Kafka的一些索引數據,為它預留10%,那么總大小變為 2800 * (1 + 10%) ~= 3TB

Kafka支持數據壓縮,壓縮比0.75的話,那么應該預留的存儲空間為2.25TB左右。

帶寬舉例

與其說是帶寬資源的規劃,其實真正要規劃的是Kafka服務器的數量。

假設公司機房環境1Gbps,現有個業務,需要在1小時內處理1TB的業務數據。

一般單臺服務器 規劃使用70%的帶寬資源的1/3 ~= 240Mbps。

1TB需要1小時處理,則每秒差不多需要處理2336Mbps的數據,除 240Mbps,則差不多需要10臺機器。如果消息還需要額外復制的話,那么還要對應乘上備份數。

集群配置參數
配置名稱 示例 建議值
log.dirs /home/kafka1,/home/kafka2 kafka寫日志多路徑,不僅能提升寫性能,在1.1版本中還能支持故障轉移功能。
zookeeper.connect zk1:2181,zk2:2181,zk3:2181/kafka1
listens listeners=PLAINTEXT://dn1.ambari:6667
auto.create.topics.enable true false,不建議可以自動創建主題
unclean.leader.election.enable false false,如果設置為true有丟數據風險
auto.leader.rebalance.enable false false,不定期進行leader副本的選舉
log.retention.hours 168 默認保持7天數據
log.retention.bytes -1 保存多少數據都可以
message.max.bytes 1000000 默認值建議調大。該值代表Broker能處理的最大消息大小
生產者分區策略 輪詢策略

隨機策略

按消息保存鍵策略

自定義策略 生產者壓縮

壓縮配置

compression.type

壓縮算法

總結一下壓縮和解壓縮,Producer端壓縮,Broker端保持,Consumer端解壓縮。

無消息丟失最佳實踐

不要使用producer.send(msg),而要使用producer.send(msg,callback)

設置acks=all,表明所有副本Broker都要接受消息,該消息才算是“已提交”

設置retries>0,表明Producer自動重試,當網絡順斷時,防止消息丟失。

設置unclean.leader.election.enable=false

設置replication.factor >=3,增加副本數,保證數據冗余

設置min.insync.replicas > 1,控制的是消息至少要被寫入多少個副本才算是 已提交。

確保replication.factor > min.insync.replicas。如果兩者相等,那么只要有一個副本掛機,整個分區就無法正常工作了。推薦設置replication.factor = min.insync.replicas + 1

確保消息消費完再提交。設置enable.aoto.commit=false

Kafka 攔截器

分為生產者攔截器和消費者攔截器。

典型的應用場景可以應用于客戶端監控、端到端系統性能測試、消息審計等多種功能在內的場景。

Kafka是如何管理TCP連接的 java生產者是如何管理TCP連接的

KafkaProducer實例創建時啟動Sender線程,從而創建與bootstrap.servers中所有的Broker的TCP連接。

KafkaProducer實例首次更新元數據信息之后,還會再次創建與集群中所有Broker的TCP連接

如果Producer端發送信息到某臺Broker時,發現沒有與該Broker的TCP連接,那么也會創建連接

如果設置connections.max.idle.ms > 0,則步驟一中的TCP連接會被自動關閉;如果設置該參數-1,那么步驟一中創建的連接無法被關閉,會成為僵尸進程。

Java消費者是如何管理TCP連接的

創建的3個時機

發起FindCoordinator請求時

連接協調者時

消費數據時

消費者程序會創建3類TCP連接

確定協調者和獲取集群元數據

連接協調者,令其執行組成員管理操作

執行實際的消息獲取

冪等生產者和事務生產者

消息交付可靠性保障,常見的承諾有以下三種

最多一次:消息可能會丟失,但絕不會重復發送

至少一次:消息不會丟失,但有可能被重復發送

精確一次:消息不會丟失,也不會被重復發送

Kafka默認是最少一次

要保證精確一次,就需要冪等和事務。不過性能會想對較差。

冪等生產者

冪等性有很多好處。其最大的優勢在于我們可以安全地重試任何冪等性操作,反正它們不會破壞我們的系統狀態。

在0.11.0.0版本引入了冪等生產者,只要更改配置props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true)

使用冪等生產者要注意

它只能保證單分區的冪等,多分區無法實現

只能實現單會話上的冪等,重啟之后冪等消失

事務生產者

設置事務型Producer

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true)

設置producer端參數transctional.id。最好為其設置一個有意義的名字

此外代碼也要做一些調整變化。

producer.initTransactions();
try {
            producer.beginTransaction();
            producer.send(record1);
            producer.send(record2);
            producer.commitTransaction();
} catch (KafkaException e) {
            producer.abortTransaction();
}
重平衡 怎么避免Rebalance

Rebalance發生的時機有三個

組成員數據量發生變化

訂閱主題數量發生變化

訂閱主題的分區數發生變化

后面兩個通常是運維的主動操作,無法避免。主要還是針對組成員數量減少的情況。增加一般也是人為主動的。

那么避免因為參數或邏輯不合理而導致的成員退出,與之相關的主要參數

session.timeout.ms,推薦設置6s

heartbeat.interval.ms,推薦設置2s

max.poll.interval.ms,推薦設置比你的業務邏輯處理要長

GC參數,避免頻繁的FULL GC

重平衡通知

重平衡過程是通過 消費者端的心跳線程來通知到其他消費者實例的。

0.10.1.0版本之前,發送心跳請求是在消費者主線程完成的,也就是kafkaConsumer.poll方法的那個線程。這樣做有諸多弊端,因為消息處理也是在這個線程中完成的。因此當業務邏輯處理消耗了較長時間,心跳請求就無法及時發送到協調者那邊了。導致協調者 錯誤地認為該消費者已經死了。

0.10.1.0版本開始,社區引入了一個多帶帶的線程來專門執行心跳發送。

消費者組狀態機

定義了5種狀態

各個狀態的流轉

一個消費者組最開始是Empty狀態,當重平衡過程開啟后,它會被置為PreparingRebalance狀態等待成員加入,之后變更到CompletingRebalance狀態等待分配方案,最后流轉到Stable狀態完成重平衡。

當有新成員或已有成員退出時,消費者組的狀態從Stable直接跳到PreparingRebalance狀態,此時,所有現存成員就必須重新申請加入組。

當所有成員都退出組后,消費者組狀態變更為Empty。

Kafka自動定期刪除過期位移的條件就是,組要處于Empty狀態。

重平衡流程 消費者端重平衡流程

JoinGroup請求

SyncGroup請求

Broker端重平衡場景分析

新成員入組

組成員主動離組

組成員崩潰離組

重平衡時協調者對組內成員提交位移的處理

位移提交

CommitFailedException怎么處理?

縮短消息處理的時間,該方法優先處理

增加Consumer端允許下游系統消費一批數據的最大時長。設置參數max.poll.interval.ms,新版本默認是5分鐘。

減少下游系統一次性消費的消息總數。max.poll.records

下游系統使用多線程來加速消費

多消費者實例

鑒于KafkaConsumer不是線程安全的事實,制定兩套多線程方案。

每個線程維護專屬的KafkaConsumer實例,負責完整的消息獲取、消息處理流程

核心代碼

```
public class KafkaConsumerRunner implements Runnable {
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final KafkaConsumer consumer;


     public void run() {
         try {
             consumer.subscribe(Arrays.asList("topic"));
             while (!closed.get()) {
            ConsumerRecords records = 
                consumer.poll(Duration.ofMillis(10000));
                 //  執行消息處理邏輯
             }
         } catch (WakeupException e) {
             // Ignore exception if closing
             if (!closed.get()) throw e;
         } finally {
             consumer.close();
         }
     }


     // Shutdown hook which can be called from a separate thread
     public void shutdown() {
         closed.set(true);
         consumer.wakeup();
     }
```

消費者程序使用單或多線程獲取消息,創建多個消費者線程執行消息處理邏輯

核心代碼

```
private final KafkaConsumer consumer;
private ExecutorService executors;
...


private int workerNum = ...;
executors = new ThreadPoolExecutor(
    workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue<>(1000), 
    new ThreadPoolExecutor.CallerRunsPolicy());


...
while (true)  {
    ConsumerRecords records = 
        consumer.poll(Duration.ofSeconds(1));
    for (final ConsumerRecord record : records) {
        executors.submit(new Worker(record));
    }
}

```

兩種方案各有特點。

監控消費進度的3種方法

使用Kafka自帶命令行工具kafka-consumer-groups腳本

使用Kafka Consumer API

使用Kafka自帶的JMX監控指標

Kafka副本詳解

副本機制的好處:

提供數據冗余

提供高伸縮性

改善數據局部性

但Kafka只有第一種好處,原因是這樣的設計,Kafka有兩點好處

方便實現 Read-your-writes

指當你用生產者API向Kafka成功寫入消息后,馬上使用消費者API去讀取剛才生產的消息

方便實現單調讀(Monotonic Reads)

在多次消費信息時,不會看到該消息一會存在一會不存在的情況。

判斷Follower副本與Leader副本是否同步的標準,Broker參數replia.lag.time.max.ms的參數值。Kafka有一個in-sync Replicas(ISR)集合的概念。

Kafka控制器

控制器組件(Controller),是Kafka的核心組件,它的主要作用是在Apache Zookeeper的幫助下管理和協調整個Kafka集群。

控制器是怎么被選出來的

每臺Broker都能充當控制器,在Broker啟動時,會嘗試去Zookeeper中創建/controller節點。Kafka當前選舉規則,第一個成功創建/controller節點的Broker會被指定為控制器。

控制器能做什么?

主題管理

分區重分配

Prefered領導者選舉

集群成員管理

數據服務,控制器上保存最全的集群元數據信息

控制器保存了什么數據?

這些數據其實也在Zookeeper中存儲了一份。

控制器的故障轉移

總結

小竅門分享:當你覺得控制器出現問題時,比如主題無法刪除了,重分區hang住了,你可以不用重啟broker或者控制器,快速簡便的方法,直接去Zookeeper手動刪除/controller節點。

這樣做的好處是,既可以引發控制器的重選舉,又可以避免重啟Broker導致的消息中斷。

Kafka請求處理 請求方案

Kafka方案類似于Reactor模式

那么Kafka類似的方案是這樣的。網絡線程池默認參數num.network.threads=3

好了,客戶端發來的請求會被Aceptor線程分發到任意一個網絡線程中,由他們進行處理。你可能會認為,網絡線程池是順序處理不就好了?實際上,Kafka在這個環節上又做了一層異步線程池的處理。

IO線程池執行真正的處理。如果是PRODUCER生產請求,則將消息寫入到底層的磁盤日志中;如果是FETCH請求,則從磁盤或頁緩存中讀取消息。當IO請求處理完請求后,會將生成的響應放入網絡線程池的響應隊列中,并由對應的網絡線程負責將Response反還給客戶端。

請求隊列是所有網絡線程共享的,而響應隊列則是每個網絡線程專屬的。

IO線程池默認參數num.io.threads=8

圖中還有一個Purgatory的組件,這是Kafka中著名的“煉獄”組件。

它是用來緩存延時請求的,所謂延時請求,就是那些一時未滿足條件的不可立刻處理的請求。

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/75734.html

相關文章

  • 極客時間《Kafka核心技術與實戰》返現 + 腦圖 + 送學習筆記

    摘要:作者胡夕人人貸計算平臺部總監,將在這篇專欄中一步一步的教你填平這些坑,全面提升你的實戰能力搭配掘金小冊圖解之核心原理學習效果更佳哦送學習筆記 showImg(https://segmentfault.com/img/bVbsg9O?w=258&h=258);關注有課學微信公眾號,回復暗號 kafka 獲取購買《Kafka核心技術與實戰》極客時間專欄地址,購買成功后提交購買截圖即可獲得返...

    yvonne 評論0 收藏0

發表評論

0條評論

aikin

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<