摘要:緩沖區(qū)的限制不能為負,并且不能大于其容量。如果指向的位置超過限制,則拋出異常。使用臨時緩沖區(qū)執(zhí)行低層次操作。臨時緩沖區(qū)對象離開作用域,并最終成為被回收的無用數(shù)據(jù)。
前天剛好看了點《UNIX網(wǎng)絡(luò)編程》,比較頭大。現(xiàn)在我來整理一下所學所得,并用于個人備忘。如果有不對,請批評。
想要解鎖更多新姿勢?請訪問https://blog.tengshe789.tech/
IO模型介紹IO模型是什么?很多書籍或者百度百度百科,都沒有給出明確的解釋,我也不敢亂下定義。以我愚見,IO模型,是通過根據(jù)前人主觀意識的思考而構(gòu)成客觀闡述IO復雜操作邏輯的物件。
要知道,應(yīng)用程序使用系統(tǒng)資源的一個過程,進程無法直接操作IO設(shè)備的,因為用戶進程不能直接訪問磁盤,所以要通過內(nèi)核的系統(tǒng)調(diào)用讀取,這個內(nèi)核讀取的過程就是用戶進程等待的過程,等待內(nèi)核讀取后將數(shù)據(jù)從內(nèi)核內(nèi)存復制到進程內(nèi)存。因此操作系統(tǒng)設(shè)立一個IO模型進行規(guī)范,就非常有必要了。
為了更好地了解IO模型,我們需要事先回顧下:同步、異步、阻塞、非阻塞
同步與異步:描述的是用戶線程與內(nèi)核的交互方式,同步指用戶線程發(fā)起IO請求后需要等待或者輪詢內(nèi)核IO操作完成后才能繼續(xù)執(zhí)行;而異步是指用戶線程發(fā)起IO請求后仍然繼續(xù)執(zhí)行,當內(nèi)核IO操作完成后會通知用戶線程,或者調(diào)用用戶線程注冊的回調(diào)函數(shù)。
阻塞與非阻塞:描述是用戶線程調(diào)用內(nèi)核IO操作的方式,阻塞是指IO操作需要徹底完成后才返回到用戶空間;而非阻塞是指IO操作被調(diào)用后立即返回給用戶一個狀態(tài)值,無需等到IO操作徹底完成。
IO模型一共有5類:
blocking-IO BIO(阻塞IO)
non-blocking IO NIO(非阻塞IO)
IO multiplexing IO多路復用
signal driven IO 信號驅(qū)動IO
asynchronous IO AIO(異步IO)
由于signal driven IO(信號驅(qū)動IO)在實際中并不常用,所以主要介紹其余四種IO Model。
BIO(blocking io)先來看看讀操作流程
從圖中可以看出,用戶進程調(diào)用了recvfrom這個系統(tǒng)調(diào)用,kernel就開始了IO的第一個階段:準備數(shù)據(jù)。
對于network io來說,很多時候數(shù)據(jù)在一開始還沒有到達(比如,還沒有收到一個完整的UDP包),這個時候kernel就要等待足夠的數(shù)據(jù)到來。
而在用戶進程這邊,整個進程會被阻塞。當kernel一直等到數(shù)據(jù)準備好了,它就會將數(shù)據(jù)從kernel中拷貝到用戶內(nèi)存,然后kernel返回結(jié)果,用戶進程才解除block的狀態(tài),重新運行起來。
也就是說,blocking IO的特點就是在IO執(zhí)行的兩個階段(等待數(shù)據(jù)和拷貝數(shù)據(jù)兩個階段)都被block了。
JAVA 阻塞 demo下面的例子主要使用Socket通道進行編程。服務(wù)端如下:
/** * @program: socketTest * @description: one thread demo for bio version * @author: tEngSHe789 * @create: 2018-08-26 21:17 **/ public class Server { public static void main(String[] args) { try { ServerSocket serverSocket=new ServerSocket(8888); System.out.println("服務(wù)端Start...."); //等待客戶端就緒 -> 堵塞 while (true){ Socket socket = serverSocket.accept(); System.out.println("發(fā)現(xiàn)客戶端連接"); InputStream is=socket.getInputStream(); byte[] b =new byte[1024]; //等待客戶端發(fā)送請求 -> 堵塞 while (true) { int data = is.read(b); String info=null; if (data!=-1){ info=new String(b,0,data,"GBK"); } System.out.println(info); } } } catch (IOException e) { } } }
客戶端
/** * @program: socketTest * @description: one thread demo for bio version * @author: tEngSHe789 **/ public class Client { public static void main(String[] args) { try { Socket socket=new Socket("127.0.0.1",8888); OutputStream os = socket.getOutputStream(); System.out.println("正在發(fā)送數(shù)據(jù)"); os.write("這是來自客戶端的信息".getBytes()); os.flush(); } catch (IOException e) { e.printStackTrace(); } } }PY 阻塞 demo
服務(wù)端
import socket s = socket.socket() s.bind(("127.0.0.1",8888)) print("服務(wù)端啟動....") # 等待客戶端就緒 -> 堵塞 s.listen() # 等待客戶端發(fā)送請求 -> 堵塞 conn,addr = s.accept() msg = conn.recv(1024).decode("utf-8") print(msg) conn.close() s.close()
客戶端
import socket s = socket.socket() s.connect(("127.0.0.1",8888)) print("客戶端已啟動....") s.send("正在發(fā)送數(shù)據(jù)".encode("utf-8")) s.close()NIO(non blocking io)
NIO就不一樣了,recvform系統(tǒng)調(diào)用調(diào)用之后,進程并沒有被阻塞,內(nèi)核馬上返回給進程,如果數(shù)據(jù)還沒準備好,此時會返回一個error。進程在返回之后,可以干點別的事情,然后再發(fā)起recvform系統(tǒng)調(diào)用。重復上面的過程,循環(huán)往復的進行recvform系統(tǒng)調(diào)用。這個過程通常被稱之為輪詢。
輪詢檢查內(nèi)核數(shù)據(jù),直到數(shù)據(jù)準備好,再拷貝數(shù)據(jù)到進程,進行數(shù)據(jù)處理。需要注意,拷貝數(shù)據(jù)整個過程,進程仍然是屬于阻塞的狀態(tài)。
JAVA 與NIOJava NIO(New IO)是一個可以替代標準Java IO API的IO API(從Java 1.4開始),Java NIO提供了與標準IO不同的IO工作方式。
在java中,標準的IO基于字節(jié)流和字符流進行操作的,而NIO是基于通道(Channel)和緩沖區(qū)(Buffer)進行操作,數(shù)據(jù)總是從通道讀取到緩沖區(qū)中,或者從緩沖區(qū)寫入到通道中。
我們先看看Buffer類
Buffer類Java NIO中的Buffer主要用于與NIO通道進行交互,數(shù)據(jù)是從通道讀入到緩沖區(qū),從緩沖區(qū)寫入通道中的。概念上,緩沖區(qū)可以看成包在一個對象內(nèi)的數(shù)組,下面看一個圖
這是一個新創(chuàng)建的容量為10的ByteBuffer邏輯圖,他有四個屬性來提供關(guān)于其包含的數(shù)據(jù)元素信息,分別是:
1)容量(capacity):表示Buffer最大數(shù)據(jù)容量,緩沖區(qū)容量不能為負,并且建立后不能修改。
2)限制(limit):也叫上界。第一個不應(yīng)該讀取或者寫入的數(shù)據(jù)的索引,即位于limit后的數(shù)據(jù)不可以讀寫。緩沖區(qū)的限制不能為負,并且不能大于其容量(capacity)。
3)位置(position):下一個要讀取或?qū)懭氲臄?shù)據(jù)的索引。緩沖區(qū)的位置不能為負,并且不能大于其限制(limit)。
4)標記(mark)與重置(reset):標記是一個索引,通過Buffer中的mark()方法指定Buffer中一個特定的position,之后可以通過調(diào)用reset()方法恢復到這個position。
從這幅圖可以看到,他的容量(capacity)和限制(limit)設(shè)置為10,位置設(shè)置為0,每個緩沖區(qū)容量是固定的,標記是未定義的,其他三個屬性可以通過使用緩沖區(qū)解決。
緩沖區(qū)存儲數(shù)據(jù)支持的數(shù)據(jù)類型支持七種數(shù)據(jù)類型,他們是:
1.byteBuffer
2.charBuffer
3.shortBuffer
4.IntBuffer
5.LongBuffer
6.FloatBuffer
7.DubooBuffer
使用Buffer讀寫數(shù)據(jù)一般遵循以下四個步驟:
(1) 寫入數(shù)據(jù)到Buffer,一般有可以從Channel讀取到緩沖區(qū)中,也可以調(diào)用put方法寫入。
(2) 調(diào)用flip()方法,切換數(shù)據(jù)模式。
(3) 從Buffer中讀取數(shù)據(jù),一般從緩沖區(qū)讀取數(shù)據(jù)寫入到通道中,也可以調(diào)用get方法讀取。
(4) 調(diào)用clear()方法或者compact()方法。
緩沖區(qū)API首先,用allocate 指定緩沖區(qū)大小1024
ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
我們可以用put 存入數(shù)據(jù)到緩沖區(qū)
byteBuffer.put("tengshe789".getBytes());
當調(diào)用put時,會指出下一個元素應(yīng)當被插入的位置,位置(position)指向的是下一個元素。如果指向的位置超過限制(limit),則拋出BufferOverFlowException異常。
Flip將一個能夠繼續(xù)添加數(shù)據(jù)元素的填充狀態(tài)的緩沖區(qū)翻轉(zhuǎn)成一個準備讀出元素的釋放狀態(tài)
byteBuffer.flip();
具體有什么用呢?
對于已經(jīng)寫滿了緩沖區(qū),如果將緩沖區(qū)內(nèi)容傳遞給一個通道,以使內(nèi)容能被全部寫出。
但如果通道現(xiàn)在在緩沖區(qū)上執(zhí)行g(shù)et,那么它將從我們剛剛插入的有用數(shù)據(jù)之外取出未定義數(shù)據(jù)。通過翻轉(zhuǎn)將位置值重新設(shè)為 0,通道就會從正確位置開始獲取。
例如我們定義了一個容量是10的buffer,并填入hello,如下圖所示
翻轉(zhuǎn)后如下圖所示
Rewind與 flip相似,但不影響上界屬性。它只是將位置值設(shè)回 0。可以使用 rewind()后退,重讀已經(jīng)被翻轉(zhuǎn)的緩沖區(qū)中的數(shù)據(jù)。
byteBuffer.rewind();
翻轉(zhuǎn)完了,就可以用get獲取緩沖區(qū)數(shù)據(jù)了
byte[] b= new byte[byteBuffer.limit()]; byteBuffer.get(b);
當調(diào)用get時,會指出下一個元素應(yīng)當被索引的位置,位置(position)返回時會+1s。如果指向的位置超過限制(limit),則拋出BufferUnderFlowException異常。如果提供的索引超過范圍,也會拋出IndexOutOfBoundsException異常
remaining可以告訴你從當前位置(position)到限制(limit)還剩的元素數(shù)目
int count = byteBuffer.remaining();
clear將緩沖區(qū)重置為空狀態(tài)
byteBuffer.clear();
如果我們只想從緩沖區(qū)中釋放一部分數(shù)據(jù),而不是全部,然后重新填充。為了實現(xiàn)這一點,未讀的數(shù)據(jù)元素需要下移以使第一個元素索引為 0。盡管重復這樣做會效率低下,但這有時非常必要,而 API 對此為您提供了一個 compact()函數(shù)。
byteBuffer.compact();
標記是一個索引,通過Buffer中的mark()方法指定Buffer中一個特定的position,之后可以通過調(diào)用reset()方法恢復到這個position。要知道緩沖區(qū)的標記在mark()函數(shù)被調(diào)用前時未定義的,如果標記未定義,調(diào)用reset()會導致InvalidMarkException異常
byteBuffer.position(2).mark().position(4).reset();
要注意,java.nio中的類特意被設(shè)計為支持級聯(lián)調(diào)用,優(yōu)雅的使用級聯(lián)調(diào)用,可以產(chǎn)生優(yōu)美易讀的代碼。
直接緩沖區(qū)與非直接緩沖區(qū)上面我們說了ByteBuffer,也就是緩沖區(qū)的用法,譬如用allocate() 方法指定緩沖區(qū)大小,然后進行填充或翻轉(zhuǎn)操作等等等。我們所創(chuàng)建的緩沖區(qū),都屬于直接緩沖區(qū)。他們都是在JVM中內(nèi)存中創(chuàng)建,在每次調(diào)用基礎(chǔ)操作系統(tǒng)的一個本機IO之前或者之后,虛擬機都會將緩沖區(qū)的內(nèi)容復制到中間緩沖區(qū)(或者從中間緩沖區(qū)復制內(nèi)容),緩沖區(qū)的內(nèi)容駐留在JVM內(nèi),因此銷毀容易,但是占用JVM內(nèi)存開銷,處理過程中有復制操作。
非直接緩沖區(qū)寫入步驟:
1.創(chuàng)建一個臨時的直接ByteBuffer對象。
2.將非直接緩沖區(qū)的內(nèi)容復制到臨時緩沖中。
3.使用臨時緩沖區(qū)執(zhí)行低層次I/O操作。
4.臨時緩沖區(qū)對象離開作用域,并最終成為被回收的無用數(shù)據(jù)。
/** * @program: UndirectBuffer * @description: 利用通道完成文件的復制(非直接緩沖區(qū)) * @author: tEngSHe789 **/ public class UndirectBuffer { public static void main(String[] args) throws IOException { // 創(chuàng)建流 FileInputStream fis = new FileInputStream("d://blog.md"); FileOutputStream fos = new FileOutputStream("d://blog.md"); //獲取管道 FileChannel in = fis.getChannel(); FileChannel out = fos.getChannel(); // 分配指定大小的緩沖區(qū) ByteBuffer buffer = ByteBuffer.allocate(1024); while (in.read(buffer) !=-1){ buffer.flip();// 準備讀數(shù)據(jù)了 out.write(buffer); buffer.clear(); } out.close(); in.close(); fis.close(); fos.close(); } }
直接緩沖區(qū),是通過 allocateDirect() 方法在JVM內(nèi)存外開辟內(nèi)存,在每次調(diào)用基礎(chǔ)操作系統(tǒng)的一個本機IO之前或者之后,虛擬機都會避免將緩沖區(qū)的內(nèi)容復制到中間緩沖區(qū)(或者從中間緩沖區(qū)復制內(nèi)容),緩沖區(qū)的內(nèi)容駐留在物理內(nèi)存內(nèi),會少一次復制過程,如果需要循環(huán)使用緩沖區(qū),用直接緩沖區(qū)可以很大地提高性能。
雖然直接緩沖區(qū)使JVM可以進行高效的I/O操作,但它使用的內(nèi)存是操作系統(tǒng)分配的,繞過了JVM堆棧,建立和銷毀比堆棧上的緩沖區(qū)要更大的開銷。
/** * @program: DirectBuffer * @description: 使用直接緩沖區(qū)完成文件的復制(內(nèi)存映射文件) * @author: tEngSHe789 **/ public class DirectBuffer { public static void main(String[] args) throws IOException { //創(chuàng)建管道 FileChannel in=FileChannel.open(Paths.get("d://blog.md"),StandardOpenOption.READ); FileChannel out=FileChannel.open(Paths.get("d://blog.md"),StandardOpenOption.WRITE ,StandardOpenOption.READ,StandardOpenOption.CREATE); // 拿到將管道內(nèi)容映射到內(nèi)存的直接緩沖區(qū)映射文件(一個位置在硬盤的基于內(nèi)存的緩沖區(qū)) MappedByteBuffer inMappedByteBuffer = in.map(FileChannel.MapMode.READ_ONLY, 0, in.size()); MappedByteBuffer outMappedByteBuffer = out.map(FileChannel.MapMode.READ_WRITE, 0, in.size()); // 對直接緩沖區(qū)進行數(shù)據(jù)讀寫操作 byte[] bytes=new byte[inMappedByteBuffer.limit()]; inMappedByteBuffer.get(bytes); outMappedByteBuffer.put(bytes); in.close(); out.close(); } }
字節(jié)緩沖區(qū)要么是直接的,要么是非直接的。如果為直接字節(jié)緩沖區(qū),則 Java 虛擬機會盡最大努力直接在此緩沖區(qū)上執(zhí)行本機 I/O 操作。也就是說,在每次調(diào)用基礎(chǔ)操作系統(tǒng)的一個本機 I/O 操作之前(或之后),虛擬機都會盡量避免將緩沖區(qū)的內(nèi)容復制到中間緩沖區(qū)中(或從中間緩沖區(qū)中復制內(nèi)容)。
直接字節(jié)緩沖區(qū)可以通過調(diào)用此類的 allocateDirect() 工廠方法來創(chuàng)建。此方法返回的緩沖區(qū)進行分配和取消分配所需成本通常高于非直接緩沖區(qū)。直接緩沖區(qū)的內(nèi)容可以駐留在常規(guī)的垃圾回收堆之外,因此,它們對應(yīng)用程序的內(nèi)存需求量造成的影響可能并不明顯。所以,建議將直接緩沖區(qū)主要分配給那些易受基礎(chǔ)系統(tǒng)的本機 I/O 操作影響的大型、持久的緩沖區(qū)。一般情況下,最好僅在直接緩沖區(qū)能在程序性能方面帶來明顯好處時分配它們。
直接字節(jié)緩沖區(qū)還可以通過 FileChannel 的 map() 方法 將文件區(qū)域直接映射到內(nèi)存中來創(chuàng)建。該方法返回MappedByteBuffer 。 Java 平臺的實現(xiàn)有助于通過 JNI 從本機代碼創(chuàng)建直接字節(jié)緩沖區(qū)。如果以上這些緩沖區(qū)中的某個緩沖區(qū)實例指的是不可訪問的內(nèi)存區(qū)域,則試圖訪問該區(qū)域不會更改該緩沖區(qū)的內(nèi)容,并且將會在訪問期間或稍后的某個時間導致拋出不確定的異常。
字節(jié)緩沖區(qū)是直接緩沖區(qū)還是非直接緩沖區(qū)可通過調(diào)用其 isDirect() 方法來確定。提供此方法是為了能夠在性能關(guān)鍵型代碼中執(zhí)行顯式緩沖區(qū)管理。
Channel通道是java.nio的第二個創(chuàng)新,表示提供 IO 設(shè)備(例如:文件、套接字)的直接連接。
若需要使用 NIO 系統(tǒng),需要獲取用于連接 IO 設(shè)備的通道以及用于容納數(shù)據(jù)的緩沖區(qū)。然后操作緩沖區(qū),對數(shù)據(jù)進行處理。這其中,Channel負責傳輸, Buffer 負責存儲。
通道是由java.nio.channels 包定義的,Channel 表示 IO 源與目標打開的連接。Channel 類似于傳統(tǒng)的“流”。只不過 Channel本身不能直接訪問數(shù)據(jù), Channel 只能與Buffer 進行交互。
接口java.nio.channels.Channel 接口:
FileChannel
SocketChannel
ServerSocketChannel
DatagramChannel
與緩沖區(qū)不同,通道API主要由接口指定,不同操作系統(tǒng)上通道的實現(xiàn)會不一樣
實現(xiàn)直接緩沖區(qū)與非直接緩沖區(qū)的栗子
分散讀取與聚集寫入通道可以有選擇地實現(xiàn)兩個新的接口: ScatteringByteChannel 和 GatheringByteChannel。
ScatteringByteChannel 有2個read方法,我們都叫她分散讀取(scattering Reads),分散讀取中,通道依次填充每個緩沖區(qū)。填滿一個緩沖區(qū)后,它就開始填充下一個。在某種意義上,緩沖區(qū)數(shù)組就像一個大緩沖區(qū)。
GatheringByteChannel中有2個wirte方法,我們都叫她聚集寫入(gathering Writes),他可以將多個緩沖區(qū)的數(shù)據(jù)聚集到通道中
分散讀取/聚集寫入對于將數(shù)據(jù)劃分為幾個部分很有用。例如,您可能在編寫一個使用消息對象的網(wǎng)絡(luò)應(yīng)用程序,每一個消息被劃分為固定長度的頭部和固定長度的正文。您可以創(chuàng)建一個剛好可以容納頭部的緩沖區(qū)和另一個剛好可以容難正文的緩沖區(qū)。當您將它們放入一個數(shù)組中并使用分散讀取來向它們讀入消息時,頭部和正文將整齊地劃分到這兩個緩沖區(qū)中。
我們從緩沖區(qū)所得到的方便性對于緩沖區(qū)數(shù)組同樣有效。因為每一個緩沖區(qū)都跟蹤自己還可以接受多少數(shù)據(jù),所以分散讀取會自動找到有空間接受數(shù)據(jù)的第一個緩沖區(qū)。在這個緩沖區(qū)填滿后,它就會移動到下一個緩沖區(qū)。
Python與NIO服務(wù)端(具體見注釋)
from socket import * import time s=socket(AF_INET,SOCK_STREAM) s.bind(("127.0.0.1",8888)) s.listen(5) s.setblocking(False) #設(shè)置socket的接口為非阻塞 conn_l=[] # 存儲和server的連接 的 連接 del_l=[] # 存儲和和server的斷開 的 連接 while True: try: # 這個過程是不阻塞的 conn,addr=s.accept() # 當沒人連接的時候會報錯,走exception(<- py中是except) conn_l.append(conn) except BlockingIOError: print(conn_l) for conn in conn_l: try: data=conn.recv(1024) if not data: del_l.append(conn) # 這個過程是不阻塞的 data=conn.recv(1024) # 不阻塞 if not data: # 如果拿不到data del_l.append(conn) # 在廢棄列表中添加conn continue conn.send(data.upper()) except BlockingIOError: pass except ConnectionResetError: del_l.append(conn) for conn in del_l: conn_l.remove(conn) conn.close() del_l=[]
客戶端
from socket import * c=socket(AF_INET,SOCK_STREAM) c.connect(("127.0.0.1",8888)) while True: msg=input(">>: ") if not msg:continue c.send(msg.encode("utf-8")) data=c.recv(1024) print(data.decode("utf-8"))IO復用(IO multiplexing)
I/O多路復用實際上就是用select, poll, epoll監(jiān)聽多個io對象,當io對象有變化(有數(shù)據(jù))的時候就通知用戶進程。有些地方也稱這種IO方式為事件驅(qū)動IO(event driven IO)。與多進程和多線程技術(shù)相比,I/O多路復用技術(shù)的最大優(yōu)勢是系統(tǒng)開銷小,系統(tǒng)不必創(chuàng)建進程/線程,也不必維護這些進程/線程,從而大大減小了系統(tǒng)的開銷。當然具體的可以看看這篇博客,現(xiàn)在先來看下I/O多路復用的流程:
(1)當用戶進程調(diào)用了select,那么整個進程會被block;
(2)而同時,kernel會“監(jiān)視”所有select負責的socket;
(3)當任何一個socket中的數(shù)據(jù)準備好了,select就會返回;
(4)這個時候用戶進程再調(diào)用read操作,將數(shù)據(jù)從kernel拷貝到用戶進程。
這個圖和BIO的圖其實并沒有太大的不同,事實上還更差一些。因為這里需要使用兩個系統(tǒng)調(diào)用(select和recvfrom),而BIO只調(diào)用了一個系統(tǒng)調(diào)用(recvfrom)。但是,用select的優(yōu)勢在于它可以同時處理多個connection。
JAVA實現(xiàn)IO復用這里我們使用的是java.nio下模塊來完成I/O多路復用的例子。我用到的Selector(選擇器),是Java NIO中能夠檢測一到多個NIO通道,并能夠知曉通道是否為諸如讀寫事件做好準備的組件。這樣,一個多帶帶的線程可以管理多個channel,從而管理多個網(wǎng)絡(luò)連接。
Selector的使用 Selector的創(chuàng)建Selector selector = Selector.open();向Selector注冊通道
為了將Channel和Selector配合使用,必須將channel注冊到selector上。通過SelectableChannel.register()方法來實現(xiàn),如下:
channel.configureBlocking(false); SelectionKey key = channel.register(selector,Selectionkey.OP_READ);
register()方法的第二個參數(shù)是一個“interest集合”,意思是在通過Selector監(jiān)聽Channel時對什么事件感興趣。可以監(jiān)聽四種不同類型的事件:Connect、Accept、Read、Write
通道觸發(fā)了一個事件意思是該事件已經(jīng)就緒。所以,某個channel成功連接到另一個服務(wù)器稱為“連接就緒”。一個server socket channel準備好接收新進入的連接稱為“接收就緒”。一個有數(shù)據(jù)可讀的通道可以說是“讀就緒”。等待寫數(shù)據(jù)的通道可以說是“寫就緒”。
這四種事件用SelectionKey的四個常量來表示:
SelectionKey.OP_CONNECT可連接
SelectionKey.OP_ACCEPT可接受連接
SelectionKey.OP_READ可讀
SelectionKey.OP_WRITE可寫
SelectionKey當向Selector注冊Channel時,register()方法會返回一個SelectionKey對象。它包含了:
interest集合
ready集合
Channel
Selector
附加的對象(可選)
interest集合是你所選擇的感興趣的事件集合。可以通過SelectionKey讀寫interest集合,像這樣:
int interestSet = selectionKey.interestOps(); boolean isInterestedInAccept = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT; boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT; boolean isInterestedInRead = interestSet & SelectionKey.OP_READ; boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
可以看到,用“位與”操作interest 集合和給定的SelectionKey常量,可以確定某個確定的事件是否在interest 集合中。
ready 集合是通道已經(jīng)準備就緒的操作的集合。在一次選擇(Selection)之后,你會首先訪問這個ready set。Selection將在下一小節(jié)進行解釋。可以這樣訪問ready集合:
int readySet = selectionKey.readyOps();
可以用像檢測interest集合那樣的方法,來檢測channel中什么事件或操作已經(jīng)就緒。但是,也可以使用以下四個方法,它們都會返回一個布爾類型:
selectionKey.isAcceptable(); selectionKey.isConnectable(); selectionKey.isReadable(); selectionKey.isWritable();從SelectionKey訪問Channel和Selector
Channel channel = selectionKey.channel(); Selector selector = selectionKey.selector();java代碼
/** * @program: NIOServer * @description: 服務(wù)端 * @author: tEngSHe789 **/ public class NIOServer { public static void main(String[] args) throws IOException { System.out.println("服務(wù)端Start...."); // 創(chuàng)建通道 ServerSocketChannel serverSocketChannel=ServerSocketChannel.open(); // 設(shè)置非阻塞 serverSocketChannel.configureBlocking(false); // 綁定連接 serverSocketChannel.bind(new InetSocketAddress(8888)); // 獲取選擇器 Selector selector=Selector.open(); // 將通道注冊到選擇器 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 輪調(diào)式獲取選擇“已經(jīng)準備就緒”的事件 while (selector.select() > 0){ // 獲取當前選擇器的左右已經(jīng)準備就緒的監(jiān)聽事件(選擇key) Iterator客戶端:iterator=selector.selectedKeys().iterator(); while (iterator.hasNext()){ // 獲取準備就緒事件 SelectionKey selectionKey=iterator.next(); // 判斷具體是什么事件 if (selectionKey.isAcceptable()){//如果是“接受就緒” SocketChannel socketChannel=serverSocketChannel.accept();// 獲取連接 socketChannel.configureBlocking(false); // 設(shè)置非阻塞 //將該通道注冊到服務(wù)器上 socketChannel.register(selector, SelectionKey.OP_READ); }else if (selectionKey.isReadable()){//如是“已經(jīng)就緒” SocketChannel socketChannel= (SocketChannel) selectionKey.channel();//獲取連接 //讀數(shù)據(jù) ByteBuffer buffer=ByteBuffer.allocate(1024); int len = 0; //分散讀取 len=socketChannel.read(buffer); while (len > 0){ buffer.flip(); System.out.println(new String(buffer.array(),0,len)); buffer.clear(); } } iterator.remove(); } } } }
/** * @program: NIOClient * @description: 客戶端 * @author: tEngSHe789 **/ public class NIOClient { public static void main(String[] args) throws IOException { System.out.println("客戶端Start...."); // 創(chuàng)建通道 SocketChannel socketChannel=SocketChannel.open(new InetSocketAddress("127.0.0.1",8888)); // 設(shè)置SocketChannel接口為非阻塞 socketChannel.configureBlocking(false); //指定緩沖區(qū)大小 ByteBuffer buffer=ByteBuffer.allocate(1024); Scanner scanner=new Scanner(System.in); while (scanner.hasNext()){ String msg = scanner.next(); // 存儲 buffer.put((new Date().toString()+" "+msg).getBytes()); // 翻轉(zhuǎn) buffer.flip(); // 聚集寫入 socketChannel.write(buffer); // 釋放 buffer.clear(); } socketChannel.close(); } }python實現(xiàn)IO復用
對比java用的是Selector,可以幫我們在默認操作系統(tǒng)下選擇最合適的select, poll, epoll這三種多路復合模型,python是通過一種機制一個進程能同時等待多個文件描述符,而這些文件描述符(套接字描述符)其中的任意一個進入讀就緒狀態(tài),select()函數(shù)就可以返回。
服務(wù)端from socket import * import select s=socket(AF_INET,SOCK_STREAM) s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) s.bind(("127.0.0.1",8888)) s.listen(5) s.setblocking(False) #設(shè)置socket的接口為非阻塞 read_l=[s,] # 數(shù)據(jù)可讀通道的列表 while True: # 監(jiān)聽的read_l中的socket對象內(nèi)部如果有變化,那么這個對象就會在r_l # 第二個參數(shù)里有什么對象,w_l中就有什么對象 # 第三個參數(shù) 如果這里的對象內(nèi)部出錯,那會把這些對象加到x_l中 # 1 是超時時間 r_l,w_l,x_l=select.select(read_l,[],[],1) print(r_l) for ready_obj in r_l: if ready_obj == s: conn,addr=ready_obj.accept() #此時的ready_obj等于s read_l.append(conn) else: try: data=ready_obj.recv(1024) #此時的ready_obj等于conn if not data: ready_obj.close() read_l.remove(ready_obj) raise Exception("連接斷開") ready_obj.send(data.upper()) except ConnectionResetError: ready_obj.close() read_l.remove(ready_obj)客戶端
from socket import * c=socket(AF_INET,SOCK_STREAM) c.connect(("127.0.0.1",8888)) while True: msg=input(">>>: ") if not msg:continue c.send(msg.encode("utf-8")) data=c.recv(1024) print(data.decode("utf-8"))AIO(asynchronous io)
真正的異步I/O很牛逼,流程大概如下:
(1)用戶進程發(fā)起read操作之后,立刻就可以開始去做其它的事。
(2)而另一方面,從kernel的角度,當它受到一個asynchronous read之后,首先它會立刻返回,所以不會對用戶進程產(chǎn)生任何block。
(3)然后,kernel會等待數(shù)據(jù)準備完成,然后將數(shù)據(jù)拷貝到用戶內(nèi)存,當這一切都完成之后,kernel會給用戶進程發(fā)送一個signal,告訴它read操作完成了。
JavaJava中使用AIO需要用到j(luò)ava.nio.channels.AsynchronousChannelGroup和java.nio.channels.AsynchronousServerSocketChannel的包,由于實際項目鮮有人用,就不演示了
總結(jié)回顧一下各個IO Model的比較,如圖所示:
blocking io :阻塞型io,再熟悉不過,處理accept、read、write都會阻塞用戶進程
non blocking io:當通過系統(tǒng)調(diào)用的時候,如果沒有連接或者數(shù)據(jù)到達就直接返回一個錯誤,用戶進程不阻塞但是不斷的輪詢。注意這個不是java nio框架中對應(yīng)的網(wǎng)絡(luò)模型
io multiplexing:io多路復用才是nio對應(yīng)的網(wǎng)絡(luò)io模型。該模型對于用戶進程也是阻塞的,優(yōu)點是可以同時支持多個connetciotn。前三種都屬于同步模式,既然都是同步的,如果要做到看似非阻塞,那么就需要輪詢機制。相對于上一種模型,這種只是將輪詢從用戶進程轉(zhuǎn)移到了操作系統(tǒng)內(nèi)核,通過調(diào)用select函數(shù),不斷輪詢多個connection是否ready,如果有一種ready好的,就通過事件通知用戶進程,用戶進程再通過事件來處理。所以在java的nio中會看到一大堆事件處理。這種模型的阻塞不是在socket層面的阻塞,而是在調(diào)動select函數(shù)的阻塞。而且相對于blocking io,還多了一次select的系統(tǒng)調(diào)用,其實性能會更低,所以在低吞吐量下,這種io不見得比bio+線程池的模型優(yōu)越。
sign driven:極少使用,不知道
async io :java7時候開始升級,也成為nio2。實現(xiàn)了異步的io。前三種都是通過用戶進程在主動獲取(bio的阻塞,nbio的輪詢和iomult的按事件獲取),而aio交互很簡單,用戶進程調(diào)用后立即返回,用戶進程不阻塞,內(nèi)核當完成網(wǎng)絡(luò)io和數(shù)據(jù)復制后,主動通知用戶進程。前面說到的系統(tǒng)內(nèi)核做的操作,除了等待網(wǎng)絡(luò)io就緒數(shù)據(jù)到達內(nèi)核,還有從系統(tǒng)內(nèi)核復制用戶空間去的過程,異步io這兩者對于用戶進程而言都是非阻塞的,而前三種,在數(shù)據(jù)從內(nèi)核復制到用戶空間這個過程,都是阻塞的。
參考資料前言說的那本書
Ron Hitchens于2002年 著的《java nio》
findumars
冬瓜蔡
彼岸船夫
NIO的/分散讀取和聚集寫入
并發(fā)編程網(wǎng)
感謝
續(xù)1s時間全片結(jié)束,覺得我寫的不錯?想要了解更多精彩新姿勢?趕快打開我的
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/42337.html
摘要:緩沖區(qū)的限制不能為負,并且不能大于其容量。如果指向的位置超過限制,則拋出異常。使用臨時緩沖區(qū)執(zhí)行低層次操作。臨時緩沖區(qū)對象離開作用域,并最終成為被回收的無用數(shù)據(jù)。 前天剛好看了點《UNIX網(wǎng)絡(luò)編程》,比較頭大。現(xiàn)在我來整理一下所學所得,并用于個人備忘。如果有不對,請批評。 想要解鎖更多新姿勢?請訪問https://blog.tengshe789.tech/ IO模型介紹 IO模型是什么...
摘要:參考鏈接面向?qū)ο缶幊棠P同F(xiàn)在的很多編程語言基本都具有面向?qū)ο蟮乃枷耄热绲鹊龋嫦驅(qū)ο蟮闹饕枷雽ο螅悾^承,封裝,多態(tài)比較容易理解,這里就不多多描述了。 前言 在我們的日常日發(fā)和學習生活中會常常遇到一些名詞,比如 命令式編程模型,聲明式編程模型,xxx語言是面向?qū)ο蟮牡鹊龋@個編程模型到處可見,但是始終搞不清是什么?什么語言又是什么編程模型,當你新接觸一門語言的時候,有些問題是需...
摘要:編程基礎(chǔ)要學習如何用進行數(shù)據(jù)分析,數(shù)據(jù)分析師建議第一步是要了解一些的編程基礎(chǔ),知道的數(shù)據(jù)結(jié)構(gòu),什么是向量列表數(shù)組字典等等了解的各種函數(shù)及模塊。數(shù)據(jù)分析師認為數(shù)據(jù)分析有的工作都在處理數(shù)據(jù)。 showImg(https://segmentfault.com/img/bVbnbZo?w=1024&h=653); 本文為CDA數(shù)據(jù)分析研究院原創(chuàng)作品,轉(zhuǎn)載需授權(quán) 1.為什么選擇Python進行數(shù)...
摘要:四種模型把同步阻塞同步非阻塞異步阻塞異步非阻塞的模型講得很清楚。有人對于模型有一些批判,認為多線程模型同步阻塞模型不比事件模型差,講了提到的多線程模型的性能瓶頸在如今的內(nèi)核里已經(jīng)不存在了,而多線程模型開發(fā)起來更簡單。 四種IO模型 Boost application performance using asynchronous I/O把同步阻塞、同步非阻塞、異步阻塞、異步非阻塞的模型講...
閱讀 1863·2023-04-26 02:46
閱讀 1995·2021-11-25 09:43
閱讀 1140·2021-09-29 09:35
閱讀 2095·2019-08-30 15:56
閱讀 3418·2019-08-30 15:54
閱讀 2627·2019-08-29 16:35
閱讀 3116·2019-08-29 15:25
閱讀 3282·2019-08-29 14:01