摘要:核心實現(xiàn)是這個方法通過不同的模式可以實現(xiàn)發(fā)送即忘忽略返回結(jié)果同步發(fā)送獲取返回的對象,回調(diào)函數(shù)置為異步發(fā)送設置回調(diào)函數(shù)三種消息模式。
Kafka是一款很棒的消息系統(tǒng),可以看看我之前寫的 后端好書閱讀與推薦來了解一下它的整體設計。今天我們就來深入了解一下它的實現(xiàn)細節(jié)(我fork了一份代碼),首先關注Producer這一方。
要使用kafka首先要實例化一個KafkaProducer,需要有brokerIP、序列化器等必要Properties以及acks(0、1、n)、compression、retries、batch.size等非必要Properties,通過這個簡單的接口可以控制Producer大部分行為,實例化后就可以調(diào)用send方法發(fā)送消息了。
核心實現(xiàn)是這個方法:
public Futuresend(ProducerRecord record, Callback callback) { // intercept the record, which can be potentially modified; this method does not throw exceptions ProducerRecord interceptedRecord = this.interceptors.onSend(record);//① return doSend(interceptedRecord, callback);//② }
通過不同的模式可以實現(xiàn)發(fā)送即忘(忽略返回結(jié)果)、同步發(fā)送(獲取返回的future對象,回調(diào)函數(shù)置為null)、異步發(fā)送(設置回調(diào)函數(shù))三種消息模式。
我們來看看消息類ProducerRecord有哪些屬性:
private final String topic;//主題 private final Integer partition;//分區(qū) private final Headers headers;//頭 private final K key;//鍵 private final V value;//值 private final Long timestamp;//時間戳
它有多個構(gòu)造函數(shù),可以適應不同的消息類型:比如有無分區(qū)、有無key等。
①中ProducerInterceptors(有0 ~ 無窮多個,形成一個攔截鏈)對ProducerRecord進行攔截處理(比如打上時間戳,進行審計與統(tǒng)計等操作)
public ProducerRecordonSend(ProducerRecord record) { ProducerRecord interceptRecord = record; for (ProducerInterceptor interceptor : this.interceptors) { try { interceptRecord = interceptor.onSend(interceptRecord); } catch (Exception e) { // 不拋出異常,繼續(xù)執(zhí)行下一個攔截器 if (record != null) log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e); else log.warn("Error executing interceptor onSend callback", e); } } return interceptRecord; }
如果用戶有定義就進行處理并返回處理后的ProducerRecord,否則直接返回本身。
然后②中doSend真正發(fā)送消息,并且是異步的(源碼太長只保留關鍵):
private FuturedoSend(ProducerRecord record, Callback callback) { TopicPartition tp = null; try { // 序列化 key 和 value byte[] serializedKey; try { serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key()); } catch (ClassCastException cce) { } byte[] serializedValue; try { serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value()); } catch (ClassCastException cce) { } // 計算分區(qū)獲得主題與分區(qū) int partition = partition(record, serializedKey, serializedValue, cluster); tp = new TopicPartition(record.topic(), partition); // 回調(diào)與事務處理省略。 Header[] headers = record.headers().toArray(); // 消息追加到RecordAccumulator中 RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs); // 該批次滿了或者創(chuàng)建了新的批次就要喚醒IO線程發(fā)送該批次了,也就是sender的wakeup方法 if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); } return result.future; } catch (Exception e) { // 攔截異常并拋出 this.interceptors.onSendError(record, tp, e); throw e; } }
下面是計算分區(qū)的方法:
private int partition(ProducerRecordrecord, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { Integer partition = record.partition(); // 消息有分區(qū)就直接使用,否則就使用分區(qū)器計算 return partition != null ? partition : partitioner.partition( record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); }
默認的分區(qū)器DefaultPartitioner實現(xiàn)方式是如果partition存在就直接使用,否則根據(jù)key計算partition,如果key也不存在就使用round robin算法分配partition。
/** * The default partitioning strategy: *
以上就是發(fā)送消息的邏輯處理,接下來我們再看看消息發(fā)送的物理處理。
Sender(是一個Runnable,被包含在一個IO線程ioThread中,該線程不斷從RecordAccumulator隊列中的讀取消息并通過Selector將數(shù)據(jù)發(fā)送給Broker)的wakeup方法,實際上是KafkaClient接口的wakeup方法,由NetworkClient類實現(xiàn),采用了NIO,也就是java.nio.channels.Selector.wakeup()方法實現(xiàn)。
Sender的run中主要邏輯是不停執(zhí)行準備消息和等待消息:
long pollTimeout = sendProducerData(now);//③ client.poll(pollTimeout, now);//④
③完成消息設置并保存到信道中,然后監(jiān)聽感興趣的key,由KafkaChannel實現(xiàn)。
public void setSend(Send send) { if (this.send != null) throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id); this.send = send; this.transportLayer.addInterestOps(SelectionKey.OP_WRITE); } // transportLayer的一種實現(xiàn)中的相關方法 public void addInterestOps(int ops) { key.interestOps(key.interestOps() | ops); }
④主要是Selector的poll,其select被wakeup喚醒:
public void poll(long timeout) throws IOException { /* check ready keys */ long startSelect = time.nanoseconds(); int numReadyKeys = select(timeout);//wakeup使其停止阻塞 long endSelect = time.nanoseconds(); this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds()); if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) { SetreadyKeys = this.nioSelector.selectedKeys(); // Poll from channels that have buffered data (but nothing more from the underlying socket) if (dataInBuffers) { keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice Set toPoll = keysWithBufferedRead; keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed pollSelectionKeys(toPoll, false, endSelect); } // Poll from channels where the underlying socket has more data pollSelectionKeys(readyKeys, false, endSelect); // Clear all selected keys so that they are included in the ready count for the next select readyKeys.clear(); pollSelectionKeys(immediatelyConnectedKeys, true, endSelect); immediatelyConnectedKeys.clear(); } else { madeReadProgressLastPoll = true; //no work is also "progress" } long endIo = time.nanoseconds(); this.sensors.ioTime.record(endIo - endSelect, time.milliseconds()); }
其中pollSelectionKeys方法會調(diào)用如下方法完成消息發(fā)送:
public Send write() throws IOException { Send result = null; if (send != null && send(send)) { result = send; send = null; } return result; } private boolean send(Send send) throws IOException { send.writeTo(transportLayer); if (send.completed()) transportLayer.removeInterestOps(SelectionKey.OP_WRITE); return send.completed(); }
Send是一次數(shù)據(jù)發(fā)包,一般由ByteBufferSend或者MultiRecordsSend實現(xiàn),其writeTo調(diào)用transportLayer的write方法,一般由PlaintextTransportLayer或者SslTransportLayer實現(xiàn),區(qū)分是否使用ssl:
public long writeTo(GatheringByteChannel channel) throws IOException { long written = channel.write(buffers); if (written < 0) throw new EOFException("Wrote negative bytes to channel. This shouldn"t happen."); remaining -= written; pending = TransportLayers.hasPendingWrites(channel); return written; } public int write(ByteBuffer src) throws IOException { return socketChannel.write(src); }
到此就把Producer的業(yè)務相關邏輯處理和非業(yè)務相關的網(wǎng)絡 2方面的主要流程梳理清楚了。其他額外的功能是通過一些配置保證的。
比如順序保證就是max.in.flight.requests.per.connection,InFlightRequests的doSend會進行判斷(由NetworkClient的canSendRequest調(diào)用),只要該參數(shù)設為1即可保證當前包未確認就不能發(fā)送下一個包從而實現(xiàn)有序性
public boolean canSendMore(String node) { Dequequeue = requests.get(node); return queue == null || queue.isEmpty() || (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection); }
再比如可靠性,通過設置acks,Sender中sendProduceRequest的clientRequest加入了回調(diào)函數(shù):
RequestCompletionHandler callback = new RequestCompletionHandler() { public void onComplete(ClientResponse response) { handleProduceResponse(response, recordsByPartition, time.milliseconds());//調(diào)用completeBatch } }; /** * 完成或者重試投遞,這里如果acks不對就會重試 * * @param batch The record batch * @param response The produce response * @param correlationId The correlation id for the request * @param now The current POSIX timestamp in milliseconds */ private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId, long now, long throttleUntilTimeMs) { } public class ProduceResponse extends AbstractResponse { /** * Possible error code: * INVALID_REQUIRED_ACKS (21) */ }
kafka源碼一層一層包裝很多,錯綜復雜,如有錯誤請大家不吝賜教。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/76861.html
摘要:相關概念協(xié)議高級消息隊列協(xié)議是一個標準開放的應用層的消息中間件協(xié)議。可以用命令與不同,不是線程安全的。手動提交執(zhí)行相關邏輯提交注意點將寫成單例模式,有助于減少端占用的資源。自身是線程安全的類,只要封裝得當就能最恰當?shù)陌l(fā)揮好的作用。 本文使用的Kafka版本0.11 先思考些問題: 我想分析一下用戶行為(pageviews),以便我能設計出更好的廣告位 我想對用戶的搜索關鍵詞進行統(tǒng)計,...
閱讀 1357·2021-11-24 09:39
閱讀 1346·2021-11-04 16:12
閱讀 2686·2021-09-24 09:47
閱讀 3337·2021-09-01 10:50
閱讀 1477·2019-08-30 15:55
閱讀 1423·2019-08-30 15:43
閱讀 642·2019-08-30 11:08
閱讀 3578·2019-08-23 18:33