摘要:當操作系統發生事件,并且準備好數據后,在主動通知應用程序,觸發相應的函數。當失敗時觸發該方法,第一個參數代表操作失敗引發的異常或錯誤。
BIO編程
回顧下Linux下阻塞IO模型:
再看看Java的BIO編程模型:
/** * 類說明:客戶端 */ public class BioClient { public static void main(String[] args) throws InterruptedException, IOException { //通過構造函數創建Socket,并且連接指定地址和端口的服務端 Socket socket = new Socket(DEFAULT_SERVER_IP,DEFAULT_PORT); System.out.println("請輸入請求消息:"); //啟動讀取服務端輸出數據的線程 new ReadMsg(socket).start(); PrintWriter pw = null; //允許客戶端在控制臺輸入數據,然后送往服務器 while(true){ pw = new PrintWriter(socket.getOutputStream()); pw.println(new Scanner(System.in).next()); pw.flush(); } } //讀取服務端輸出數據的線程 private static class ReadMsg extends Thread { Socket socket; public ReadMsg(Socket socket) { this.socket = socket; } @Override public void run() { //負責socket讀寫的輸入流 try (BufferedReader br = new BufferedReader( new InputStreamReader(socket.getInputStream()))){ String line = null; //通過輸入流讀取服務端傳輸的數據 //如果已經讀到輸入流尾部,返回null,退出循環 //如果得到非空值,就將結果進行業務處理 while((line=br.readLine())!=null){ System.out.printf("%s ",line); } } catch (SocketException e) { System.out.printf("%s ", "服務器斷開了你的連接"); } catch (Exception e) { e.printStackTrace(); } finally { clear(); } } //必要的資源清理工作 private void clear() { if (socket != null) try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } /** * 類說明:bio的服務端主程序 */ public class BioServer { //服務器端必須 private static ServerSocket server; //線程池,處理每個客戶端的請求 private static ExecutorService executorService = Executors.newFixedThreadPool(5); private static void start() throws IOException{ try{ //通過構造函數創建ServerSocket //如果端口合法且空閑,服務端就監聽成功 server = new ServerSocket(DEFAULT_PORT); System.out.println("服務器已啟動,端口號:" + DEFAULT_PORT); while(true){ Socket socket= server.accept(); System.out.println("有新的客戶端連接----" ); //當有新的客戶端接入時,打包成一個任務,投入線程池 executorService.execute(new BioServerHandler(socket)); } }finally{ if(server!=null){ server.close(); } } } public static void main(String[] args) throws IOException { start(); } } /** * 類說明: */ public class BioServerHandler implements Runnable{ private Socket socket; public BioServerHandler(Socket socket) { this.socket = socket; } public void run() { try(//負責socket讀寫的輸出、輸入流 BufferedReader in = new BufferedReader( new InputStreamReader(socket.getInputStream())); PrintWriter out = new PrintWriter(socket.getOutputStream(), true)){ String message; String result; //通過輸入流讀取客戶端傳輸的數據 //如果已經讀到輸入流尾部,返回null,退出循環 //如果得到非空值,就將結果進行業務處理 while((message = in.readLine())!=null){ System.out.println("Server accept message:"+message); result = response(message); //將業務結果通過輸出流返回給客戶端 out.println(result); } }catch(Exception e){ e.printStackTrace(); }finally{ if(socket != null){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } socket = null; } } } }
過程:
服務端提供IP和監聽端口
客戶端通過連接操作想服務端監聽的地址發起連接請求,通過三次握手連接
如果連接成功建立,雙方就可以通過套接字進行通信
最早的時候服務器端是針對一個連接新建一個線程來處理→演變成服務端針對每個客戶端連接把請求丟進線程池來處理任務
缺點:若高并發場景且處理時間稍長則許多請求會阻塞一直等待,嚴重影響性能.
先回顧下Linux下AIO模型:
原生JDK網絡編程AIO:
異步IO采用“訂閱-通知”模式:即應用程序向操作系統注冊IO監聽,然后繼續做自己的事情。當操作系統發生IO事件,并且準備好數據后,在主動通知應用程序,觸發相應的函數。
注意:異步IO里面客戶端和服務端均采用這種“訂閱-通知”模式.
AIO編程幾個核心類:
①:AsynchronousServerSocketChannel:類似BIO里面的ServerSocket
②:AsynchronousSocketChannel :類似BIO里面的socket用來通信,有三個方法:connect():用于連接到指定端口,指定IP地址的服務器,read()、write():完成讀寫
注意點:
1.這三個方法會執行就相當于上面圖解里面的Subscrible函數向操作系統監聽線程。
2.這幾個方法里面有個參數,比如write(ByteBuffer src,A attachment,CompletionHandler Channel可看做JDK對IO的抽象,除了網絡通道,還有文件通道FileChannel。
③:CompletionHandler:源碼注釋是異步IO操作中用來處理消費的結果,其實也就是結果回調函數,連接丶讀寫都是異步操作都需要實現此接口。
而CompletionHandler接口中定義了兩個方法,
completed(V result , A attachment):當IO完成時觸發該方法,該方法的第一個參數代表IO操作返回的對象,第二個參數代表發起IO操作時傳入的附加參數。
faild(Throwable exc, A attachment):當IO失敗時觸發該方法,第一個參數代表IO操作失敗引發的異常或錯誤。
先上代碼
客戶端:
/** * 類說明:aio的客戶端主程序 */ public class AioClient { //IO通信處理器 private static AioClientHandler clientHandle; public static void start(){ if(clientHandle!=null) return; clientHandle = new AioClientHandler(DEFAULT_SERVER_IP,DEFAULT_PORT); //負責網絡通訊的線程 new Thread(clientHandle,"Client").start(); } //向服務器發送消息 public static boolean sendMsg(String msg) throws Exception{ if(msg.equals("q")) return false; clientHandle.sendMessag(msg); return true; } public static void main(String[] args) throws Exception{ AioClient.start(); System.out.println("請輸入請求消息:"); Scanner scanner = new Scanner(System.in); while(AioClient.sendMsg(scanner.nextLine())); } } /** * 類說明:IO通信處理器,負責連接服務器,對外暴露對服務端發送數據的API */ public class AioClientHandler implements CompletionHandler,Runnable { private AsynchronousSocketChannel clientChannel; private String host; private int port; private CountDownLatch latch;//防止線程退出 public AioClientHandler(String host, int port) { this.host = host; this.port = port; try { //創建一個實際異步的客戶端通道 clientChannel = AsynchronousSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { //創建CountDownLatch,因為是異步調用,下面的connect不會阻塞, // 那么整個run方法會迅速結束,那么負責網絡通訊的線程也會迅速結束 latch = new CountDownLatch(1); //發起異步連接操作,回調參數就是這個實例本身, // 如果連接成功會回調這個實例的completed方法 clientChannel.connect(new InetSocketAddress(host,port), null,this); try { latch.await(); clientChannel.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } //連接成功,這個方法會被系統調用 @Override public void completed(Void result, AioClientHandler attachment) { System.out.println("已經連接到服務端。"); } //連接失敗,這個方法會被系統調用 @Override public void failed(Throwable exc, AioClientHandler attachment) { System.err.println("連接失敗。"); exc.printStackTrace(); latch.countDown(); try { clientChannel.close(); } catch (IOException e) { e.printStackTrace(); } } //對外暴露對服務端發送數據的API public void sendMessag(String msg){ /*為了把msg變成可以在網絡傳輸的格式*/ byte[] bytes = msg.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); /*進行異步寫,同樣的這個方法會迅速返回, 需要提供一個接口讓系統在一次網絡寫操作完成后通知我們的應用程序。 所以我們傳入一個實現了CompletionHandler的AioClientWriteHandler 第1個writeBuffer,表示我們要發送給服務器的數據; 第2個writeBuffer,考慮到網絡寫有可能無法一次性將數據寫完,需要進行多次網絡寫, 所以將writeBuffer作為附件傳遞給AioClientWriteHandler。 */ clientChannel.write(writeBuffer,writeBuffer, new AioClientWriteHandler(clientChannel,latch)); } } /** * 類說明:網絡寫的處理器,CompletionHandler 中 * Integer:本次網絡寫操作完成實際寫入的字節數, * ByteBuffer:寫操作的附件,存儲了寫操作需要寫入的數據 */ public class AioClientWriteHandler implements CompletionHandler { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public AioClientWriteHandler(AsynchronousSocketChannel clientChannel, CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result, ByteBuffer buffer) { //有可能無法一次性將數據寫完,需要檢查緩沖區中是否還有數據需要繼續進行網絡寫 if(buffer.hasRemaining()){ clientChannel.write(buffer,buffer,this); }else{ //寫操作已經完成,為讀取服務端傳回的數據建立緩沖區 ByteBuffer readBuffer = ByteBuffer.allocate(1024); /*這個方法會迅速返回,需要提供一個接口讓 系統在讀操作完成后通知我們的應用程序。*/ clientChannel.read(readBuffer,readBuffer, new AioClientReadHandler(clientChannel,latch)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { System.err.println("數據發送失敗..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } } /** * 類說明:網絡讀的處理器 * CompletionHandler 中 * Integer:本次網絡讀操作實際讀取的字節數, * ByteBuffer:讀操作的附件,存儲了讀操作讀到的數據 * */ public class AioClientReadHandler implements CompletionHandler { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public AioClientReadHandler(AsynchronousSocketChannel clientChannel, CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result,ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String msg; try { msg = new String(bytes,"UTF-8"); System.out.println("accept message:"+msg); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc,ByteBuffer attachment) { System.err.println("數據讀取失敗..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } }
服務端:
/** * 類說明:服務器主程序 */ public class AioServer { private static AioServerHandler serverHandle; //統計客戶端個數 public volatile static long clientCount = 0; public static void start(){ if(serverHandle!=null) return; serverHandle = new AioServerHandler(DEFAULT_PORT); new Thread(serverHandle,"Server").start(); } public static void main(String[] args){ AioServer.start(); } } /** * 類說明:處理用戶連接的處理器 */ public class AioAcceptHandler implements CompletionHandler{ @Override public void completed(AsynchronousSocketChannel channel, AioServerHandler serverHandler) { AioServer.clientCount++; System.out.println("連接的客戶端數:" + AioServer.clientCount); //重新注冊監聽,讓別的客戶端也可以連接 serverHandler.channel.accept(serverHandler,this); ByteBuffer readBuffer = ByteBuffer.allocate(1024); //1)ByteBuffer dst:接收緩沖區,用于從異步Channel中讀取數據包; //2)? A attachment:異步Channel攜帶的附件,通知回調的時候作為入參使用; //3)? CompletionHandler :系統回調的業務handler,進行讀操作 channel.read(readBuffer,readBuffer, new AioReadHandler(channel)); } @Override public void failed(Throwable exc, AioServerHandler serverHandler) { exc.printStackTrace(); serverHandler.latch.countDown(); } } /** * 類說明:讀數據的處理器 */ public class AioReadHandler implements CompletionHandler { private AsynchronousSocketChannel channel; public AioReadHandler(AsynchronousSocketChannel channel) { this.channel = channel; } //讀取到消息后的處理 @Override public void completed(Integer result, ByteBuffer attachment) { //如果條件成立,說明客戶端主動終止了TCP套接字,這時服務端終止就可以了 if(result == -1) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } return; } //flip操作 attachment.flip(); byte[] message = new byte[attachment.remaining()]; attachment.get(message); try { System.out.println(result); String msg = new String(message,"UTF-8"); System.out.println("server accept message:"+msg); String responseStr = response(msg); //向客戶端發送消息 doWrite(responseStr); } catch (Exception e) { e.printStackTrace(); } } //發送消息 private void doWrite(String result) { byte[] bytes = result.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); //異步寫數據 channel.write(writeBuffer, writeBuffer, new CompletionHandler () { @Override public void completed(Integer result, ByteBuffer attachment) { if(attachment.hasRemaining()){ channel.write(attachment,attachment,this); }else{ //讀取客戶端傳回的數據 ByteBuffer readBuffer = ByteBuffer.allocate(1024); //異步讀數據 channel.read(readBuffer,readBuffer, new AioReadHandler(channel)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } } }); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.channel.close(); } catch (IOException e) { e.printStackTrace(); } } } /** * 類說明:響應網絡操作的處理器 */ public class AioServerHandler implements Runnable { public CountDownLatch latch; /*進行異步通信的通道*/ public AsynchronousServerSocketChannel channel; public AioServerHandler(int port) { try { //創建服務端通道 channel = AsynchronousServerSocketChannel.open(); //綁定端口 channel.bind(new InetSocketAddress(port)); System.out.println("Server is start,port:"+port); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { latch = new CountDownLatch(1); //用于接收客戶端的連接,異步操作, // 需要實現了CompletionHandler接口的處理器處理和客戶端的連接操作 channel.accept(this,new AioAcceptHandler()); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } }
疑難點1:
怎么理解這里客戶端寫操作的處理器回調方法?
客戶端把ByteBuffer里面的數據寫到AsynchronousSocketChannel這個管道上,
如果ByteBuffer里面數據很大,超過了管道容量,這時會先完成寫操作,服務端收到數據回調這個completed方法
則需要ByteBuffer再寫入剩下的數據到管道里,每發完一次數據通知一次,這個管道容量取決于網卡的緩沖區。這個completed方法并不是說ByteBuffer的數據寫完了,而是當前網卡這份數據寫完了.
疑難點2:
Buffer:
查看源碼可看到幾個重要屬性:
capacity:表示分配的內存大小
position:類似指針類的索引,讀取或寫入的位置標識符,下一個可寫入的初始位置/下一個可讀取的初始位置
limit:可讀或可寫的范圍,小于等于capacity,當小于capacity,limit到capaticy的最大容量值的這段空間不予寫入是放一些初始化值的.
ByteBuffer可以理解為放在內存中的一個數組。
比如圖中一開始是寫入模式,寫入五個字節,地址為0-4,position在5,調用flip方法后切換到讀模式,position變為0即開始序列,limit變為5,這樣就可以buffer開頭開始讀取了.
應用場景:
可以服務端用AIO模型,客戶端使用BIO簡化編程,本文的例子即可調試,啟動AioServer再啟動BioClient,通信是沒問題的
AIO編程相對復雜,代碼中一些關鍵方法都有注釋,目前Linux下沒有真正意義上的AIO,實際上是用了NIO里面的epoll(true),底層原理還是用了IO復用(NIO).windows實現了AIO,AIO是未來的方向,需待linux內核支持.
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/74892.html
摘要:后改良為用線程池的方式代替新增線程,被稱為偽異步。最大的問題是阻塞,同步。每次請求都由程序執行并返回,這是同步的缺陷。這些都會被注冊在多路復用器上。多路復用器提供選擇已經就緒狀態任務的能力。并沒有采用的多路復用器,而是使用異步通道的概念。 Netty是一個提供異步事件驅動的網絡應用框架,用以快速開發高性能、高可靠的網絡服務器和客戶端程序。Netty簡化了網絡程序的開發,是很多框架和公司...
摘要:后改良為用線程池的方式代替新增線程,被稱為偽異步。最大的問題是阻塞,同步。每次請求都由程序執行并返回,這是同步的缺陷。這些都會被注冊在多路復用器上。多路復用器提供選擇已經就緒狀態任務的能力。并沒有采用的多路復用器,而是使用異步通道的概念。 Netty是一個提供異步事件驅動的網絡應用框架,用以快速開發高性能、高可靠的網絡服務器和客戶端程序。Netty簡化了網絡程序的開發,是很多框架和公司...
摘要:即可以理解為,方法都是異步的,完成后會主動調用回調函數。主要在包下增加了下面四個異步通道其中的方法,會返回一個帶回調函數的對象,當執行完讀取寫入操作后,直接調用回調函數。 本文原創地址,我的博客:jsbintask.cn/2019/04/16/…(食用效果最佳),轉載請注明出處! 在理解什么是BIO,NIO,AIO之前,我們首先需要了解什么是同步,異步,阻塞,非阻塞。假如我們現在要去銀行取...
時間:2018年04月11日星期三 說明:本文部分內容均來自慕課網。@慕課網:https://www.imooc.com 教學源碼:https://github.com/zccodere/s... 學習源碼:https://github.com/zccodere/s... 第一章:課程介紹 1-1 課程介紹 什么是Netty 高性能、事件驅動、異步非阻塞的IO Java開源框架 基于NIO的客戶...
閱讀 811·2023-04-25 20:18
閱讀 2092·2021-11-22 13:54
閱讀 2527·2021-09-26 09:55
閱讀 3857·2021-09-22 15:28
閱讀 2969·2021-09-03 10:34
閱讀 1710·2021-07-28 00:15
閱讀 1629·2019-08-30 14:25
閱讀 1281·2019-08-29 17:16