摘要:依照該方案,虛擬內存空間的頁面能夠繼續存在于外部磁盤存儲,這樣就為物理內存中的其他虛擬頁面騰出了空間。造成頁錯誤的用戶進程對此不會有絲毫察覺,一切都在不知不覺中進行。虛擬內存系統俘獲頁錯誤,安排頁面調入,從磁盤上讀取頁內容,使頁有效。
本筆記主要針對JAVA NIO第1-4章,做一下總結,豆瓣評分7.5,但本人還是強烈推薦的.對JDK 1.4的NIO接口做了很充分的講解.
I/O概念所謂“I(輸入)/O(輸出)”講的無非就是把數據移進或移出緩沖區.
進程執行 I/O 操作,歸結起來,也就是向操作系統發出請求,讓它要么把緩沖區里的數據排干 (寫),要么用數據把緩沖區填滿(讀).
緩沖區操作
如上圖所示,進程使用read( )系統調用,要求其緩沖區被填滿。內核隨即向磁盤控制硬件發出命令,要求其從磁盤讀取數據。磁盤控制器把數據直接寫入內核內存緩沖區,這一步通過 DMA 完成,無需主 CPU 協助。一旦磁盤控制器把緩沖區裝滿,內核即把數據從內核空間的臨時緩沖區拷貝到進程執行read( )調用時指定的緩 沖區。
注意圖中用戶空間和內核空間的概念。用戶空間是常規進程所在區域。JVM 就是常規進程,駐守于用戶空間。用戶空間是非特權區域:比如,在該區域執行的代碼就不能直接訪問硬件設備。 內核空間是操作系統所在區域。內核代碼有特別的權力:它能與設備控制器通訊,控制著用戶區域 進程的運行狀態,等等。最重要的是,所有 I/O 都直接(如這里所述)或間接通過內核空間。
為什么不直接 讓磁盤控制器把數據送到用戶空間的緩沖區呢?
1.硬件通常不能直接訪問 用戶空間. 2.像磁盤這樣基于塊存儲的硬件設備操作的是固定大小的數據塊,而用戶進程請 求的可能是任意大小的或非對齊的數據塊。在數據往來于用戶空間與存儲設備的過程中,內核負責數據的分解、再組合工作,因此充當著中間人的角色
發散/匯聚
如上圖所示,發散/匯聚就是操作系統為了提升性能同時操縱多個緩存區的情況,這樣就不用多次執行系統調用了.
虛擬內存
虛擬內存意為使用虛假(或虛擬)地址取代物理(硬件RAM)內存地址
優點:
1.一個以上的虛擬地址可指向同一個物理內存地址 2.虛擬內存空間可大于實際可用的硬件內存
利用一個以上的虛擬地址可指向同一個物理內存地址把內核空間地址與用戶空間的虛擬地址映射到同一個物理地址,這樣, DMA 硬件(只能訪問物理內存地址)就可以填充對內核與用戶空間進程同時可見的緩沖區(上圖)。
前提條件是,內核與用戶緩沖區必須使用相同的頁對齊,緩沖區的大小還必須是磁盤控制器塊大小(通常為 512 字節磁盤扇區)的倍數。
內存頁面調度
為了支持虛擬內存的第二個特性(尋址空間大于物理內存),就必須進行虛擬內存分頁(經常稱為交換,雖然真正的交換是在進程層面完成,而非頁層面)。依照該方案,虛擬內存空間的頁面能夠繼續存在于外部磁盤存儲,這樣就為物理內存中的其他虛擬頁面騰出了空間。從本質上說,物理內存充當了分頁區的高速緩存;而所謂分頁區,即從物理內存置換出來,轉而存儲于磁盤上的內存頁面.
MMU:現代 CPU 包含一個稱為內存管理單元(MMU)的子系統,邏輯上位于 CPU 與物理內存之間。該設備包含虛擬地址向物理內存地址轉換時所需映射信息。當 CPU 引用某內存地址時,MMU負責確定該地址所在頁(往往通過對地址值進行移位或屏蔽位操作實現),并將虛擬頁號轉換為物理頁號(這一步由硬件完成,速度極快)。如果當前不存在與該虛擬頁形成有效映射的物理內存頁,MMU 會向CPU交一個頁錯誤.
頁錯誤隨即產生一個陷阱(類似于系統調用),把控制權移交給內核,附帶導致錯誤的虛擬地址信息,然后內核采取步驟驗證頁的有效性。內核會安排頁面調入操作,把缺失的頁內容讀回物理內存。這往往導致別的頁被移出物理內存,好給新來的頁讓地方。在這種情況下,如果待移出的頁已經被碰過了(自創建或上次頁面調入以來,內容已發生改變),還必須首先執行頁面調出,把頁內容拷貝到磁盤上的分頁區。
如果所要求的地址不是有效的虛擬內存地址(不屬于正在執行的進程的任何一個內存段),則該頁不能通過驗證,段錯誤隨即產生。于是,控制權轉交給內核的另一部分,通常導致的結果就是進程被強令關閉。
一旦出錯的頁通過了驗證,MMU隨即更新,建立新的虛擬到物理的映射(如有必要,中斷被移出頁的映射),用戶進程得以繼續。造成頁錯誤的用戶進程對此不會有絲毫察覺,一切都在不知 不覺中進行。
#### 文件I/O
文件 I/O 屬文件系統范疇,文件系統與磁盤迥然不同。磁盤把數據存在扇區上,通常一個扇區512字節。磁盤屬硬件設備,對何謂文件一無所知,它只是 供了一系列數據存取窗口。在這點上,磁盤扇區與內存頁頗有相似之處:都是統一大小,都可作為大的數組被訪問。
文件系統是更高層次的抽象,是安排、解釋磁盤(或其他隨機存取塊設備)數據的一種獨特方式。您所寫代碼幾乎無一例外地要與文件系統打交道,而不是直接與磁盤打交道。是文件系統定義了文件名、路徑、文件、文件屬性等抽象概念。
采用分頁技術的操作系統執行 I/O 的全過程可總結為以下幾步:
確定請求的數據分布在文件系統的哪些頁(磁盤扇區組)。磁盤上的文件內容和元數據可能跨越多個文件系統頁,而且這些頁可能也不連續。
在內核空間分配足夠數量的內存頁,以容納得到確定的文件系統頁。
在內存頁與磁盤上的文件系統頁之間建立映射。
為每一個內存頁產生頁錯誤。
虛擬內存系統俘獲頁錯誤,安排頁面調入,從磁盤上讀取頁內容,使頁有效。
一旦頁面調入操作完成,文件系統即對原始數據進行解析,取得所需文件內容或屬性信息。
內存映射文件傳統的文件I/O是通過用戶進程發布read()和write()系統調用來傳輸數據的。為了在內核空間的文件系統頁與用戶空間的內存區之間移動數據,一次以上的拷貝操作幾乎總是免不了的。這是因為,在文件系統頁與用戶緩沖區之間往往沒有一一對應關系。但是,還有一種大多數操作系統都支持的特殊類型的 I/O 操作,允許用戶進程最大限度地利用面向頁的系統I/O特性,并完全摒棄緩沖區拷貝。這就是內存映射 I/O,如圖下所示。
內存映射 I/O 使用文件系統建立從用戶空間直到可用文件系統頁的虛擬內存映射.
優點:
用戶進程把文件數據當作內存,所以無需發布read()或write()系統調用。
當用戶進程碰觸到映射內存空間,頁錯誤會自動產生,從而將文件數據從磁盤讀進內存。如果用戶修改了映射內存空間,相關頁會自動標記為臟,隨后刷新到磁盤,文件得到更新。
操作系統的虛擬內存子系統會對頁進行智能高速緩存,自動根據系統負載進行內存管理。
數據總是按頁對齊的,無需執行緩沖區拷貝。
大型文件使用映射,無需耗費大量內存,即可進行數據拷貝
Buffer
Buffer對應于上節所述概念中的緩存區.包在一個對象內的基本數據元素數組。Buffer 類相比一個簡單數組的優點是它將關于數據的數據內容和信息包含在一個單一的對象中。Buffer類以及它專有的子類定義了一個用于處理數據緩沖區的 API.有7種主要的緩沖區類,每一種都具有一種 Java 語言中的非布 類型的原始類型數據。
屬性:
//標記:一個備忘位置。調用mark( )來設定mark=postion。調用reset( )設定position= mark。標記在設定前是未定義的(undefined) private int mark = -1; //位置:下一個要被讀或寫的元素的索引。位置會自動由相應的get()和 put()函數更新。 private int position = 0; //上界:緩沖區的第一個不能被讀或寫的元素?;蛘哒f,緩沖區中現存元素的計數。 private int limit; //容量:緩沖區能夠容納的數據元素的最大數量。這一容量在緩沖區創建時被設定,并且 遠不能被改變 private int capacity;
上述屬性有以下關系
0 <= mark <= position <= limit <= capacity
基本方法:
//返回容量 public final int capacity(); //返回位置 public final int position(); //設置容量 public final Buffer position(int newPosition); //返回上屆 public final int limit() ; //標記當前position為mark public final Buffer mark(); //重回mark位置 public final Buffer reset(); //一般在把數據寫入Buffer前調用 public final Buffer clear() { position = 0; limit = capacity; mark = -1; return this; } //翻轉:將緩存字節數組的指針設置為數組的開始序列即數組下標0。這樣就可以從buffer開頭,對該buffer進行遍歷(讀?。┝? //buf.put(magic); Prepend header //in.read(buf); Read data into rest of buffer //buf.flip(); Flip buffer //out.write(buf); Write header + data to channel< public final Buffer flip() { limit = position; position = 0; mark = -1; return this; } // 一般在把數據重寫入Buffer前調用 // out.write(buf); // Write remaining data // buf.rewind(); // Rewind buffer // buf.get(array); // Copy data into array public final Buffer rewind() { position = 0; mark = -1; return this; } // limit - position public final int remaining(); // position < limit public final boolean hasRemaining(); public abstract boolean isReadOnly();
存取方法:
public abstract byte get( ); public abstract byte get (int index); public abstract ByteBuffer put (byte b); public abstract ByteBuffer put (int index, byte b);
壓縮方法:
//將position到limit之間的數據遷移至0開始處,然后limit=capacity public abstract ByteBuffer compact( );
比較方法
// true:兩個對象類型相同,兩個對象都剩余同樣數量的元素,在每個緩沖區中應被Get()函數返回的剩余數據元素序列必須一致 public boolean equals (Object ob) // 比較是針對每個緩沖區內剩余數據進行的,與它們在equals()中的方式相同,直到不相等的元素被發現或者到達緩沖區的上界。如果一個緩沖區在不相等元素發現前已經被耗盡,較短 的緩沖區被認為是小于較長的緩沖區 public int compareTo (Object ob)直接緩沖區
字節緩沖區跟其他緩沖區類型最明顯的不同在于,它們可以成為通道所執行的 I/O 的源 頭和/或目標,只有字節緩沖區有資格參與 I/O 操作。
直接緩沖區被用于與通道和固有I/O例程交互。它們通過使用固有代碼來告知操作系統直接釋放或填充內存區域,對用于通道直接或原始存取的內存區域中的字節元素的存儲盡了最大的努力。
直接字節緩沖區通常是 I/O 操作最好的選擇。在設計方面,它們支持 JVM 可用的最高效 I/O 機制。非直接字節緩沖區可以被傳遞給通道,但是這樣可能導致性能耗。通常非直接緩沖不可能成為一個本地I/O操作的目標。如果您向一個通道中傳遞一個非直接ByteBuffer對象用于寫入,通道可能會在每次調用中隱含地進行下面的操作:
創建一個臨時的直接 ByteBuffer 對象
將非直接緩沖區的內容復制到臨時緩沖區中。
使用臨時緩沖區執行低層次 I/O 操作。
臨時緩沖區對象離開作用域,并最終成為被回的無用數據。
直接緩沖區使用的內存是通過調用本地操作系統方面的代碼分配的, 過了標準JVM。建立和銷毀直接緩沖區會明顯比具有的緩沖區更加費,這取決于主操作系統以及JVM實現。直接緩沖區的內存區域不受無用存儲單元 集支配,因為它們位于標準JVM之外。
public static ByteBuffer allocateDirect (int capacity);
Channel基本接口
public interface Channel extends Closeable { public boolean isOpen(); public void close() throws IOException; }FileChannel 文件通道
文件通道總是阻塞的,不能被置于非阻模式.現代操作系統都有復雜的緩存和預取機制,使得本地磁盤 I/O 操作延遲很少。ileChannel實例只能通過在一個 打開的file對象(RandomAccessFile、FileInputStream或FileOutputStream)上調用getChannel()方法獲取.
FileChannel 對象是線程安全(thread-safe)的。多個進程可以在同一個實例上并發調用方法而 不會引起任何問題,不過并非所有的操作都是多線程的(multithreaded)。影響通道位置或者影響文件大小的操作都是單線程的(single-threaded)。如果有一個線程已經在執行會影響通道位置或文件大小的操作,那么其他試進行此類操作之一的線程必須等待。并發行為也會受到底層的操作系 統或文件系統影響。
同大多數 I/O 相關的類一樣,FileChannel 是一個反映 Java 虛擬機外部一個具體對象的抽象。 FileChannel 類保證同一個 Java 虛擬機上的所有實例看到的某個文件的 圖均是一致的,但是 Java 虛擬機卻不能對超出它控制范圍的因素 供 保。通過一個 FileChannel 實例看到的某個文件的 圖同通過一個外部的非 Java 進程看到的該文件的 圖可能一致,也可能不一致。多個進程發起的并發文件訪問的語義高度取決于底層的操作系統和(或)文件系統。一般而言,由運行在不同 Java 虛擬機上的 FileChannel 對象發起的對某個文件的并發訪問和由非Java進程發起的對該文件的并發 訪問是一致的。
新的 FileChannel 類供了一個名為map()的方法,該方法可以在一個打開的文件和一個特殊類型的ByteBuffer之間建立一個虛擬內存映射(第一章中已經歸納了什么是內存映射文件以及它們 如何同虛擬內存交互)。在FileChannel 上調用 map()方法會創建一個由磁盤文件支持的虛擬內存映射(virtual memory mapping)并在那塊虛擬內存空間外部封裝一個 MappedByteBuffer 對象.
由 map()方法返回的MappedByteBuffer對象的行為在多數方面類似一個基于內存的緩沖區,只不過該對象的數據元素存儲在磁盤上的一個文件中。調用 get()方法會從磁盤文件中獲取數據,此數據反映該文件的當前內容,即使在映射建立之后文件已經被一個外部進程做了修改。通過文件映射看到的數據同您用常規方法讀取文件看到的內容是完全一樣的。相似地,對映射的緩沖區實現一個put()會更新磁盤上的那個文件(假設對該文件您有寫的權限),并且您做的修改對于該文件的其他閱讀者也是可見的。
通過內存映射機制來訪問一個文件會比使用常規方法讀寫高效得多,甚至比使用通道的效率都高。因為不需要做明確的系統調用,那會很消耗時間。更重要的是,操作系統的虛擬內存可以自動 緩存內存頁(memory page)。這些頁是用系統內存來緩存的,所以不會消耗 Java 虛擬機內存 (memory heap). 一旦一個內存頁已經生效(從磁盤上緩存進來),它就能以完全的硬件速度再次被訪問而不需 要再次調用系統命令來獲取數據。那些包含索引以及其他需頻繁引用或更新的內容的巨大而結構化文件能因內存映射機制受益非常多。如果同時結合文件鎖定來保護關鍵區域和控制事務原子性,那您將能了解到內存映射緩沖區如何可以被很好地利用。 MemoryMappedBuffer 直接反映它所關聯的磁盤文件。如果映射有效時文件被在結構上修改, 就會產生奇 的行為(當然具體的行為是取決于操作系統和文件系統的)。MemoryMappedBuffer 有固定的大小,不過它所映射的文件卻是彈性的。具體來說,如果映射有效時文件大小變化了,那么緩沖區的部分或全部內容都可能無法訪問,并將返回未定義的數據或者拋出未檢查的異常。關于被內存映射的文件如何受其他線程或外部進程控制這一點,請務必小心對待。 所有的 MappedByteBuffer對象都是直接的,這意味著它們占用的內存空間位于 Java虛擬機內存之外(并且可能不會算作Java虛擬機的內存占用,不過這取決于操作系統的虛擬內存模型)。 因為 MappedByteBuffers 也是 ByteBuffers,所以能夠被傳遞 SocketChannel 之類通道的read()或write()以有效傳輸數據給被映射的文件或從被映射的文件讀取數據。
public abstract class MappedByteBuffer extends ByteBuffer { // 加載文件到物理內存 public final MappedByteBuffer load( ) //是否已加載 public final boolean isLoaded( ) //將緩存區的更改寫入磁盤 public final MappedByteBuffer force( ) }
當我們為一個文件建立虛擬內存映射之后,文件數據通常不會因此被從磁盤讀取到內存(這取 決于操作系統)。該過程類似打開一個文件:文件先被定位,然后一個文件句柄會被創建,當您準備好之后就可以通過這個句來訪問文件數據。對于映射緩沖區,虛擬內存系統將根據您的需要來把文件中相應區塊的數據讀進來。這個頁驗證或防錯過程需要一定的時間,因為將文件數據讀取到 內存需要一次或多次的磁盤訪問。某些場下,您可能想先把所有的頁都讀進內存以實現最小的緩沖區訪問延遲。如果文件的所有頁都是常駐內存的,那么它的訪問速度就和訪問一個基于內存的緩沖區一樣了。
load()方法會加載整個文件以使它常駐內存。正如我們在第一章所討論的,一個內存映射緩沖區會建立與某個文件的虛擬內存映射。此映射使得操作系統的底層虛擬內存子系統可以根據需要將文件中相應區塊的數據讀進內存。已經在內存中或通過驗證的頁會占用實際內存空間,并且在它們 被讀進 RAM 時會擠出最近較少使用的其他內存頁。
public class ChannelAccept { public static final String GREETING = "Hello I must be going. "; public static void main (String [] argv) throws Exception { int port = 1234; // default if (argv.length > 0) { port = Integer.parseInt (argv [0]); } ByteBuffer buffer = ByteBuffer.wrap (GREETING.getBytes( )); ServerSocketChannel ssc = ServerSocketChannel.open( ); ssc.socket( ).bind (new InetSocketAddress (port)); ssc.configureBlocking (false); while (true) { System.out.println ("Waiting for connections"); SocketChannel sc = ssc.accept( ); if (sc == null) { // no connections, snooze a while Thread.sleep (2000); } else { System.out.println ("Incoming connection from: " + sc.socket().getRemoteSocketAddress( )); buffer.rewind( ); sc.write (buffer); sc.close( ); } }ServerSocketChannel TCP服務器端
public class ConnectAsync { public static void main (String [] argv) throws Exception { String host = "localhost"; int port = 80; if (argv.length == 2) { host = argv [0]; port = Integer.parseInt (argv [1]); } InetSocketAddress addr = new InetSocketAddress (host, port); SocketChannel sc = SocketChannel.open( ); sc.configureBlocking (false); System.out.println ("initiating connection"); sc.connect (addr); while ( ! sc.finishConnect( )) { doSomethingUseful( ); } System.out.println ("connection established"); // Do something with the connected socket // The SocketChannel is still nonblocking sc.close( ); } private static void doSomethingUseful( ) { System.out.println ("doing something useless"); } }DatagramChannel UDP
public class TimeClient { private static final int DEFAULT_TIME_PORT = 37; private static final long DIFF_1900 = 2208988800L; protected int port = DEFAULT_TIME_PORT; protected List remoteHosts; protected DatagramChannel channel; public TimeClient(String[] argv) throws Exception { if (argv.length == 0) { throw new Exception("Usage: [ -p port ] host ..."); } parseArgs(argv); this.channel = DatagramChannel.open(); } protected InetSocketAddress receivePacket(DatagramChannel channel, ByteBuffer buffer) throws Exception { buffer.clear(); // Receive an unsigned 32-bit, big-endian value return ((InetSocketAddress) channel.receive(buffer)); } // Send time requests to all the supplied hosts protected void sendRequests() throws Exception { ByteBuffer buffer = ByteBuffer.allocate(1); Iterator it = remoteHosts.iterator(); while (it.hasNext()) { InetSocketAddress sa = (InetSocketAddress) it.next(); System.out.println("Requesting time from " + sa.getHostName() + ":" + sa.getPort()); // Make it empty (see RFC868) buffer.clear().flip(); // Fire and forget channel.send(buffer, sa); 112 } } // Receive any replies that arrive public void getReplies() throws Exception { // Allocate a buffer to hold a long value ByteBuffer longBuffer = ByteBuffer.allocate(8); // Assure big-endian (network) byte order longBuffer.order(ByteOrder.BIG_ENDIAN); // Zero the whole buffer to be sure longBuffer.putLong(0, 0); // Position to first byte of the low-order 32 bits longBuffer.position(4); // Slice the buffer; gives view of the low-order 32 bits ByteBuffer buffer = longBuffer.slice(); int expect = remoteHosts.size(); int replies = 0; System.out.println(""); System.out.println("Waiting for replies..."); while (true) { InetSocketAddress sa; sa = receivePacket(channel, buffer); buffer.flip(); replies++; printTime(longBuffer.getLong(0), sa); if (replies == expect) { System.out.println("All packets answered"); break; } // Some replies haven"t shown up yet System.out.println("Received " + replies + " of " + expect + " replies"); } } // Print info about a received time reply protected void printTime(long remote1900, InetSocketAddress sa) { // local time as seconds since Jan 1, 1970 113 long local = System.currentTimeMillis() / 1000; // remote time as seconds since Jan 1, 1970 long remote = remote1900 - DIFF_1900; Date remoteDate = new Date(remote * 1000); Date localDate = new Date(local * 1000); long skew = remote - local; System.out.println("Reply from " + sa.getHostName() + ":" + sa.getPort()); System.out.println(" there: " + remoteDate); System.out.println(" here: " + localDate); System.out.print(" skew: "); if (skew == 0) { System.out.println("none"); } else if (skew > 0) { System.out.println(skew + " seconds ahead"); } else { System.out.println((-skew) + " seconds behind"); } } protected void parseArgs(String[] argv) { remoteHosts = new LinkedList(); for (int i = 0; i < argv.length; i++) { String arg = argv[i]; // Send client requests to the given port if (arg.equals("-p")) { i++; this.port = Integer.parseInt(argv[i]); continue; } // Create an address object for the hostname InetSocketAddress sa = new InetSocketAddress(arg, port); // Validate that it has an address if (sa.getAddress() == null) { System.out.println("Cannot resolve address: " + arg); continue; } 114 remoteHosts.add(sa); } } public static void main(String[] argv) throws Exception { TimeClient client = new TimeClient(argv); client.sendRequests(); client.getReplies(); } }Selector
選擇器供選擇執行已經就緒的任務的能力,這使得多元I/O成為可能。就緒選擇和多元執行使得單線程能夠有效率地同時管理多個I/O通道(channels).將之前創建的一個或多個可選擇的通道注冊到選擇器對象中。一個表示通道和選擇器的鍵將會被返回。選擇鍵會記您關心的通道。它們也會對應的通道是否已經就緒。當您調用一個選擇器對象的 select()方法時,相關的鍵建會被更新, 用來檢查所有被注冊到該選擇器的通道。
在與 SelectableChannel聯合使用時,選擇器供了這種服務,但這里面有更多的事情需要去了解。就緒選擇的真正價值在于潛在的大量的通道可以同時進行就緒狀態的檢查。調用者可以輕松地決定多個通道中的哪一個準備好要運行。有兩種方式可以選擇:被激發的線程可以處于休眠狀態,直到一個或者多個注冊到選擇器的通道就緒,或者它也可以周期性地詢選擇器,看看從上次檢查之后,是否有通道處于就緒狀態。如果您考慮一下需要管理大量并發的連接的網絡服務器(webserver)的實現,就可以很容易地想到如何善加利用這些能力。
真正的就緒選擇必須由操作系統來做。操作系統的一項最重要的功能就是處理 I/O 請求并通知 各個線程它們的數據已經準備好了。選擇器類供了這種抽象,使得 Java 代碼能夠以可移植的方式,請求底層的操作系統供就緒選擇服務
Selector:
選擇器類管理著一個被注冊的通道集合的信息和它們的就緒狀態。通道是和選擇器一起被注冊的,并且使用選擇器來更新通道的就緒狀態。當這么做的時候,可以選擇將被激發的線程掛起,直到有就緒的的通道。
public abstract class Selector implements Closeable { protected Selector() { } //實例化Selector public static Selector open() throws IOException { return SelectorProvider.provider().openSelector(); } public abstract SelectorProvider provider(); //返回 注冊到它們之上的通道的集合,不可以直接修改的 public abstract Setkeys(); //返回 就緒的鍵.已注冊的鍵的集合的子集,這個集合的每個成員都是相關的通道被選擇器(在前一個選擇操作中)斷為已經準備好的,并且包含于鍵的interest集合中的操作/ public abstract Set selectedKeys(); //非阻塞 public abstract int selectNow() throws IOException; //設定時間內阻塞 public abstract int select(long timeout) throws IOException; //完全阻塞 public abstract int select() throws IOException; //停止阻塞中的select方法 public abstract Selector wakeup(); //測試一個選擇器是否處于被打開的狀態 public abstract boolean isOpen(); //釋放它可能占用的資源并將所有相關的選擇鍵設置為無效,一個在選擇操作中阻 的線程都將被醒,就像wakeup()方法被調用了一樣。與選擇器相關的通道將被注銷,而鍵將被取消 public abstract void close() throws IOException; }
select()方法:
1. 已取消的鍵的集合將會被檢查。如果它是非空的,每個已取消的鍵的集合中的鍵將從另外兩個集合中移除,并且相關的通道將被注銷。這個步驟結束后,已取消的鍵的集合將是空的。 2. 已注冊的鍵的集合中的鍵的interest集合將被檢查。在這個步驟中的檢查執行過后,對interest集合的改動不會影響剩余的檢查過程。 一旦就緒條件被定下來,底層操作系統將會進行查詢,以確定每個通道所關心的操作的真實就緒狀態。依賴于特定的select()方法調用,如果沒有通道已經準備好,線程可能會在這時阻塞,通常會有一個超時值。 直到系統調用完成為止,這個過程可能會使得調用線程一段時間,然后當前每個通道的就緒狀態將確定下來。對于那些還沒準備好的通道將不會執行任何的操作。對于那些操作系統指示至少已經準備好interest集合中的一種操作的通道,將執行以下兩種操作中的一種: - 如果通道的鍵還沒有處于已選擇的鍵的集合中,那么鍵的 ready 集合將被清空,然后表示操 作系統發現的當前通道已經準備好的操作的比特 碼將被設置。 - 否則,也就是鍵在已選擇的鍵的集合中。鍵的ready集合將被表示操作系統發現的當前已經準備好的操作的比特碼更新。所有之前的已經不再是就緒狀態的操作不會被清除。事實上,所有的比特位都不會被清理。由操作系統決定的 ready 集合是與之前的ready集合按位分離的,一旦鍵被放置于選擇器的已選擇的鍵的集合中,它的ready集合將是的。比特位只會被設置,不會被清理。 3. 步驟2可能會花費很長時間,特別是所激發的線程處于休眠狀態時。與該選擇器相關的鍵可能會同時被取消。當步驟2結束時,步驟1將重新執行,以完成任意一個在選擇進行的過程中,鍵已經被取消的通道的注銷 4. select操作返回的值是ready集合在步驟2中被修改的鍵的數量,而不是已選擇的鍵的集合中的通道的總數。返回值不是已準備好的通道的總數,而是從上一個 select()調用之后進入就緒狀態的通道的數量。之前的調用中就緒的,并且在本次調用中仍然就緒的通道不會被計入,而那些在前一次調用中已經就緒但已經不再處于就緒狀態的通道也不會被計入。這些通道可能仍然在已選擇的 鍵的集合中,但不會被計入返回值中。返回值可能是 0。
選擇是累積的。一旦一個選擇器將一個鍵加到它的已選擇的鍵的集合中,它就不會移除這個鍵。并且,一旦一個鍵處于已選擇的鍵的集合中,這個鍵的ready集合將只會被設置,而不會被清理。
合理地使用選擇器的是理解選擇器維護的選擇鍵集合所演的角色。最重要的部分是當鍵已經不再在已選擇的鍵的集合中時將會發生什么。當通道上的至少一個感興趣的操作就緒時,鍵的ready集合就會被清空,并且當前已經就緒的操作將會被加到ready集合中。該鍵之后將被 加到已選擇的鍵的集合中。
清理一個 SelectKey的ready集合的方式是將這個鍵從已選擇的鍵的集合中移除(removed掉)。選擇鍵的就緒狀態只有在選擇器對象在選擇操作過程中才會修改。處理思想是只有在已選擇的鍵的集合中的鍵才被認為是包含了合法的就緒信息的。這些信息將在鍵中長久地存在,直到鍵從已選擇的鍵的集合中移除,以通知選擇器您已經看到并對它進行了處理。如果下一次通道的一些感興趣的操作發生時,鍵將被重新設置以反映當時通道的狀態并再次被 加到已選擇的鍵的集合中。
SelectableChannel:
FileChannel對象不是可選擇的,SocketChannel,SocketServerChannel,DatagramChannel是可選擇的.
public abstract class SelectableChannel extends AbstractChannel implements Channel{ // 注冊通道到Selector上,第二個參數表示所關心的通道操作 public abstract SelectionKey register (Selector sel, int ops) throws ClosedChannelException; // 同上,第三個參數attach到SelectionKey public abstract SelectionKey register (Selector sel, int ops,Object att) throws ClosedChannelException; public abstract boolean isRegistered( ); // 返回與該通道和指定的選擇器相關的鍵 public abstract SelectionKey keyFor (Selector sel); public abstract int validOps( ); //通道在被注冊到一個選擇器上之前,必須先設置為false,否則會拋出IllegalBlockingModeException public abstract void configureBlocking (boolean block) throws IOException; public abstract boolean isBlocking( ); public abstract Object blockingLock( ); }
SelectionKey:
選擇鍵封裝了特定的通道與特定的選擇器的注冊關系
public abstract class SelectionKey{ public static final int OP_READ public static final int OP_WRITE public static final int OP_CONNECT public static final int OP_ACCEPT //返回相關SelectableChannel public abstract SelectableChannel channel( ); //返回相關 Selector public abstract Selector selector( ); //不會立即注銷。直到下一次操作發生為止,它們仍然會處于被注冊的狀態. public abstract void cancel( ); //是否仍然有效 public abstract boolean isValid( ); //返回 通道/ 選擇器組合體所關心的操作(instrest 集合) public abstract int interestOps( ); //修改 instrest 集合 public abstract void interestOps (int ops); // 通道準備好要執行的操作,ready集合是interest集合的子集,并且表示了interest集合中從上次調用select()以來已經就緒的那些操作 //通過相關的選擇鍵的readyOps()方法返回的就緒狀態指示只是一個 示,不是保證。底層的通道在任何時候都會不斷改變。其他線程可能在通道上執行操作并影響它的就緒狀態。 public abstract int readyOps( ); //是否可讀 public final boolean isReadable( ) //是否可寫 public final boolean isWritable( ) //是否可連接 public final boolean isConnectable( ) //是否可接受連接 public final boolean isAcceptable( ) //附上對象 public final Object attach (Object ob) //返回 附著的對象 public final Object attachment( ) }
1.建立選擇器
Selector selector = Selector.open( ); channel1.register (selector, SelectionKey.OP_READ); channel2.register (selector, SelectionKey.OP_WRITE); channel3.register (selector, SelectionKey.OP_READ |SelectionKey.OP_WRITE); // Wait up to 10 seconds for a channel to become ready readyCount = selector.select (10000);
上述代碼,建了一個新的選擇器,然后將這三個(已經存在的)socket 通道注冊到選擇器上,而且感興趣的操作各不相同
2.完整的交互例子
import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; public class SelectSockets { public static int PORT_NUMBER = 1234; public static void main(String[] argv) throws Exception { new SelectSockets().go(argv); } public void go(String[] argv) throws Exception { int port = PORT_NUMBER; if (argv.length > 0) { // Override default listen port port = Integer.parseInt(argv[0]); } ServerSocketChannel serverChannel = ServerSocketChannel.open(); ServerSocket serverSocket = serverChannel.socket(); Selector selector = Selector.open(); serverSocket.bind(new InetSocketAddress(port)); serverChannel.configureBlocking(false); serverChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { int n = selector.select(); if (n == 0) { continue; } Iterator it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey key = (SelectionKey) it.next(); // Is a new connection coming in? if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel = server.accept(); registerChannel(selector, channel, SelectionKey.OP_READ); sayHello(channel); } } } } protected void registerChannel(Selector selector, SelectableChannel channel, int ops) throws Exception { if (channel == null) { return; } channel.configureBlocking(false); channel.register(selector, ops); } private ByteBuffer buffer = ByteBuffer.allocateDirect(1024); protected void readDataFromSocket(SelectionKey key) throws Exception { SocketChannel socketChannel = (SocketChannel) key.channel(); int count; buffer.clear(); // Empty buffer while ((count = socketChannel.read(buffer)) > 0) { buffer.flip(); // Make buffer readable while (buffer.hasRemaining()) { socketChannel.write(buffer); } buffer.clear(); // Empty buffer } if (count < 0) { socketChannel.close(); } } private void sayHello(SocketChannel channel) throws Exception { buffer.clear(); buffer.put("Hi there! ".getBytes()); buffer.flip(); channel.write(buffer); } }
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/70246.html
摘要:從使用到原理學習線程池關于線程池的使用,及原理分析分析角度新穎面向切面編程的基本用法基于注解的實現在軟件開發中,分散于應用中多出的功能被稱為橫切關注點如事務安全緩存等。 Java 程序媛手把手教你設計模式中的撩妹神技 -- 上篇 遇一人白首,擇一城終老,是多么美好的人生境界,她和他歷經風雨慢慢變老,回首走過的點點滴滴,依然清楚的記得當初愛情萌芽的模樣…… Java 進階面試問題列表 -...
摘要:它使用了事件通知以確定在一組非阻塞套接字中有哪些已經就緒能夠進行相關的操作。目前,可以把看作是傳入入站或者傳出出站數據的載體。出站事件是未來將會觸發的某個動作的操作結果,這些動作包括打開或者關閉到遠程節點的連接將數據寫到或者沖刷到套接字。 netty的概念 定義 Netty 是一款異步的事件驅動的網絡應用程序框架,支持快速地開發可維護的高性能的面向協議的服務器和客戶端。我們可以很簡單的...
閱讀 1269·2021-09-23 11:51
閱讀 1386·2021-09-04 16:45
閱讀 630·2019-08-30 15:54
閱讀 2081·2019-08-30 15:52
閱讀 1599·2019-08-30 11:17
閱讀 3104·2019-08-29 13:59
閱讀 2014·2019-08-28 18:09
閱讀 386·2019-08-26 12:15