Abstract: The connect method creates a SocketChannel, sets it to non-blocking mode, configures the relevant socket properties, and initiates a connection request to the remote end. Because the channel is non-blocking, the connection is not necessarily established (i.e., the three-way handshake complete) by the time the method returns.
We know that Kafka communicates over TCP. Unlike much other middleware, it does not use Netty as its TCP server; instead it implements its own networking layer on top of Java NIO. For the reasons Kafka did not choose Netty, see here.
Readers not yet familiar with Java NIO may want to read these two articles first; this article assumes some knowledge of NIO. A minimal NIO event-loop skeleton is also sketched after the links below as a refresher.
https://segmentfault.com/a/11...
https://www.jianshu.com/p/0d4...
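For quick orientation, here is a minimal sketch (plain JDK classes, not Kafka code; the host and port are placeholders) of the standard Java NIO pattern the rest of this article builds on: a non-blocking SocketChannel registered with a java.nio.channels.Selector and driven by a select loop.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NioClientSkeleton {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        SocketChannel ch = SocketChannel.open();
        ch.configureBlocking(false);                           // non-blocking, as Kafka does
        ch.connect(new InetSocketAddress("localhost", 9092));  // usually returns false
        ch.register(selector, SelectionKey.OP_CONNECT);

        while (true) {                                         // loop forever for the sketch
            selector.select(1000);                             // block up to 1s for events
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();                                   // must remove, or it is re-processed
                if (key.isConnectable()) {
                    SocketChannel c = (SocketChannel) key.channel();
                    if (c.finishConnect())                     // complete the handshake
                        key.interestOps(SelectionKey.OP_READ); // switch to read interest
                } else if (key.isReadable()) {
                    ByteBuffer buf = ByteBuffer.allocate(4096);
                    int n = ((SocketChannel) key.channel()).read(buf);
                    if (n < 0) key.channel().close();          // peer closed the connection
                }
            }
        }
    }
}

Kafka's own Selector wraps essentially this loop, plus bookkeeping for connections, sends, and receives.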
More articles on my blog: https://github.com/farmerjohn...
Several important classes

First, let's look at the network-layer architecture of the Kafka client (the architecture diagram comes from this article).
This article focuses on the Network layer.
The Network layer has two important classes: Selector and KafkaChannel.
They are roughly analogous to java.nio.channels.Selector and Channel in the Java NIO layer.
The key fields of Selector are as follows:
// the JDK NIO selector
java.nio.channels.Selector nioSelector;
// all connections known to this Selector
Map<String, KafkaChannel> channels;
// sends that have been fully written out
List<Send> completedSends;
// requests that have been fully received
List<NetworkReceive> completedReceives;
// partially received requests, not yet visible to the layer above
Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
// as a client, connections for which connect() to the remote end returned true
Set<SelectionKey> immediatelyConnectedKeys;
// connections that have been established
List<String> connected;
// maximum size of a single receive
int maxReceiveSize;
From the network layer's point of view, Kafka splits into a client side (producers and consumers, and a broker acting as a follower is also a client) and a server side (brokers). This article analyzes how the client establishes connections and sends and receives data. The server relies on Selector and KafkaChannel for network transfer as well; at the Network layer the two sides differ very little.
Establishing a connection

When the Kafka client starts, it calls Selector#connect (hereafter, unless noted otherwise, Selector refers to org.apache.kafka.common.network.Selector) to establish connections.
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
    if (this.channels.containsKey(id))
        throw new IllegalStateException("There is already a connection for id " + id);

    // create a SocketChannel
    SocketChannel socketChannel = SocketChannel.open();
    // set it to non-blocking mode
    socketChannel.configureBlocking(false);
    // obtain the socket and set the relevant properties
    Socket socket = socketChannel.socket();
    socket.setKeepAlive(true);
    if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setSendBufferSize(sendBufferSize);
    if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setReceiveBufferSize(receiveBufferSize);
    socket.setTcpNoDelay(true);
    boolean connected;
    try {
        // SocketChannel#connect initiates the TCP connection to the remote end.
        // Because the channel is non-blocking, the connection is not necessarily
        // established (i.e., the 3-way handshake complete) when it returns: it
        // returns true if already established, false otherwise. When server and
        // client are on the same machine, it may well return true.
        connected = socketChannel.connect(address);
    } catch (UnresolvedAddressException e) {
        socketChannel.close();
        throw new IOException("Can't resolve address: " + address, e);
    } catch (IOException e) {
        socketChannel.close();
        throw e;
    }
    // register for the CONNECT event
    SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
    KafkaChannel channel;
    try {
        // build a KafkaChannel
        channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
    } catch (Exception e) {
        ...
    }
    // attach the KafkaChannel to the SelectionKey
    key.attach(channel);
    // store it in the map; id is the name of the remote server
    this.channels.put(id, channel);

    // connected == true means this connection will never fire a CONNECT event,
    // so it has to be handled separately
    if (connected) {
        // OP_CONNECT won't trigger for immediately connected channels
        log.debug("Immediately connected to node {}", channel.id());
        // record it in a separate set
        immediatelyConnectedKeys.add(key);
        // stop listening for CONNECT events on this key
        key.interestOps(0);
    }
}
The flow here is close to the standard NIO flow. The one case worth calling out separately is socketChannel#connect returning true, which the method's Javadoc mentions:
* If this channel is in non-blocking mode then an invocation of this
* method initiates a non-blocking connection operation. If the connection
* is established immediately, as can happen with a local connection, then
* this method returns true. Otherwise this method returns false and the
* connection operation must later be completed by invoking the
* {@link #finishConnect finishConnect} method.
In other words, in non-blocking mode a local connection may be established right away, in which case the method returns true and no CONNECT event will ever fire for that connection afterwards. Kafka therefore records these special connections in a separate set, immediatelyConnectedKeys, and handles them specially in the steps that follow.
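To see the "immediately connected" case in isolation, here is a small standalone sketch using plain JDK NIO (no Kafka classes): connecting over loopback in non-blocking mode, connect() may return true during the call itself, and for such a key OP_CONNECT would never fire.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class ImmediateConnectDemo {
    public static void main(String[] args) throws IOException {
        // listen on an ephemeral loopback port
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress("127.0.0.1", 0));

        SocketChannel client = SocketChannel.open();
        client.configureBlocking(false);
        // on a local connection this may complete during the call itself
        boolean connected = client.connect(server.getLocalAddress());
        System.out.println("connected immediately: " + connected);

        if (!connected) {
            // the normal path: wait for OP_CONNECT, then finishConnect()
            while (!client.finishConnect()) { /* busy-wait, for demo purposes only */ }
        }
        client.close();
        server.close();
    }
}

Whether the first print shows true depends on the OS and timing, which is exactly why Kafka has to handle both outcomes.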
The poll method is then called to listen for network events:
public void poll(long timeout) throws IOException {
    ...
    // select() is a thin wrapper around java.nio.channels.Selector#select
    int readyKeys = select(timeout);
    ...
    // if there are ready events, or immediatelyConnectedKeys is non-empty
    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        // handle the ready events; the second argument is false
        pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
        // handle immediatelyConnectedKeys; the second argument is true
        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
    }

    addToCompletedReceives();
    ...
}

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                               boolean isImmediatelyConnected,
                               long currentTimeNanos) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    // iterate over the set
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        // remove the current element, or the next poll would process it again
        iterator.remove();
        // fetch the KafkaChannel created during connect
        KafkaChannel channel = channel(key);
        ...
        try {
            // if this element comes from immediatelyConnectedKeys, or this is a CONNECT event
            if (isImmediatelyConnected || key.isConnectable()) {
                // finishConnect also registers interest in READ events
                if (channel.finishConnect()) {
                    this.connected.add(channel.id());
                    this.sensors.connectionCreated.record();
                    ...
                } else
                    continue;
            }

            // SSL connections need some extra steps
            if (channel.isConnected() && !channel.ready())
                channel.prepare();

            // READ event
            if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                NetworkReceive networkReceive;
                while ((networkReceive = channel.read()) != null)
                    addToStagedReceives(channel, networkReceive);
            }

            // WRITE event
            if (channel.ready() && key.isWritable()) {
                Send send = channel.write();
                if (send != null) {
                    this.completedSends.add(send);
                    this.sensors.recordBytesSent(channel.id(), send.size());
                }
            }

            // if the connection is no longer valid
            if (!key.isValid())
                close(channel, true);
        } catch (Exception e) {
            String desc = channel.socketDescription();
            if (e instanceof IOException)
                log.debug("Connection with {} disconnected", desc, e);
            else
                log.warn("Unexpected error from {}; closing connection", desc, e);
            close(channel, true);
        } finally {
            maybeRecordTimePerConnection(channel, channelStartTimeNanos);
        }
    }
}
Because connections in immediatelyConnectedKeys never fire a CONNECT event, poll calls finishConnect separately on each of their channels. In plaintext mode this resolves to PlaintextTransportLayer#finishConnect, implemented as follows:
public boolean finishConnect() throws IOException {
    // returns true once the connection is fully established
    boolean connected = socketChannel.finishConnect();
    if (connected)
        // stop listening for CONNECT events and start listening for READ events
        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
    return connected;
}
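The interestOps line is plain bit manipulation: clear the OP_CONNECT bit, set the OP_READ bit. A tiny sketch of just that arithmetic (& binds tighter than |, so the expression reads (ops & ~OP_CONNECT) | OP_READ):

import java.nio.channels.SelectionKey;

public class InterestOpsDemo {
    public static void main(String[] args) {
        int ops = SelectionKey.OP_CONNECT;                // what connect() registered
        // same expression as in finishConnect()
        ops = ops & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ;
        System.out.println((ops & SelectionKey.OP_CONNECT) != 0); // false: CONNECT cleared
        System.out.println((ops & SelectionKey.OP_READ) != 0);    // true: READ set
    }
}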
A more detailed discussion of immediatelyConnectedKeys can be found here.
Sending data

Kafka sends data in two steps:
1. Call Selector#send to stash the data to be sent in the corresponding KafkaChannel; this method performs no actual network I/O.
// Selector#send
public void send(Send send) {
    String connectionId = send.destination();
    // if the connection is being closed, add it to the failedSends set
    if (closingChannels.containsKey(connectionId))
        this.failedSends.add(connectionId);
    else {
        KafkaChannel channel = channelOrFail(connectionId, false);
        try {
            channel.setSend(send);
        } catch (CancelledKeyException e) {
            this.failedSends.add(connectionId);
            close(channel, false);
        }
    }
}

// KafkaChannel#setSend
public void setSend(Send send) {
    // fail if previous data has not been fully sent yet
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
    // stash the Send
    this.send = send;
    // register interest in WRITE events
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
2. Call Selector#poll. Step 1 registered interest in WRITE events on the channel, so once the channel becomes writable, pollSelectionKeys is invoked and the data is actually sent.
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                               boolean isImmediatelyConnected,
                               long currentTimeNanos) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    // iterate over the set
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        // remove the current element, or the next poll would process it again
        iterator.remove();
        // fetch the KafkaChannel created during connect
        KafkaChannel channel = channel(key);
        ...
        try {
            ...
            // WRITE event
            if (channel.ready() && key.isWritable()) {
                // the actual network write
                Send send = channel.write();
                // one Send may be split across several writes;
                // a non-null result means a Send has been fully written out
                if (send != null) {
                    // completedSends holds the fully sent Sends
                    this.completedSends.add(send);
                    this.sensors.recordBytesSent(channel.id(), send.size());
                }
            }
            ...
        } catch (Exception e) {
            ...
        } finally {
            maybeRecordTimePerConnection(channel, channelStartTimeNanos);
        }
    }
}
When the channel is writable, KafkaChannel#write is called, which performs the real network I/O:
public Send write() throws IOException {
    Send result = null;
    if (send != null && send(send)) {
        result = send;
        send = null;
    }
    return result;
}

private boolean send(Send send) throws IOException {
    // ultimately calls SocketChannel#write to do the real write
    send.writeTo(transportLayer);
    if (send.completed())
        // once fully written, stop listening for WRITE events
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
    return send.completed();
}
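The subtle point is that one Send may need several write() calls to drain, because a non-blocking socket accepts only as much as its send buffer has room for. Below is a minimal toy version of that contract (an illustrative class of my own, not Kafka's actual Send/ByteBufferSend): writeTo() pushes what it can, completed() reports whether anything is left, and the caller retries on the next WRITE event.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;

// toy version of the Send contract: write until nothing remains
class ToyByteBufferSend {
    private final ByteBuffer buffer;

    ToyByteBufferSend(byte[] payload) {
        this.buffer = ByteBuffer.wrap(payload);
    }

    // may write only part of the buffer; callers retry on the next WRITE event
    long writeTo(GatheringByteChannel channel) throws IOException {
        return channel.write(buffer);
    }

    boolean completed() {
        return !buffer.hasRemaining();
    }
}

KafkaChannel#write above follows the same shape: it calls writeTo once per WRITE event and only surfaces the Send (via completedSends) once completed() finally turns true.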
Receiving data

If the remote end sends data over, the next call to poll processes what has been received:
public void poll(long timeout) throws IOException {
    ...
    // select() is a thin wrapper around java.nio.channels.Selector#select
    int readyKeys = select(timeout);
    ...
    // if there are ready events, or immediatelyConnectedKeys is non-empty
    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        // handle the ready events; the second argument is false
        pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
        // handle immediatelyConnectedKeys; the second argument is true
        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
    }

    addToCompletedReceives();
    ...
}

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                               boolean isImmediatelyConnected,
                               long currentTimeNanos) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    // iterate over the set
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        // remove the current element, or the next poll would process it again
        iterator.remove();
        // fetch the KafkaChannel created during connect
        KafkaChannel channel = channel(key);
        ...
        try {
            ...
            // READ event
            if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                NetworkReceive networkReceive;
                // read() pulls data off the network, but a single call may get only
                // part of a request; it returns non-null only once a complete
                // request has been read
                while ((networkReceive = channel.read()) != null)
                    // store the received request in stagedReceives
                    addToStagedReceives(channel, networkReceive);
            }
            ...
        } catch (Exception e) {
            ...
        } finally {
            maybeRecordTimePerConnection(channel, channelStartTimeNanos);
        }
    }
}

private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
    if (!stagedReceives.containsKey(channel))
        stagedReceives.put(channel, new ArrayDeque<>());

    Deque<NetworkReceive> deque = stagedReceives.get(channel);
    deque.add(receive);
}
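channel.read() returns non-null only once a full request has arrived. The Kafka protocol prefixes every request/response with a 4-byte size, so a receive can be assembled incrementally across poll calls. A simplified stand-in for that framing logic (illustrative only, not the real NetworkReceive):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

// reads one size-prefixed frame, possibly across many read() calls;
// a new instance is created per frame, much like NetworkReceive
class FrameReceive {
    private final ByteBuffer sizeBuf = ByteBuffer.allocate(4);
    private ByteBuffer payload;

    // returns the complete payload, or null if more bytes are still needed
    ByteBuffer readFrom(ReadableByteChannel channel) throws IOException {
        if (payload == null) {
            if (channel.read(sizeBuf) < 0) throw new IOException("peer closed");
            if (sizeBuf.hasRemaining()) return null;          // size prefix incomplete
            sizeBuf.flip();
            payload = ByteBuffer.allocate(sizeBuf.getInt());  // big-endian length
        }
        if (channel.read(payload) < 0) throw new IOException("peer closed");
        if (payload.hasRemaining()) return null;              // body incomplete
        payload.flip();
        return payload;                                        // a complete request
    }
}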
The addToCompletedReceives method called afterwards then processes this collection.
private void addToCompletedReceives() {
    if (!this.stagedReceives.isEmpty()) {
        Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
            KafkaChannel channel = entry.getKey();
            // on the client side isMute returns false; the server side relies on
            // this check to guarantee request ordering
            if (!channel.isMute()) {
                Deque<NetworkReceive> deque = entry.getValue();
                addToCompletedReceives(channel, deque);
                if (deque.isEmpty())
                    iter.remove();
            }
        }
    }
}

private void addToCompletedReceives(KafkaChannel channel, Deque<NetworkReceive> stagedDeque) {
    // move the first NetworkReceive of each channel into completedReceives
    NetworkReceive networkReceive = stagedDeque.poll();
    this.completedReceives.add(networkReceive);
    this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
}
After data is read it is first placed in the stagedReceives collection; addToCompletedReceives then takes one NetworkReceive (if there is one) from stagedReceives for each channel and moves it into completedReceives.
There are two reasons for this design:
For SSL connections the data on the wire is encrypted, so the exact number of bytes needed cannot be determined precisely; the reader can only read as much as it can, which may be more than the current request requires. If the channel then has no further data to read, the selector will not select it again, and without staging the extra bytes already read would never be processed.
Kafka must ensure that requests on a channel are processed in the order they were sent. So for each channel, the layer above sees at most one request per poll; only after that request has been handled are the others processed. On the server side, the channel is muted after each poll, i.e., no more data is read from it; once processing completes, the channel is unmuted and reading from the socket resumes. On the client side this is controlled through InFlightRequests#canSendMore. A stripped-down simulation of the promotion logic follows.
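To make the "at most one receive per channel per poll" rule concrete, here is a toy simulation of the staged-to-completed promotion, using plain strings in place of channels and receives (mirroring the addToCompletedReceives shown above, not the real code):

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class StagedPromotionDemo {
    public static void main(String[] args) {
        Map<String, Deque<String>> staged = new HashMap<>();
        // three responses read from one channel in a single burst
        staged.put("node-1", new ArrayDeque<>(List.of("req-A", "req-B", "req-C")));

        // each "poll" promotes at most one receive per channel
        for (int poll = 1; !staged.isEmpty(); poll++) {
            List<String> completed = new ArrayList<>();
            Iterator<Map.Entry<String, Deque<String>>> it = staged.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Deque<String>> e = it.next();
                completed.add(e.getValue().poll());   // pop exactly one
                if (e.getValue().isEmpty()) it.remove();
            }
            System.out.println("poll " + poll + " -> " + completed);
        }
        // prints: poll 1 -> [req-A]; poll 2 -> [req-B]; poll 3 -> [req-C]
    }
}

Three responses read in one burst still surface one poll at a time, so the layer above never starts on request N+1 before it has finished with request N.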
The comment in the source about this logic reads:
/* In the "Plaintext" setting, we are using socketChannel to read & write to the network. But for the "SSL" setting,
 * we encrypt the data before we use socketChannel to write data to the network, and decrypt before we return the responses.
 * This requires additional buffers to be maintained as we are reading from network, since the data on the wire is encrypted
 * we won't be able to read exact no.of bytes as kafka protocol requires. We read as many bytes as we can, up to SSLEngine's
 * application buffer size. This means we might be reading additional bytes than the requested size.
 * If there is no further data to read from socketChannel selector won't invoke that channel and we've have additional bytes
 * in the buffer. To overcome this issue we added "stagedReceives" map which contains per-channel deque. When we are
 * reading a channel we read as many responses as we can and store them into "stagedReceives" and pop one response during
 * the poll to add the completedReceives. If there are any active channels in the "stagedReceives" we set "timeout" to 0
 * and pop response and add to the completedReceives.
 * Atmost one entry is added to "completedReceives" for a channel in each poll. This is necessary to guarantee that
 * requests from a channel are processed on the broker in the order they are sent. Since outstanding requests added
 * by SocketServer to the request queue may be processed by different request handler threads, requests on each
 * channel must be processed one-at-a-time to guarantee ordering.
 */

End
This article analyzed the implementation of Kafka's network layer. When reading the Kafka source, it is easy to get lost without a clear picture of this layer: for example, the mechanism that guarantees req/resp ordering, or the fact that send is not where the actual network I/O happens.