摘要:因為消費消息是在另外一個進程中,我們需要阻塞我們的進程直到結果返回,使用阻塞隊列是一種非常好的方式,這里我們使用了長度為的,的功能是檢查消息的的是不是我們之前所發送的,如果是,將返回值返回到。
推廣
https://segmentfault.com/l/15...
我們利用消息隊列實現了分布式事務的最終一致性解決方案,請大家圍觀。可以參考源碼:https://github.com/vvsuperman…,項目支持網站: http://rabbitmq.org.cn,最新文章或實現會更新在上面
聲明RPC接口為了闡述RPC我們先建立一個客戶端接口,它有一個方法,會發起一個RPC請求,而且會一直阻塞直到有結果返回
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient(); String result = fibonacciRpc.call("4"); System.out.println( "fib(4) is " + result);
留意RPC
雖然RPC很常見,但一定要非常小心的使用它,假設rpc調用的是一個非常慢的程序,將導致結果不可預料,而且非常難以調試。
使用RPC時你可以參考下列一些規范
系統設計上要有詳細的文檔描述,使組件間的依賴講清晰,做到有據可查
做好錯誤的異常處理,特別是當RPC服務掛掉或很長時間沒有響應時
盡量少用RPC,而使用異步管道,而非阻塞式的RPC,降低系統間的耦合
回調隊列(Callback queue)用RabbitMQ實現RPC比較簡單,客戶端發起請求,服務端返回對這個請求的響應。為了實現這個功能我們需要一個能夠"回調"的隊列,我們直接用默認的隊列即可
callbackQueueName = channel.queueDeclare().getQueue(); BasicProperties props = new BasicProperties .Builder() .replyTo(callbackQueueName) .build(); channel.basicPublish("", "rpc_queue", props, message.getBytes()); // ... then code to read a response message from the callback_queue ...消息屬性(Message properties)
AMQP 0-9-1 協議為每個消息定義了14個屬性,很多屬性很少會被用到,但我們要特別留意如下幾個
分發模式(deliveryMode): 標記一個消息是否需要持久化(persistent)或者是需要事務(transient)等,在第二章中有描述
消息體類型(contentType): 描述消息中傳遞具體內容的編碼方式,比如我們經常使用的JSON可以設置成:application/json
消息回應(replyTo):用于回調隊列
關系Id(correlationId): 用于將RPC的返回值關聯到對應的請求。
我們需要引入相應的包
import com.rabbitmq.client.AMQP.BasicProperties;關系Id(Correlation Id)
在前面的方法中我們為每一個RPC請求都生成了一個隊列,這是完全沒有必要的,我們為每一個客戶端建立一個隊列就可以了。
這會引起一個新的問題,因為所有的RPC都是用一個隊列,一旦有消息返回,你怎么知道返回的消息對應的是哪個請求呢?所以我們就用到了Correlation Id,作為每個請求獨一無二的標識,當我們收到返回值后,會檢查這個Id,匹配對應的響應。如果找不到Id所對應的請求,會直接拋棄它。
這里你可能會有疑問,為什么要拋棄掉未知消息呢?而不是拋出異常啥的。這跟我們服務端的競態條件(possibility of a race condition )會有關系。比如假設我們RabbitMQ服務掛掉了,它剛給我們回復消息,還沒等到回應,服務器就掛掉了,那么當RabbitMQ服務重啟時,會重發消息,客戶端會收到一條重復的消息,為了冥等性的考慮,我們需要仔細的處理返回后的處理方式。
小結RPC工作過程如下
當客戶端啟動時,它會創建一個獨立的匿名回調隊列,然后發送RPC請求,這個RPC
請求會帶兩個屬性:replyTo - RPC調用成功后需要返回的隊列名稱;correlationId - 每個請求獨一無二的標識。RPC服務提供者會等在隊列上,一旦有請求到達,它會立即響應,把自己的活干完,然后返回一個結果,根據replyTo返回到對應的隊列。而客戶端也會等著隊列中的信息返回,一旦有一個消息出現,會檢查correlationId,將結果返回給響應的請求發起者
Fibonacci級數
private static int fib(int n) { if (n == 0) return 0; if (n == 1) return 1; return fib(n-1) + fib(n-2); }
我們定義個一個fibonacci級數,只能接受正整數,而且是效率不怎么高的那種。
rpc.java如下所示
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; public static void main(String[] argv) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; try { connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); System.out.println(" [x] Awaiting RPC requests"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { AMQP.BasicProperties replyProps = new AMQP.BasicProperties .Builder() .correlationId(properties.getCorrelationId()) .build(); String response = ""; try { String message = new String(body,"UTF-8"); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); response += fib(n); } catch (RuntimeException e){ System.out.println(" [.] " + e.toString()); } finally { channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8")); channel.basicAck(envelope.getDeliveryTag(), false); // RabbitMq consumer worker thread notifies the RPC server owner thread synchronized(this) { this.notify(); } } } }; channel.basicConsume(RPC_QUEUE_NAME, false, consumer); // Wait and be prepared to consume the message from RPC client. while (true) { synchronized(consumer) { try { consumer.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } catch (IOException | TimeoutException e) { e.printStackTrace(); } finally { if (connection != null) try { connection.close(); } catch (IOException _ignore) {} } } }
服務端的代碼比較直接,首先建立連接,建立channel以及聲明隊列。我們之后可能會建立多個消費者,為了更好的負載均衡,需要在channel.basicQos中設置prefetchCount,然后設置一個basicConsume監聽隊列,提供一個回調函數來處理請求以及返回值
RPCClient.java
import com.rabbitmq.client.*; import java.io.IOException; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeoutException; public class RPCClient { private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; public RPCClient() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); replyQueueName = channel.queueDeclare().getQueue(); } public String call(String message) throws IOException, InterruptedException { String corrId = UUID.randomUUID().toString(); AMQP.BasicProperties props = new AMQP.BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); final BlockingQueueresponse = new ArrayBlockingQueue (1); channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { if (properties.getCorrelationId().equals(corrId)) { response.offer(new String(body, "UTF-8")); } } }); return response.take(); } public void close() throws IOException { connection.close(); } //... }
客戶端代碼如下,我們建立一個連接,聲明一個"callback"隊列,我們將會往"callback"隊列提交消息,并接收RPC的返回值,具體步驟如下:
我們首先生成一個唯一的correlation Id,并保存,我們將會使用它來區分之后所接受到的信息。然后發出這個消息,消息會包含兩個屬性: replyTo以及collelationId。因為消費消息是在另外一個進程中,我們需要阻塞我們的進程直到結果返回,使用阻塞隊列BlockingQueue是一種非常好的方式,這里我們使用了長度為1的ArrayBlockQueue,handleDelivery的功能是檢查消息的的correlationId是不是我們之前所發送的,如果是,將返回值返回到BlockingQueue。此時主線程會等待返回并從ArrayBlockQueue取到返回值
從客戶端發起請求
RPCClient fibonacciRpc = new RPCClient(); System.out.println(" [x] Requesting fib(30)"); String response = fibonacciRpc.call("30"); System.out.println(" [.] Got "" + response + """); fibonacciRpc.close();
源代碼參考RPCClient.java 和 RPCServer.java
編譯
javac -cp $CP RPCClient.java RPCServer.java
我們的rpc服務端好了,啟動服務
java -cp $CP RPCServer # => [x] Awaiting RPC requests
為了獲取fibonacci級數我們只需要運行客戶端:
java -cp $CP RPCClient # => [x] Requesting fib(30)
以上的實現方式并非建立RPC請求唯一的方式,但是它有很多優點:如果一個RPC服務過于緩慢,你可以非常方便的水平擴展,只需要增加消費者的個數即可,我們的代碼還是比較簡單的,有些負責的問題并未解決,比如
如果服務全部掛了,客戶端要如何處理
如果服務超時該如何處理
非法信息該如何處理
基礎章節的內容到此就結束了,到這里,你就能夠基本明白消息隊列的基本用法,接下來我們可以進入中級內容內容的學習了。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/68133.html
摘要:如果涉及返回值,就要用到本章提到的了。方法發送請求,并阻塞知道結果返回。當有消息時,進行計算并通過指定的發送給客戶端。當接收到,則檢查。如果和之前的匹配,則將消息返回給應用進行處理。 RPC模式 在第二章中我們學習了如何使用Work模式在多個worker之間派發時間敏感的任務。這種情況是不涉及到返回值的,worker執行任務就好。如果涉及返回值,就要用到本章提到的RPC(Remote ...
摘要:有助于將響應與請求關聯起來。如果發生這種情況,重新啟動的服務器將再次處理請求。又名服務器正在等待該隊列上的請求。當消息出現時,它檢查屬性。然后,我們進入循環,在其中等待請求消息,完成工作并發送響應。 (using php-amqplib) 前提必讀 本教程假設RabbitMQ是安裝在標準端口上運行(5672)。如果您使用不同的主機、端口或憑據,則連接設置需要調整。 如果您在本教程中遇到...
摘要:作為消息隊列的一個典型實踐,完全實現了標準,與的快快快不同,它追求的穩定可靠。同一個隊列不僅可以綁定多個生產者,而且能夠發送消息到多個消費者。消費者接受并消費消息。幾乎于完全類似是一個繼承了接口的類,方便我們來存儲消息隊列來的消息。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現了分布式事務的...
摘要:推廣專題講座開源項目我們利用消息隊列實現了分布式事務的最終一致性解決方案,請大家圍觀。因此一旦有有消息,消息會廣播到所有的消費者。如此一來路由器就能夠把消息發送給相應的隊列了。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現了分布式事務的最終一致性解決方案,請大家圍觀。可以參考源碼:https...
摘要:可以參考源碼,項目支持網站,最新文章或實現會更新在上面前言在訂閱發布中我們建立了一個簡單的日志系統,從而將消息廣播給一些消費者。因此,發送到路由鍵的消息會發送給隊列,發送到路由鍵或者的消息會發送給,其它的消息將被丟棄。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現了分布式事務的最終一致性解決...
閱讀 2284·2023-04-25 16:42
閱讀 1198·2021-11-22 14:45
閱讀 2329·2021-10-19 13:10
閱讀 2821·2021-09-29 09:34
閱讀 3398·2021-09-23 11:21
閱讀 2094·2021-08-12 13:25
閱讀 2176·2021-07-30 15:15
閱讀 3488·2019-08-30 15:54