摘要:如果數(shù)據(jù)庫檢測到是連續(xù)的,則表明沒有串包,如果不連續(xù),則表示串包,數(shù)據(jù)庫會直接丟棄該連接。源碼分析上一節(jié)我們分析到,當一個前端連接過來,并不是直接和綁定,而是先插入到線程的注冊隊列中這樣能釋放的壓力處理更多前端連接。
報文格式
這一節(jié)我們來講Cobar Handshake的過程。
MySQL服務端和客戶端交互的所有的包格式都是統(tǒng)一的,報文格式如下圖:
MySQL報文的消息頭共有4個字節(jié),前3字節(jié)表示的是實際數(shù)據(jù)的長度(不包含消息頭),并且字節(jié)是按照小端模式排放的。
第四個字節(jié)MySQL為了防止串包用的,其原理是每收到一個報文,都在sequence id上加1。如果數(shù)據(jù)庫檢測到sequence id是連續(xù)的,則表明沒有串包,如果不連續(xù),則表示串包,數(shù)據(jù)庫會直接丟棄該連接。
小端模式就是低位字節(jié)排放在內存的低地址端,高位字節(jié)排放在內存的高地址端。 大端模式則相反。
下面是Handshake包的結構,括號內表示該字段的字節(jié)數(shù):
seed部分是加密種子,分為前后兩個部分,通過隨機數(shù)生成。
源碼分析上一節(jié)我們分析到,當一個前端連接過來,并不是直接和selector綁定,而是先插入到R線程的注冊隊列中,這樣能釋放NIOAcceptor的壓力,處理更多前端連接。所以,連接和selector的綁定過程是在R線程中進行的,由register方法實現(xiàn),代碼如下:
private void register(Selector selector) { NIOConnection c = null; while ((c = registerQueue.poll()) != null) { try { c.register(selector); } catch (Throwable e) { c.error(ErrorCode.ERR_REGISTER, e); } } }
實際的綁定操作是由NIOConnection的register方法實現(xiàn)的,NIOConnection接口的抽象類是AbstractConnection,我們來看它實現(xiàn)的register方法:
@Override public void register(Selector selector) throws IOException { try { // 該連接只監(jiān)聽socket可讀事件 processKey = channel.register(selector, SelectionKey.OP_READ, this); isRegistered = true; } finally { if (isClosed.get()) { clearSelectionKey(); } } }
我們發(fā)現(xiàn),前端連接注冊選擇器時,只監(jiān)聽了可讀事件。這是考慮到,Java的NIO屬于水平觸發(fā)LT(只要滿足條件,就觸發(fā)一個事件),使用水平觸發(fā)時,如果應用程序不需要寫就不要關注socket可寫的事件,否則就會無限次地立即返回write ready notification,長期關注socket可寫事件會出現(xiàn)CPU打滿的情況,所以在使用JDK的NIO編程時,如果沒有數(shù)據(jù)往外寫,就取消寫事件,有數(shù)據(jù)往外寫時再注冊寫事件。
FrontendConnection繼承了AbstractConnection,它又重新實現(xiàn)了register方法,代碼如下:
@Override public void register(Selector selector) throws IOException { // 調用父類的register方法 super.register(selector); if (!isClosed.get()) { // 生成認證數(shù)據(jù) byte[] rand1 = RandomUtil.randomBytes(8); byte[] rand2 = RandomUtil.randomBytes(12); // 保存認證數(shù)據(jù) byte[] seed = new byte[rand1.length + rand2.length]; System.arraycopy(rand1, 0, seed, 0, rand1.length); System.arraycopy(rand2, 0, seed, rand1.length, rand2.length); this.seed = seed; // 發(fā)送握手數(shù)據(jù)包 HandshakePacket hs = new HandshakePacket(); hs.packetId = 0; hs.protocolVersion = Versions.PROTOCOL_VERSION; hs.serverVersion = Versions.SERVER_VERSION; hs.threadId = id; hs.seed = rand1; hs.serverCapabilities = getServerCapabilities(); hs.serverCharsetIndex = (byte) (charsetIndex & 0xff); hs.serverStatus = 2; hs.restOfScrambleBuff = rand2; // 異步寫入Handshake包 hs.write(this); } }
該方法生成了HandShake包,和上面結構圖相一致,關鍵是最后異步寫入HandShake包的write方法,代碼如下:
public void write(FrontendConnection c) { // 分配緩存 ByteBuffer buffer = c.allocate(); // 將HandShake包寫入緩存 BufferUtil.writeUB3(buffer, calcPacketSize()); buffer.put(packetId); buffer.put(protocolVersion); BufferUtil.writeWithNull(buffer, serverVersion); BufferUtil.writeUB4(buffer, threadId); BufferUtil.writeWithNull(buffer, seed); BufferUtil.writeUB2(buffer, serverCapabilities); buffer.put(serverCharsetIndex); BufferUtil.writeUB2(buffer, serverStatus); buffer.put(FILLER_13); // buffer.position(buffer.position() + 13); BufferUtil.writeWithNull(buffer, restOfScrambleBuff); // 將ByteBuffer中的數(shù)據(jù)異步寫入Socket c.write(buffer); }
我們再來看最后一行的write方法:
@Override public void write(ByteBuffer buffer) { // 檢查連接是否關閉,若關閉則將緩存回收 if (isClosed.get()) { processor.getBufferPool().recycle(buffer); return; } if (isRegistered) { try { // 將緩存先插入對隊列中,其實就是一個循環(huán)數(shù)組,如數(shù)組已滿,則 wait; // 這個隊列是AbstractConnection的一個成員變量 writeQueue.put(buffer); } catch (InterruptedException e) { error(ErrorCode.ERR_PUT_WRITE_QUEUE, e); return; } // 插入隊列后,調用NIOProcessor的postWrite方法,其實就是NIOReacor的postWrite方法 processor.postWrite(this); } else { // 若連接未注冊,也回收緩存 processor.getBufferPool().recycle(buffer); close(); } }
我們看NIOReactor的postWrite方法:
final void postWrite(NIOConnection c) { reactorW.writeQueue.offer(c); }
其實是將連接插入到W線程的writeQueue阻塞隊列中,我們再來看W線程的run方法,
@Override public void run() { NIOConnection c = null; for (;;) { try { if ((c = writeQueue.take()) != null) { write(c); } } catch (Throwable e) { LOGGER.warn(name, e); } } } private void write(NIOConnection c) { try { c.writeByQueue(); } catch (Throwable e) { c.error(ErrorCode.ERR_WRITE_BY_QUEUE, e); } }
輪詢阻塞隊列,若隊列不為空,則取出連接,基于隊列寫方法writeByQueue將緩存中的數(shù)據(jù)寫入socket,下一節(jié)再分析writeByQueue方法。
總結閱讀源碼后,發(fā)現(xiàn)Cobar從前端連接的accept并注冊selector到發(fā)送Handshake包都是異步,本質是將連接插入到R線程和W線程的阻塞隊列中,不立即進行注冊和寫操作,從而實現(xiàn)整個過程的異步化,提高了Cobar的吞吐量。
以上。
原文鏈接https://segmentfault.com/a/11...
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/70582.html
摘要:的使用方法就不多介紹了,本文的主要內容是剖析的源代碼。而又有一個私有的靜態(tài)變量,以及獲取這個私有靜態(tài)變量的靜態(tài)方法,顯然,這是一個單例設計模式,使程序運行的時候全局只有一個對象。 簡介 當業(yè)務的數(shù)據(jù)量和訪問量急劇增加的情況下,我們需要對數(shù)據(jù)進行水平拆分,從而降低單庫的壓力,并且數(shù)據(jù)的水平拆分需要對業(yè)務透明,屏蔽掉水平拆分的細節(jié)。并且,前端業(yè)務的高并發(fā)會導致后端的數(shù)據(jù)庫連接過多,從而DB...
摘要:淘寶定制基于,是國內第一個優(yōu)化定制且開源的服務器版虛擬機。數(shù)據(jù)庫開源數(shù)據(jù)庫是基于官方版本的一個分支,由阿里云數(shù)據(jù)庫團隊維護,目前也應用于阿里巴巴集團業(yè)務以及阿里云數(shù)據(jù)庫服務。淘寶服務器是由淘寶網發(fā)起的服務器項目。 Java JAVA 研發(fā)框架 SOFAStack SOFAStack(Scalable Open Financial Architecture Stack)是用于快速構建金融...
閱讀 1756·2021-11-24 09:39
閱讀 1686·2021-11-22 15:22
閱讀 1003·2021-09-27 13:36
閱讀 3230·2021-09-24 10:34
閱讀 3329·2021-07-26 23:38
閱讀 2633·2019-08-29 16:44
閱讀 974·2019-08-29 16:39
閱讀 1104·2019-08-29 16:20