摘要:任務隊列最主要的功能就是解耦高耗時的操作,否則程序會一直等在那里,浪費大量資源。我們將任務封裝成一個消息發(fā)送給隊列,后臺的任務進程會得到這個任務并執(zhí)行它,而且可以配置多個任務進程,進一步加大吞吐率。為了確保消息不丟失,支持消息確認。
推廣
https://segmentfault.com/l/15...
我們利用消息隊列實現(xiàn)了分布式事務的最終一致性解決方案,請大家圍觀。可以參考源碼:https://github.com/vvsuperman…,項目支持網站: http://rabbitmq.org.cn,最新文章或實現(xiàn)會更新在上面
前言在第一篇中我們描述了如何最簡單的RabbitMQ操作,如何發(fā)送、接受消息。在今天這篇文章中我們將描述如何創(chuàng)建一個任務隊列,來將高耗時的任務分發(fā)到多個消費者,從而提高處理效率。
任務隊列最主要的功能就是解耦高耗時的操作,否則程序會一直等在那里,浪費大量資源。反之我們會把這個操作交給隊列,讓它延后再做。我們將任務封裝成一個消息發(fā)送給隊列,后臺的任務進程會得到這個任務并執(zhí)行它,而且可以配置多個任務進程,進一步加大吞吐率。
特別是對于網絡請求,一次短短的HTTP請求是要求迅速響應的,不可能讓它一直停頓在高耗時操作上。
準備工作在第一章中我們發(fā)送了“Hello World!”。現(xiàn)在來完成更復雜一點的,因為這里并沒有真正的高耗時操作,比如縮放圖像或輸出一個pdf。因此我們只是用Thread.sleep()來假裝我們很繁忙,而且會用"."來表示需要停頓的秒數(shù),比如一個叫Hello...的任務將停頓3秒鐘。
我們簡單的更改下Send.java,稱之為 NewTask.java.
String message = getMessage(argv); channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent "" + message + """);
然后是工具類
private static String getMessage(String[] strings){ if (strings.length < 1) return "Hello World!"; return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); }
當然,我們的Recv.java也需要進行一些改造,它需要對每一個"."停頓1秒,Work.java如下
final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received "" + message + """); try { doWork(message); } finally { System.out.println(" [x] Done"); } } }; boolean autoAck = true; // acknowledgment is covered below channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == ".") Thread.sleep(1000); } }
編譯上面這些代碼
javac -cp $CP NewTask.java Worker.java輪詢調度
任務隊列的一個最大優(yōu)點是可以并行工作,能夠非常容易的水平擴張。
首先,讓我們同時運行兩個工作線程,他們能夠同時從隊列獲取消息。我們也需要同時開啟3個console:1個生產者,2個消費者
消費者C1
# shell 1 java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C
消費者C2
# shell 2 java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C
讓我們運行生產者
# shell 3 java -cp $CP NewTask # => First message. java -cp $CP NewTask # => Second message.. java -cp $CP NewTask # => Third message... java -cp $CP NewTask # => Fourth message.... java -cp $CP NewTask # => Fifth message.....
讓我們看看消費者們
消費者C1
java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received "First message." # => [x] Received "Third message..." # => [x] Received "Fifth message....."
消費者C2
java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received "Second message.." # => [x] Received "Fourth message..
RabbitMQ默認有序的將會發(fā)送消息給下一個消費者,所以每一個消費者都會得到相同數(shù)量的消息,這種方式就叫做輪詢調度(round-robin),你可以嘗試下更多的消費者
消息確認一個任務可能非常耗時,如果消費者在做一個高耗時任務時掛掉了,我們將會丟失所有發(fā)送到這個消費者上的消息。這是非常不可取的,所以我們希望能夠明確的知道消息是否消費成功,如果一個消費掛了,我們能夠知道,并且將消息發(fā)送給下一個消費者。
為了確保消息不丟失,RabbitMQ支持消息確認。收到消息后消費者會給RabbitMQ服務器發(fā)送一個ack(我已經收到消息了),RabbitMQ就會在服務上刪除這個消息了。
如果一個消費者掛了(連接關閉,channel關閉,或者是TCP連接丟失)而沒有發(fā)送ack,RabbitMQ就會知道消息并沒有消費成功,于是乎消息會被放到消息隊列重新消費。如果此時還有其它消費者的話,消息會發(fā)送給其它消費者來消費,確保消息不會丟失
消息并沒有超時時間這個概念,消息只會在消費者掛掉了時候重發(fā),即使是一個非常非常耗時的的消費者也不會發(fā)生重發(fā)
手動消息確認(Manual message acknowledgments)默認是打開的,雖然我們之前關閉了它:autoAck=true。讓我們先將它設置為false
channel.basicQos(1); // accept only one unack-ed message at a time (see below) final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received "" + message + """); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
這樣一來,即使你使用CTRL+C強制殺死了一個消費者,消費者所丟失的消息也將會被重發(fā),會被另一個消費者所接受并消費。
忘記應答很容易犯忘記應答的錯誤,但會導致非常嚴重的后果。Messages會被重發(fā),RabbitMQ會消耗越來越多的內存因為unacked的消息無法釋放(甚至更嚴重,RabbitMQ內部維護了一個最大打開線程數(shù),如果太多的消息沒有應答,RabbitMQ甚至會整個崩潰掉)
你可以用Rabbitmqctl查看未被應答的消息數(shù)
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
windows下:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged消息持久化
我們現(xiàn)在知道了可以通過應答來保證消息不丟失,但萬一RabbitMQ掛了呢?還是可能會導致消息丟失。因此我們可以通過持久化的機制,包括將隊列以及隊列中的消息持久化的方式,來保證即便RabbitMQ掛了,當它重啟的時候,隊列以及消息也能夠恢復
首先做隊列的持久化,聲明隊列為durable
boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);
但很可惜的是,這種聲明方式并不適用與上面的方法,因為我們已經將“Hello”定義為一個非持久化的隊列了,是不能再將他改為持久化的,如果這樣做,將會直接返回一個error信息。所以,我們需要重新再定義一個隊列
boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null);
在保證隊列的持久化后需要保證消息的持久化-將消息設置為PERSISTENT_TEXT_PLAIN
import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());公平分發(fā)
但這樣還是存在問題:假設有如下的情形,一個消費者非常耗時,而一個消費者非???,由于消息都是公平的發(fā)送,所以它們都是接收到相同數(shù)量的消息,會導致一個消費者非常忙碌,而另外一個消費者非??臻e,而RabbitMQ無法得知這一點。
為了解決這個缺陷我們引入了basicQos方法以及prefetchCount =1的設置。這會告訴RabbitMQ一次只給消費者一個消息:如果這個消息未確認,將不會發(fā)送新的消息,從而它會將消息發(fā)送給其它并不那么忙的消費者
int prefetchCount = 1; channel.basicQos(prefetchCount);留意queue size
如果所有的消費者都非常忙,隊列可能會很快被填滿,所以你需要留意這一點,要么增加更多的消費者,或者采取其它的策略。
整合NewTask.java
import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = getMessage(argv); channel.basicPublish( "", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent "" + message + """); channel.close(); connection.close(); } //... }
Worker.java
import com.rabbitmq.client.*; import java.io.IOException; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received "" + message + """); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); } private static void doWork(String task) { for (char ch : task.toCharArray()) { if (ch == ".") { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } }
使用消息確認和prefetchCount你就能設置一個持久化隊列了,同時,使用durable和persist,,即使RabbitMQ掛掉了,重啟后也能夠重發(fā)消息
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/68099.html
摘要:推廣專題講座開源項目我們利用消息隊列實現(xiàn)了分布式事務的最終一致性解決方案,請大家圍觀。因此一旦有有消息,消息會廣播到所有的消費者。如此一來路由器就能夠把消息發(fā)送給相應的隊列了。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現(xiàn)了分布式事務的最終一致性解決方案,請大家圍觀??梢詤⒖荚创a:https...
摘要:作為消息隊列的一個典型實踐,完全實現(xiàn)了標準,與的快快快不同,它追求的穩(wěn)定可靠。同一個隊列不僅可以綁定多個生產者,而且能夠發(fā)送消息到多個消費者。消費者接受并消費消息。幾乎于完全類似是一個繼承了接口的類,方便我們來存儲消息隊列來的消息。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現(xiàn)了分布式事務的...
摘要:可以參考源碼,項目支持網站,最新文章或實現(xiàn)會更新在上面前言在訂閱發(fā)布中我們建立了一個簡單的日志系統(tǒng),從而將消息廣播給一些消費者。因此,發(fā)送到路由鍵的消息會發(fā)送給隊列,發(fā)送到路由鍵或者的消息會發(fā)送給,其它的消息將被丟棄。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現(xiàn)了分布式事務的最終一致性解決...
摘要:推廣專題講座開源項目我們利用消息隊列實現(xiàn)了分布式事務的最終一致性解決方案,請大家圍觀。主題交換機也可以當成其它交換機來使用,假如隊列綁定到了那么它會接收所有的消息,就像廣播路由器一樣而如果未使用,那么就跟直達路由器一樣了。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現(xiàn)了分布式事務的最終一致性...
摘要:因為消費消息是在另外一個進程中,我們需要阻塞我們的進程直到結果返回,使用阻塞隊列是一種非常好的方式,這里我們使用了長度為的,的功能是檢查消息的的是不是我們之前所發(fā)送的,如果是,將返回值返回到。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現(xiàn)了分布式事務的最終一致性解決方案,請大家圍觀??梢詤⒖?..
閱讀 3114·2021-11-23 09:51
閱讀 1974·2021-09-09 09:32
閱讀 1084·2019-08-30 15:53
閱讀 2957·2019-08-30 11:19
閱讀 2464·2019-08-29 14:15
閱讀 1432·2019-08-29 13:52
閱讀 553·2019-08-29 12:46
閱讀 2818·2019-08-26 12:18