摘要:發(fā)送消息在上述示例中我們使用了接口傳入并發(fā)送,在實際實現(xiàn)中該方法使用另一個接口并傳入了回調(diào)函數(shù)。需要注意的是,如果攔截器拋出異常,程序不會停止,只會寫入一個級別的日志。如果下一個攔截器依賴于上一個的結(jié)果,那么最終得到的數(shù)據(jù)可能不正確。
Kafka作為當前流行的消息中間件,在消息隊列、微服務架構(gòu)、大數(shù)據(jù)平臺等方面有著廣泛的應用。如果將平臺比作人體,Kafka便是神經(jīng)系統(tǒng),負責傳遞消息。本系列利用碎片化的時間,閱讀Kafka源碼深入了解各個模塊的原理和實現(xiàn),不定期更新。文中所有代碼均來自https://github.com/apache/kafka
Kafka Producer簡單使用示例KafkaProducer用于將事件從客戶端應用發(fā)送至Kafka集群。Producer本身是線程安全的,并且多個線程共享單個實例時也會有性能上的提升。以下示例來自org.apache.kafka.clients.producer.KafkaProducer類:
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producerproducer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++) producer.send(new ProducerRecord ("my-topic", Integer.toString(i), Integer.toString(i))); producer.close();
props變量定義了Producer的屬性和基本配置信息:
bootstrap.servers:kafka服務器地址,事件會發(fā)送至該服務器。在生產(chǎn)環(huán)境中通常定義多個服務器并用逗號分隔,以防止單個服務器突然崩潰。
acks:當事件發(fā)送至Kafka集群時,數(shù)據(jù)在集群內(nèi)部會有主從備份,acks定義了何時可以判定消息發(fā)送成功。
acks = 0時,Producer在消息發(fā)送后不會等待服務器返回結(jié)果,立刻返回成功。
acks = 1時,消息在主(leader)服務器寫入后返回成功,不會等待從(follower)服務器備份完成。
acks = all時,消息在主從服務器都寫入成功后才告知Producer發(fā)送成功。
retries: 當發(fā)送失敗時,producer自動重發(fā)的次數(shù),并不是所有的錯誤都可以觸發(fā)自動重發(fā),并且自動重發(fā)可能導致消息發(fā)送順序錯亂,具體信息將在以后的章節(jié)介紹
key.serializer/value.serializer: 所有發(fā)送至kafka的數(shù)據(jù)都是以byte形式存在的,key/value serializer負責將Java實例轉(zhuǎn)化為字節(jié)。
使用上述配置初始化proudcer后,我們可以構(gòu)建ProducerRecord,這里使用topic,key,value構(gòu)建消息并調(diào)用producer.send方法發(fā)送至kafka集群。在程序結(jié)束前務必調(diào)用producer.close方法,因為默認情況下producer會在內(nèi)存中batch多個事件,并一起發(fā)送以增加性能,close方法會強制發(fā)送當前內(nèi)存中未發(fā)送的事件。
發(fā)送消息在上述示例中我們使用了send接口傳入并發(fā)送ProducerRecord,在實際實現(xiàn)中該方法使用另一個send接口并傳入了null回調(diào)函數(shù)。Kafka發(fā)送消息是異步的,回調(diào)函數(shù)可以獲得發(fā)送結(jié)果,若發(fā)送成功,回調(diào)函數(shù)可以得到消息的元數(shù)據(jù)包括topic,partition,offset等。若失敗可獲得錯誤信息。
/** * Asynchronously send a record to a topic. Equivalent tosend(record, null)
. * See {@link #send(ProducerRecord, Callback)} for details. */ @Override public Futuresend(ProducerRecord record) { return send(record, null); } /** * Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged. * * The send is asynchronous and this method will return immediately once the record has been stored in the buffer of * records waiting to be sent. This allows sending many records in parallel without blocking to wait for the response after each one. */ @Override public Future
send(ProducerRecord record, Callback callback) { // intercept the record, which can be potentially modified; this method does not throw exceptions ProducerRecord interceptedRecord = this.interceptors.onSend(record); return doSend(interceptedRecord, callback); }
一個回調(diào)函數(shù)的例子:
producer.send(myRecord, new Callback() { public void onCompletion(RecordMetadata metadata, Exception e) { if(e != null) { e.printStackTrace(); } else { System.out.println("The offset of the record we just sent is: " + metadata.offset()); } } });攔截器(ProducerInterceptor)
public interface ProducerInterceptorextends Configurable { /** * 消息發(fā)送前調(diào)用 */ public ProducerRecord onSend(ProducerRecord record); /** * 消息發(fā)送后,服務器返回結(jié)果(成功或錯誤)時調(diào)用 */ public void onAcknowledgement(RecordMetadata metadata, Exception exception); /** * 攔截器關(guān)閉時調(diào)用 */ public void close(); }
每一個Producer都可以設置一個或多個攔截器,攔截器允許客戶端攔截或修改要發(fā)送的消息,通過Properties進行設置:
Properties props = new Properties(); ... props.put("interceptor.classes", "your.interceptor.class.name");
public class KafkaProducerimplements Producer { // ... other class members private final ProducerInterceptors interceptors; // Producer構(gòu)造函數(shù) KafkaProducer(ProducerConfig config, Serializer keySerializer, Serializer valueSerializer, Metadata metadata, KafkaClient kafkaClient) { // ...其他步驟省略 // 從config中獲取攔截器實例,config從properties中構(gòu)造 List > interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class); this.interceptors = new ProducerInterceptors<>(interceptorList); } }
攔截器設置完成后,在send方法中進行調(diào)用:
@Override public FutureProducerInterceptorssend(ProducerRecord record, Callback callback) { // intercept the record, which can be potentially modified; this method does not throw exceptions ProducerRecord interceptedRecord = this.interceptors.onSend(record); return doSend(interceptedRecord, callback); }
ProducerInterceptors是一個容器類,封裝了多個攔截器,onSend方法被producer的send方法調(diào)用。
/** * A container that holds the list {@link org.apache.kafka.clients.producer.ProducerInterceptor} * and wraps calls to the chain of custom interceptors. */ public class ProducerInterceptorsimplements Closeable { private final List > interceptors; public ProducerInterceptors(List > interceptors) { this.interceptors = interceptors; } public ProducerRecord onSend(ProducerRecord record) { ProducerRecord interceptRecord = record; // 按順序執(zhí)行每一個攔截器的onSend方法 for (ProducerInterceptor interceptor : this.interceptors) { try { interceptRecord = interceptor.onSend(interceptRecord); } catch (Exception e) { // do not propagate interceptor exception, log and continue calling other interceptors // be careful not to throw exception from here if (record != null) log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e); else log.warn("Error executing interceptor onSend callback", e); } } return interceptRecord; } /** * 1. 當發(fā)送的消息被服務器接受并返回時調(diào)用 * 2. 當發(fā)送的消息未到達服務器之前就失敗時調(diào)用(見下方onSendError方法) **/ public void onAcknowledgement(RecordMetadata metadata, Exception exception) { for (ProducerInterceptor interceptor : this.interceptors) { try { interceptor.onAcknowledgement(metadata, exception); } catch (Exception e) { // do not propagate interceptor exceptions, just log log.warn("Error executing interceptor onAcknowledgement callback", e); } } } /** * Producer在發(fā)送數(shù)據(jù)前要構(gòu)建多種不同的信息,每一步都有可能拋出異常,本方法由producer在遇到異常時調(diào)用, * TopicPartition記錄了topic和partition信息,由producer構(gòu)建,但若異常發(fā)生在其構(gòu)建之前,該參數(shù)為空,因此從record里提取topic和partition數(shù)據(jù)構(gòu)建。 **/ public void onSendError(ProducerRecord record, TopicPartition interceptTopicPartition, Exception exception) { for (ProducerInterceptor interceptor : this.interceptors) { try { if (record == null && interceptTopicPartition == null) { interceptor.onAcknowledgement(null, exception); } else { if (interceptTopicPartition == null) { interceptTopicPartition = new TopicPartition(record.topic(), record.partition() == null ? RecordMetadata.UNKNOWN_PARTITION : record.partition()); } interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1, RecordBatch.NO_TIMESTAMP, Long.valueOf(-1L), -1, -1), exception); } } catch (Exception e) { // do not propagate interceptor exceptions, just log log.warn("Error executing interceptor onAcknowledgement callback", e); } } } }
需要注意的是,如果攔截器拋出異常,程序不會停止,只會寫入一個warn級別的日志。并且攔截器鏈也不會停止執(zhí)行,而是繼續(xù)執(zhí)行下一個攔截器。如果下一個攔截器依賴于上一個的結(jié)果,那么最終得到的數(shù)據(jù)可能不正確。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/68744.html
摘要:核心實現(xiàn)是這個方法通過不同的模式可以實現(xiàn)發(fā)送即忘忽略返回結(jié)果同步發(fā)送獲取返回的對象,回調(diào)函數(shù)置為異步發(fā)送設置回調(diào)函數(shù)三種消息模式。 Kafka是一款很棒的消息系統(tǒng),可以看看我之前寫的 后端好書閱讀與推薦來了解一下它的整體設計。今天我們就來深入了解一下它的實現(xiàn)細節(jié)(我fork了一份代碼),首先關(guān)注Producer這一方。 要使用kafka首先要實例化一個KafkaProducer,需要有...
閱讀 2648·2021-11-24 09:39
閱讀 1648·2021-11-24 09:38
閱讀 629·2021-11-22 14:44
閱讀 1888·2021-11-18 10:02
閱讀 2573·2021-11-18 10:02
閱讀 1158·2021-10-14 09:43
閱讀 4244·2021-09-29 09:35
閱讀 523·2021-07-30 15:30