摘要:使用線程池可以節省那種系統開銷,同時允許實現者利用并行硬件的優勢。但是對于連接生存期比較長的協議來說,線程池的大小仍然限制了系統同時可以處理的客戶端數量。
NIO主要包含兩部分,Selector和Channel、Buffer。
為什么需要需要NIO基本的Java套接字對于小規模系統可以很好地運行,但當涉及同時處理上千個客戶端的服務器時,可能就會產生一些問題。由于創建、維護、切換線程需要的系統開銷,一客戶一線程的方式在系統擴展性方面受到了限制。使用線程池可以節省那種系統開銷,同時允許實現者利用并行硬件的優勢。
但是對于連接生存期比較長的協議來說,線程池的大小仍然限制了系統同時可以處理的客戶端數量。
另外對于服務器需要由不同客戶端同時訪問和修改的信息時,對于多線程就得進行同步,這會變得更加復雜,即使用同步機制將增加更多的系統調度和上下文切換開銷,而程序員對這些開銷又無法控制。
由于多線程的同步的復雜性,一些程序員寧愿繼續使用單線程方法,這類服務器只用一個線程來處理所有客戶端——不是順序處理,而是一次全部處理。這種服務器不能為任何客戶端提供I/O操作的阻塞等待,而必須排他地使用非阻塞方式(nonblocking)I/O。
前面的while true,不斷地輪詢(poll)accept方法,這種忙等(busy waiting)方法會引入系統開銷,因為程序需要反復循環地連接I/O源,卻又發現什么都不用做。
我們需要一種方法來一次輪詢一組客戶端,以查找那個客戶端需要服務,這正是NIO要介紹的Selector和Channel的抽象關鍵點。
一個Channel實例代表了一個可輪詢(pollable)的I/O目標,如套接字(或一個文件、設備等)。Channel能夠注冊一個Selector類的實例。
Selector的select方法允許你詢問在一組信道中,哪一個當前需要服務(被接受、讀、寫)。
Stream的抽象,好處是隱藏了底層緩沖區的有限性,提供了一個能夠容納任意長度數據的容器的假象。要實現這樣一個假象,要么會產生大量的內存開銷,要么會引入大量的上下文切換,不好控制。
使用Buffer抽象的原因是:Buffer抽象代表了一個有限容量(finite-capacity)的數據容器——其本質是一個數組,由指針指示了在哪存放數據和從哪讀取數據。使用Buffer的好處是:
1)與讀寫緩沖區數據相關聯的系統開銷都暴露給了程序員。例如,如果想要向緩沖區存入數據,但是又沒有足夠的空間時,就必須采取一些措施來獲得空間(即移出一些數據,或移開已經在那個位置的數據來獲得空間,或者創建一個新的新的實例)。這意味著需要額外的工作,但是你可以控制它什么時候發生,如何發生,以及是否發生。
2)一些對Java對象的特殊Buffer映射操作能夠直接操作底層平臺的資源(例如操作系統的緩沖區),這些操作節省了在不同地址空間中復制數據的開銷。
綜上,Channel實例代表了一個與設備的連接,通過它可以進行輸入輸出操作。信道(channel)和套接字(socket)的不同之處在于:channel通常需要調用靜態工廠方法來獲取實例。channel使用的不是流,而是使用緩沖區來發送或讀取數據。
Buffer有固定的、有限的容量,并由內部狀態記錄了有多少數據放入或取出,就像是一個有限容量的隊列一樣。
SelectorNIO的強大功能部分來自于channel的非阻塞特性。accept可能因為等待一個客戶端連接而阻塞,read可能因為沒有數據可讀而阻塞,直到連接的另一端傳來新數據。
總的來說,創建/接收連接或讀寫數據等I/O調用,都可能無限期地阻塞等待,直到底層的網絡實現發生了什么。慢速的、有損耗的網絡,或僅僅是簡單的網絡故障都可能導致任意時間的延遲。
而NIO則立即返回:
public class TCPEchoClientNonblocking { public static void main(String args[]) throws Exception { if ((args.length < 2) || (args.length > 3)) // Test for correct # of args throw new IllegalArgumentException("Parameter(s):[ ]"); String server = args[0]; // Server name or IP address // Convert input String to bytes using the default charset byte[] argument = args[1].getBytes(); int servPort = (args.length == 3) ? Integer.parseInt(args[2]) : 7; // Create channel and set to nonblocking SocketChannel clntChan = SocketChannel.open(); clntChan.configureBlocking(false); // Initiate connection to server and repeatedly poll until complete if (!clntChan.connect(new InetSocketAddress(server, servPort))) { while (!clntChan.finishConnect()) { System.out.print("."); // Do something else } } ByteBuffer writeBuf = ByteBuffer.wrap(argument); ByteBuffer readBuf = ByteBuffer.allocate(argument.length); int totalBytesRcvd = 0; // Total bytes received so far int bytesRcvd; // Bytes received in last read while (totalBytesRcvd < argument.length) { if (writeBuf.hasRemaining()) { clntChan.write(writeBuf); } if ((bytesRcvd = clntChan.read(readBuf)) == -1) { throw new SocketException("Connection closed prematurely"); } totalBytesRcvd += bytesRcvd; System.out.print("."); // Do something else } System.out.println("Received: " + // convert to String per default charset new String(readBuf.array(), 0, totalBytesRcvd)); clntChan.close(); } }
上面的輪詢僅僅是演示用。
需要使用Selector類來避免忙等的輪詢。考慮一個即時的消息服務器,可能有上千個客戶端同時連接到了服務器,但任何時刻都只有非常少量的消息需要讀取和分發。這就需要一種方法阻塞等待,直到至少有一個信道可以進行I/O操作,并指出是哪個信道。NIO的選擇器就實現了這樣的功能。一個Selector實例可以同時檢查一組信道的I/O狀態。用專業術語來說,選擇器就是一個多路開關選擇器,因為一個選擇器能夠管理多個信道上的I/O操作。
要使用選擇器,需要創建一個Selector實例并將其注冊到想要監控的信道上(注意,這要通過channel的方法實現,而不是使用selector的方法)。最后,調用選擇器的select方法,該方法會阻塞等待,直到還有一個或更多的信道準備好了I/O操作或等待超時。select方法返回可進行I/O操作的信道數量。
public class TCPServerSelector { private static final int BUFSIZE = 256; // Buffer size (bytes) private static final int TIMEOUT = 3000; // Wait timeout (milliseconds) public static void main(String[] args) throws IOException { if (args.length < 1) { // Test for correct # of args throw new IllegalArgumentException("Parameter(s):..."); } // Create a selector to multiplex listening sockets and connections Selector selector = Selector.open(); // Create listening socket channel for each port and register selector for (String arg : args) { ServerSocketChannel listnChannel = ServerSocketChannel.open(); listnChannel.socket().bind(new InetSocketAddress(Integer.parseInt(arg))); listnChannel.configureBlocking(false); // must be nonblocking to register // Register selector with channel. The returned key is ignored listnChannel.register(selector, SelectionKey.OP_ACCEPT); } // Create a handler that will implement the protocol TCPProtocol protocol = new EchoSelectorProtocol(BUFSIZE); while (true) { // Run forever, processing available I/O operations // Wait for some channel to be ready (or timeout) if (selector.select(TIMEOUT) == 0) { // returns # of ready chans System.out.print("."); continue; } // Get iterator on set of keys with I/O to process Iterator keyIter = selector.selectedKeys().iterator(); while (keyIter.hasNext()) { SelectionKey key = keyIter.next(); // Key is bit mask // Server socket channel has pending connection requests? if (key.isAcceptable()) { protocol.handleAccept(key); } // Client socket channel has pending data? if (key.isReadable()) { protocol.handleRead(key); } // Client socket channel is available for writing and // key is valid (i.e., channel not closed)? if (key.isValid() && key.isWritable()) { protocol.handleWrite(key); } keyIter.remove(); // remove from set of selected keys } } } }
由于select方法只是向selector所關聯的鍵集合中添加元素,因此,如果不移除每個處理過的鍵,它就會在下次調用select方法時仍然保留在集合中,而且可能會有無用的操作來調用它。
具體的處理方法
public class EchoSelectorProtocol implements TCPProtocol { private int bufSize; // Size of I/O buffer public EchoSelectorProtocol(int bufSize) { this.bufSize = bufSize; } public void handleAccept(SelectionKey key) throws IOException { SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept(); clntChan.configureBlocking(false); // Must be nonblocking to register // Register the selector with new channel for read and attach byte buffer clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer .allocate(bufSize)); } public void handleRead(SelectionKey key) throws IOException { // Client socket channel has pending data SocketChannel clntChan = (SocketChannel) key.channel(); ByteBuffer buf = (ByteBuffer) key.attachment(); long bytesRead = clntChan.read(buf); if (bytesRead == -1) { // Did the other end close? clntChan.close(); } else if (bytesRead > 0) { // Indicate via key that reading/writing are both of interest now. key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); } } public void handleWrite(SelectionKey key) throws IOException { /* * Channel is available for writing, and key is valid (i.e., client channel * not closed). */ // Retrieve data read earlier ByteBuffer buf = (ByteBuffer) key.attachment(); buf.flip(); // Prepare buffer for writing SocketChannel clntChan = (SocketChannel) key.channel(); clntChan.write(buf); if (!buf.hasRemaining()) { // Buffer completely written? // Nothing left, so no longer interested in writes key.interestOps(SelectionKey.OP_READ); } buf.compact(); // Make room for more data to be read in } }
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/65929.html
摘要:學習和掌握技術已經不是一個攻城獅的加分技能,而是一個必備技能。是雙向的,不僅可以讀取數據還能保存數據,程序不能直接讀寫通道,只與緩沖區交互為了讓大家不被高并發與大量連接處理問題所困擾,動力節點推出了高效處理模型應用教程。 大家肯定了解Java IO, 但是對于NIO一般是陌生的,而現在使用到NIO的場景越來越多,很多技術框...
摘要:內存溢出的情況就是從類加載器加載的時候開始出現的,內存溢出分為兩大類和。以下舉出個內存溢出的情況,并通過實例代碼的方式講解了是如何出現內存溢出的。內存溢出問題描述元空間的溢出,系統會拋出。這樣就會造成棧的內存溢出。 導言: 對于java程序員來說,在虛擬機自動內存管理機制的幫助下,不需要自己實現釋放內存,不容易出現內存泄漏和內存溢出的問題,由虛擬機管理內存這一切看起來非常美好,但是一旦...
摘要:基礎知識基礎語法基礎知識編程第一步基礎知識基本數據類型基礎知識解釋器基礎知識注釋基礎知識運算符基礎知識數字基礎知識字符串基礎知識列表基礎知識元組基礎知識字典基礎知識條件控制基礎知識循環基礎知識迭代器與生成器基礎知識函數基礎知識數據結構基礎知 Python3基礎知識 | 基礎語法?Python3基礎知識 | 編程第一步?Python3基礎知識 | 基本數據類型Python3基礎知識 | ...
閱讀 2423·2021-10-09 09:59
閱讀 2177·2021-09-23 11:30
閱讀 2591·2019-08-30 15:56
閱讀 1145·2019-08-30 14:00
閱讀 2939·2019-08-29 12:37
閱讀 1253·2019-08-28 18:16
閱讀 1656·2019-08-27 10:56
閱讀 1022·2019-08-26 17:23