国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

BIO、偽異步 IO、AIO和NIO

ideaa / 3222人閱讀

摘要:采用通信模型的服務端通常由一個獨立的線程負責監聽客戶端的連接它接收到客戶端連接請求之后為每個客戶端創建一個新的線程進行鏈路處理處理完成之后通過輸出流返回應答給客戶端線程銷毀這就是典型的一請求一應答通信模型該模型最大的問題就是缺乏彈性伸縮能力

BIO

采用 BIO 通信模型的服務端, 通常由一個獨立的 Acceptor 線程負責監聽客戶端的連接, 它接收到客戶端連接請求之后為每個客戶端創建一個新的線程進行鏈路處理, 處理完成之后, 通過輸出流返回應答給客戶端, 線程銷毀. 這就是典型的一請求一應答通信模型.

該模型最大的問題就是缺乏彈性伸縮能力, 當客戶端并發訪問量增加后, 服務端的線程個數和客戶端并發訪問數是 1:1 的關系.

當線程數過多之后, 系統性能就會下降, 系統也會發生線程堆棧溢出、創建新線程失敗等問題, 最終導致進程宕機或者僵死, 不能對外提供服務.

BIO 通信模型圖

偽異步 IO

后端通過維護一個消息隊列和 N 個活躍線程, 來處理多個客戶端的請求接入, 當有新的客戶端接入時, 將客戶端的 Socket 封裝成一個 Task (java.lang.Runnable 接口) 放入后端線的線程池進行處理.

由于線程池可以設置消息隊列的大小和最大線程數, 因此它的資源占用是可控的, 無論多少個客戶端并發訪問, 都不會導致資源耗盡和宕機.

客戶端個數 M, 線程池最大線程數 N 的比例關系, 其中 M 可以遠遠大于 N.

注意: 當對 Socket 的輸入流進行讀取操作的時候,它會一直阻塞轄區, 直到發生如下三種事件:

有數據可讀.

可用數據已經讀取完畢.

發生空指針或IO異常.

偽異步 IO 模型圖

弊端

當對方發送請求或應答消息比較緩慢, 或者網絡傳輸比較慢時, 讀取輸入流一方的通信線程將被長時間阻塞, 如果對方要 60s 才能將數據發送完成, 讀取一方的 IO 線程也將會被同步阻塞 60s, 在此期間, 其它接入消息只能在消息隊列中排隊.

假如所有的可用線程都被故障服務器阻塞, 那后續所有的 IO 消息都將在隊列中排隊.

由于線程池采用阻塞隊列實現, 當隊列積滿之后, 后續入隊列的操作將被阻塞.

由于前端只有一個 Accptor 線程接收客戶端接入, 它被阻塞在線程池的同步阻塞隊列之后, 新的客戶端請求消息將被拒絕, 客戶端會發生大量的連接超時.

NIO

與 Socket 類和 ServerSocket 類相對應, NIO 也提供了 SocketChannel 和 ServerSocketChannel 兩種不同的套接字通道實現. 這兩種新增的通道都支持阻塞和非阻塞兩種模式.

一般來說, 低負載、低并發的應用程序可以選擇同步阻塞IO以降低編程復雜度; 對于高負載、高并發的網絡模型應用, 需要使用NIO的非阻塞模式進行開發.

緩沖區 Buffer

Buffer 是一個對象, 它包含一些要寫入或要讀出的數據. 在NIO庫中, 所有數據都是用緩沖區處理的. 在讀取數據時, 它是直接讀取到緩沖區中的; 在寫入數據時, 寫入到緩沖區中. 任何時候訪問NIO中的數據, 都是通過緩沖區進行操作的.

緩沖區實質上是一個數組. 通常它是一個字節數組, 也可以使用其他種類的數組. 但是一個緩沖區不僅僅是一個數組, 緩沖區提供了對數據的結構化訪問以及維護讀寫位置等信息.

常用緩沖區是 ByteBuffer, 一個 ByteBuffer 提供了一種功能用于操作 byte 數組. 除了 ByteBuffer, 還有其他的一些緩沖區.

ByteBuffer: 字節緩沖區

CharBuffer: 字符緩沖區

ShortBuffer: 短整形緩沖區

IntBuffer: 整形緩沖區

LongBuffer: 長整形緩沖區

FloatBuffer: 浮點型緩沖區

DoubleBuffer: 雙精度浮點型緩沖區

每一個 Buffer 類都是 Buffer 接口的一個子實例. 除了 ByteBuffer, 每個 Buffer 類都有完全一樣的操作, 只是它們所處理的類型不一樣.

通道 Channel

Channel 是一個通道, 網絡數據通過 Channel 讀取和寫入. 通道與流的不同之處在于通道是雙向的, 流只是在一個方向上移動(一個流必須是 InputStream 或者 OutputStream), 而通道可以用于讀、寫或者二者同時進行.

Java NIO中最重要的幾個Channel的實現:

FileChannel: 用于文件的數據讀寫

DatagramChannel: 用于UDP的數據讀寫

SocketChannel: 用于TCP的數據讀寫. 一般是客戶端實現

ServerSocketChannel: 允許我們監聽TCP鏈接請求, 每個請求會創建會一個SocketChannel. 一般是服務器實現

多路復用器 Selector

多路復用器提供選擇已經就緒的任務的能力. 簡單來講, Selector 會不斷的輪詢注冊在其上的 Channel, 如果某個 Channel 上面發生讀或寫事件, 這個 Channel 就處于就緒狀態, 會被 Selector 輪詢出來, 然后通過 SelectionKey 可以獲取就緒 Channel 的集合, 進行后續的 IO 操作.

一個多路復用器 Selector 可以同時輪詢多個 Channel, 由于 JDK 使用了 epoll() 代替傳統的 select 實現, 所以它并沒有最大連接句柄 1024/2048 的限制. 這也意味著只需要一個線程負責 Selector 的輪詢, 就可以接入成千上萬的客戶端.

NIO 服務端序列圖

NIO創建的 TimeServer 源碼分析
public class MultiplexerTimeServer implements Runnable {

    private Selector selector;

    private ServerSocketChannel servChannel;

    private volatile boolean stop;

    /**
     * 初始化多路復用器、綁定監聽端口
     *
     * @param port
     */
    public MultiplexerTimeServer(int port) {
        try {
            // 創建多路復用器
            selector = Selector.open();
            
            // 打開 ServerSocketChannel 用來監聽客戶端的連接, 它是所有客戶端連接的父管道.
            servChannel = ServerSocketChannel.open();
            
            // 設置 ServerSocketChannel 為異步非阻塞模式
            servChannel.configureBlocking(false);
            
            // 綁定地址和端口
            servChannel.socket().bind(new InetSocketAddress(port), 1024);
            
            // 將 ServerSocketChannel 注冊到 Reactor 線程的多路復用器 Selector 上, 監聽 ACCEPT 事件
            servChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("The time server is start in port : " + port);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    public void stop() {
        this.stop = true;
    }

    /*
     * (non-Javadoc)
     *
     * @see java.lang.Runnable#run()
     */
    @Override
    public void run() {
        while (!stop) {
            try {
                selector.select(1000);
                Set selectedKeys = selector.selectedKeys();
                Iterator it = selectedKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    key = it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }

        // 多路復用器關閉后,所有注冊在上面的Channel和Pipe等資源都會被自動去注冊并關閉,所以不需要重復釋放資源
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            } 
        }
            
    }

    private void handleInput(SelectionKey key) throws IOException {

        if (key.isValid()) {
            // 處理新接入的請求消息
            if (key.isAcceptable()) {
                // Accept the new connection
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                SocketChannel sc = ssc.accept();
                sc.configureBlocking(false);
                // Add the new connection to the selector
                sc.register(selector, SelectionKey.OP_READ);
            }
            if (key.isReadable()) {
                // Read the data
                SocketChannel sc = (SocketChannel) key.channel();
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes, "UTF-8");
                    System.out.println("The time server receive order : "
                    + body);
                    String currentTime = "QUERY TIME ORDER"
                .equalsIgnoreCase(body) ? new java.util.Date(
                            System.currentTimeMillis()).toString()
                            : "BAD ORDER";
                    doWrite(sc, currentTime);
                } else if (readBytes < 0) {
                    // 對端鏈路關閉
                    key.cancel();
                    sc.close();
                } else {
                    ; // 讀到0字節,忽略
                }
                
            }
        }
    }

    private void doWrite(SocketChannel channel, String response)
            throws IOException {
        if (response != null && response.trim().length() > 0) {
            byte[] bytes = response.getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            channel.write(writeBuffer);
        }
    }
}

selector.select(1000); 休眠時間為1S, 無論是否有讀寫等事件發生, selector 每隔 1S 都被喚醒一次, selector 也提供了一個無參的 select 方法. 當有處于就緒狀態的 Channel 時, selector 將返回就緒狀態的 Channel 的 SelectionKey 集合, 我們通過對就緒狀態的 Channel 集合進行迭代, 就可以進行網絡的異步讀寫操作.

key.isAcceptable() 來判斷 當前 SelectionKey 的通道是否已準備好接受新的套接字連接(處理新接入的客戶端請求消息). 通過 ServerSocketChannelaccept 接收客戶端的連接請求并創建 SocketChannel 實例, 完成上述操作后, 相當于完成了TCP的三次握手, TCP物理鏈路正式建立. 注意,我們需要將新創建的 SocketChannel 設置為異步非阻塞, 同時也可以對其TCP參數進行設置, 例如TCP接收和發送緩沖區的大小等.

根據 SelectionKey 的操作位進行判斷即可獲知網絡事件的類型, 如 isAcceptable() 表示為 OP_ACCEPT 

key.isAcceptable() 來判斷 當前 SelectionKey 的通道是否已準備好進行讀取(讀取客戶端的請求消息). 首先創建一個 ByteBuffer, 由于我們事先無法得知客戶端發送的碼流大小, 作為例程, 我們開辟一個1M的緩沖區. 然后調用 SocketChannelread 方法讀取請求碼流, 注意, 由于我們已經將 SocketChannel 設置為異步非阻塞模式, 因此它的 read 是非阻塞的. 使用返回值進行判斷, 看讀取到的字節數, 返回值有三種可能的結果:

返回值大于0: 讀到了字節, 對字節進行編解碼;

返回值等于0: 沒有讀取到字節, 屬于正常場景, 忽略;

返回值為-1: 鏈路已經關閉, 需要關閉SocketChannel, 釋放資源.

當讀取到碼流以后, 我們進行解碼, 首先對 readBuffer 進行 flip 操作, 它的作用是將緩沖區當前的limit 設置為 position, position 設置為0, 用于后續對緩沖區的讀取操作.

然后根據緩沖區可讀的字節個數創建字節數組, 調用 ByteBufferget 操作將緩沖區可讀的字節數組拷貝到新創建的字節數組中, 最后調用字符串的構造函數創建請求消息體并打印. 如果請求指令是 ”QUERY TIME ORDER” 則把服務器的當前時間編碼后返回給客戶端, 下面我們看看如果異步發送應答消息給客戶端.

doWrite 方法將消息異步發送給客戶端, 首先將字符串編碼成字節數組, 根據字節數組的長度創建 ByteBuffer, 調用 ByteBufferput 操作將字節數組拷貝到緩沖區中, 然后對緩沖區進行flip操作, 最后調用 SocketChannelwrite 方法將緩沖區中的字節數組發送出去.

需要指出的是, 由于 SocketChannel 是異步非阻塞的, 它并不保證一次能夠把需要發送的字節數組發送完, 此時會出現“寫半包”問題, 我們需要注冊寫操作, 不斷輪詢 Selector 將沒有發送完的 ByteBuffer 發送完畢, 可以通過 ByteBufferhasRemain() 方法判斷消息是否發送完成.

AIO 編程

NIO 2.0 引入了新的異步通道的概念, 并提供了異步文件通道和異步套接字通道的實現. 異步通道提供以下兩種方式獲取操作結果.

通過 java.util.concurrent.Future 類來表示異步操作的結果.

在執行異步操作的時候傳入一個 java.nio.channels

CompletionHandler 接口的實現類作為操作完成的回調.

NIO 2.0 的異步套接字通道是真正的異步非阻塞 IO , 對應與 UNIX 網絡編程中的事件驅動 IO. 它不需要通過多路復用器對注冊的通道進行輪詢操作即可實現異步讀寫, 從而簡化了 NIO 的編程模型.

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/72708.html

相關文章

  • Netty序章之BIO NIO AIO演變

    摘要:后改良為用線程池的方式代替新增線程,被稱為偽異步。最大的問題是阻塞,同步。每次請求都由程序執行并返回,這是同步的缺陷。這些都會被注冊在多路復用器上。多路復用器提供選擇已經就緒狀態任務的能力。并沒有采用的多路復用器,而是使用異步通道的概念。 Netty是一個提供異步事件驅動的網絡應用框架,用以快速開發高性能、高可靠的網絡服務器和客戶端程序。Netty簡化了網絡程序的開發,是很多框架和公司...

    VincentFF 評論0 收藏0
  • Netty序章之BIO NIO AIO演變

    摘要:后改良為用線程池的方式代替新增線程,被稱為偽異步。最大的問題是阻塞,同步。每次請求都由程序執行并返回,這是同步的缺陷。這些都會被注冊在多路復用器上。多路復用器提供選擇已經就緒狀態任務的能力。并沒有采用的多路復用器,而是使用異步通道的概念。 Netty是一個提供異步事件驅動的網絡應用框架,用以快速開發高性能、高可靠的網絡服務器和客戶端程序。Netty簡化了網絡程序的開發,是很多框架和公司...

    CntChen 評論0 收藏0
  • JDK中關于BIO,NIO,AIO,同步,異步介紹

    摘要:即可以理解為,方法都是異步的,完成后會主動調用回調函數。主要在包下增加了下面四個異步通道其中的方法,會返回一個帶回調函數的對象,當執行完讀取寫入操作后,直接調用回調函數。 本文原創地址,我的博客:jsbintask.cn/2019/04/16/…(食用效果最佳),轉載請注明出處! 在理解什么是BIO,NIO,AIO之前,我們首先需要了解什么是同步,異步,阻塞,非阻塞。假如我們現在要去銀行取...

    opengps 評論0 收藏0
  • 慕課網_《Netty入門之WebSocket初體驗》學習總結

    時間:2018年04月11日星期三 說明:本文部分內容均來自慕課網。@慕課網:https://www.imooc.com 教學源碼:https://github.com/zccodere/s... 學習源碼:https://github.com/zccodere/s... 第一章:課程介紹 1-1 課程介紹 什么是Netty 高性能、事件驅動、異步非阻塞的IO Java開源框架 基于NIO的客戶...

    Noodles 評論0 收藏0
  • JAVA IO BIO NIO AIO

    摘要:三同步非阻塞式以塊的方式處理數據面向緩存區的采用多路復用模式基于事件驅動是實現了的一個流行框架,的。阿里云分布式文件系統里用的就是。四異步非阻塞式基于事件驅動,不需要多路復用器對注冊通道進行輪詢,采用設計模式。 一、什么是IO IO 輸入、輸出 (read write accept)IO是面向流的 二、BIO BIO是同步阻塞式IO 服務端與客戶端進行三次握手后一個鏈路建立一個線程面...

    freecode 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<