摘要:每個(gè)消費(fèi)者會(huì)得到平均數(shù)量的。為了確保不會(huì)丟失,采用確認(rèn)機(jī)制。如果中斷退出了關(guān)閉了,關(guān)閉了,或是連接丟失了而沒(méi)有發(fā)送,會(huì)認(rèn)為該消息沒(méi)有完整的執(zhí)行,會(huì)將該消息重新入隊(duì)。該消息會(huì)被發(fā)送給其他的。當(dāng)消費(fèi)者中斷退出,會(huì)重新分派。
Work模式
原文地址
在第一章中,我們寫(xiě)了通過(guò)一個(gè)queue來(lái)發(fā)送和接收message的簡(jiǎn)單程序。在這一章中,我們會(huì)創(chuàng)建一個(gè)workqueue,來(lái)將執(zhí)行時(shí)間敏感的任務(wù)分發(fā)到多個(gè)worker中。
work模式主要的意圖是要避免等待完成一個(gè)耗時(shí)的任務(wù)。取而代之地,我們延遲任務(wù)的執(zhí)行,將任務(wù)封裝成消息,將之發(fā)送到queue。一個(gè)運(yùn)行著的worker進(jìn)程會(huì)彈出這個(gè)任務(wù)并執(zhí)行它。當(dāng)運(yùn)行多個(gè)worker進(jìn)程時(shí),任務(wù)會(huì)在它們之間分派。
這種模式在web應(yīng)用中特別有用,因?yàn)樵谝粋€(gè)較短的HTTP請(qǐng)求窗口中不會(huì)去執(zhí)行一個(gè)復(fù)雜的任務(wù)。
準(zhǔn)備工作在上一章中,我們發(fā)送了一個(gè)”Hello World!"的message。現(xiàn)在我們將發(fā)送一個(gè)代表了復(fù)雜任務(wù)的字符串。這不是一個(gè)實(shí)際的任務(wù),比如像調(diào)整圖片大小或是重新渲染pdf文檔,我們通Thead.sleep() 來(lái)模擬一個(gè)耗時(shí)的任務(wù)。message中的小圓點(diǎn)表示其復(fù)雜度,圓點(diǎn)越多則任務(wù)的執(zhí)行越耗時(shí)。比如“Hello..."的message將耗時(shí)3秒。
我們簡(jiǎn)單的修改上一章的Send.java代碼,允許在命令行發(fā)送任意message。新的類(lèi)叫做NewTask.java
String message = String.join(" ", argv); channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent "" + message + """);
同樣的,我們修改上一章中的Recv.java,讓它在處理message的時(shí)候根據(jù)小圓點(diǎn)進(jìn)行睡眠。新的類(lèi)叫Worker.java
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received "" + message + """); try { doWork(message); } finally { System.out.println(" [x] Done"); } }; boolean autoAck = true; // acknowledgment is covered below channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == ".") Thread.sleep(1000); } }
像在第一章一樣編譯這兩個(gè)類(lèi)
javac -cp $CP NewTask.java Worker.javaRound-robin分派
使用Task模式的一個(gè)明顯的優(yōu)勢(shì)是讓并行執(zhí)行任務(wù)變得簡(jiǎn)單。我們只需要啟動(dòng)更多的worker就可以消減堆積的message,系統(tǒng)水平擴(kuò)展簡(jiǎn)單。
首先,我們?cè)谕粫r(shí)間啟動(dòng)兩個(gè)worker。他們都會(huì)從queue獲得message,來(lái)看一下具體細(xì)節(jié)。
打開(kāi)了三個(gè)終端,兩個(gè)是跑worker的。
java -cp $CP Worker
java -cp $CP Worker
第三個(gè)終端里來(lái)發(fā)布新的任務(wù)message。
java -cp $CP NewTask First message. java -cp $CP NewTask Second message.. java -cp $CP NewTask Third message... java -cp $CP NewTask Fourth message.... java -cp $CP NewTask Fifth message.....
讓我們看看worker的處理message的情況.第一個(gè)worker收到了第1,3,5message,第二個(gè)worker收到了第2,4個(gè)message。
默認(rèn)情況下,RabbitMQ會(huì)順序的將message發(fā)給下一個(gè)消費(fèi)者。每個(gè)消費(fèi)者會(huì)得到平均數(shù)量的message。這種方式稱(chēng)之為round-robin(輪詢(xún)).
Message 確認(rèn)執(zhí)行任務(wù)需要一定的時(shí)間。你可能會(huì)好奇如果一個(gè)worker開(kāi)始執(zhí)行任務(wù),但是中途異常退出,會(huì)是什么結(jié)果。在我們現(xiàn)在的代碼中,一旦RabbitMQ將消息發(fā)送出去了,它會(huì)立即將該message刪除。這樣的話(huà),就可能丟失message。
在實(shí)際場(chǎng)景中,我們不想丟失任何一個(gè)task。如果一個(gè)worker異常中斷了,我們希望這個(gè)task能分派給另一個(gè)worker。
為了確保不會(huì)丟失message,RabbitMQ采用message確認(rèn)機(jī)制。RabbitMQ只有收到該message的Ack之后,才會(huì)刪除該消息。
如果worker中斷退出了( channel關(guān)閉了,connection關(guān)閉了,或是TCP連接丟失了)而沒(méi)有發(fā)送Ack,RabbitMQ會(huì)認(rèn)為該消息沒(méi)有完整的執(zhí)行,會(huì)將該消息重新入隊(duì)。該消息會(huì)被發(fā)送給其他的worker。這樣就不用message丟失,即使是在worker經(jīng)常異常中斷退出的場(chǎng)景下。
不會(huì)有任何message會(huì)timeout。當(dāng)消費(fèi)者中斷退出,RabbitMQ會(huì)重新分派message。即使消息的執(zhí)行會(huì)花費(fèi)很長(zhǎng)的時(shí)間。
默認(rèn)情況下,message是需要人工確認(rèn)的。在上面的例子中,我們通過(guò)autoAck=true來(lái)關(guān)閉了人工確認(rèn)。像下面這樣,我們將該標(biāo)志設(shè)置為false,worker就需要在完成了任務(wù)之后,發(fā)送確認(rèn)。
channel.basicQos(1); // accept only one unack-ed message at a time (see below) DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received "" + message + """); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
上面的代碼保證即使當(dāng)worker還在處理一條消息,而強(qiáng)制它退出,也不會(huì)丟失message。然后不久,所有未被確認(rèn)的消息都會(huì)被重新分派。
發(fā)送確認(rèn)必須和接收相同的channel。使用不同的channel進(jìn)行確認(rèn)會(huì)導(dǎo)致channel-level protocol 異常。
忘記確認(rèn)消息是一個(gè)比較常見(jiàn)的錯(cuò)誤,但是其后果是很?chē)?yán)重的。當(dāng)client退出時(shí),message會(huì)被重新分派,但是RabbitMQ會(huì)占用越來(lái)越多的內(nèi)存,因它無(wú)法釋放那些未被確認(rèn)的message。
可以通過(guò)rabbitmqctl來(lái)打印messages_unacknowledged:
##linux sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged ##windows rabbitmqctl.bat list_queues name messages_ready messages_unacknowledgedMessage 持久化
我們學(xué)習(xí)了在消費(fèi)者出現(xiàn)問(wèn)題的時(shí)候不丟失message。但是如果RabbitMQ服務(wù)器宕機(jī)了,我們還是會(huì)丟失message。
當(dāng)RabbitMQ宕機(jī)時(shí),默認(rèn)情況下,它會(huì)”忘記“所有的queue和message。為了確保message不丟失,我們需要確認(rèn)兩件事情:我們要使得queue和message都是持久的。
首先,我們要確保RabbitMQ不會(huì)丟失我們?cè)O(shè)置好的queue。所以,我們要把它聲明成持久的:
boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);
雖然代碼沒(méi)有任何問(wèn)題,但是光這樣是無(wú)效的。因?yàn)槲覀冎耙呀?jīng)定義過(guò)名字為hello的queue。RabbitMQ不允許你使用不同的參數(shù)去重新定義一個(gè)已經(jīng)存在的queue,而且這還不會(huì)反悔任何錯(cuò)誤信息。但是我們還是有別的方法,讓我們使用一個(gè)別的名字,比如task_queue:
boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null);
聲明queue的改變要在生產(chǎn)者和消費(fèi)者的代碼里都進(jìn)行修改。
接著我們要設(shè)置message的持久性,我們通過(guò)設(shè)置MessageProperties為PERSISTENT_TEXT_PLAIN:
import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
將message標(biāo)記成持久的不能100%保證message不會(huì)丟失,雖然這告訴RabbitMQ將message保存到磁盤(pán),然而在RabbitMQ從接到message到保存之間,仍然有一小段時(shí)間。同時(shí)RabbitMQ不會(huì)給每一條message執(zhí)行fsync(2) -- 可能只是保存到了cache而沒(méi)有寫(xiě)到磁盤(pán)上去。所以持久的保證也不是非常強(qiáng),然后對(duì)我們簡(jiǎn)單的task queue來(lái)說(shuō)則足夠了。如果需要一個(gè)非常強(qiáng)的保證,則可以使用發(fā)布確認(rèn)的方式。Fair 分派
你可能已經(jīng)注意到分派的工作沒(méi)有如我們所期望的來(lái)執(zhí)行。比如在有2個(gè)worker的情況系,所有偶數(shù)的message耗時(shí)很長(zhǎng),而所有奇數(shù)的message則耗時(shí)很短,這樣其中一個(gè)worker則一直被分派到偶數(shù)的message,而另一個(gè)則一直是奇數(shù)的message。RabbitMQ對(duì)此并不知曉,進(jìn)而繼續(xù)這樣分派著message。
這樣的原因是RabbitMQ是在message入queue的時(shí)候確定分派的。它不關(guān)心消費(fèi)者ack的情況。
我們可以通過(guò)basicQos方法和prefetchCount(1)來(lái)解決這個(gè)問(wèn)題。這個(gè)設(shè)置是讓RabbitMQ給worker一次一個(gè)message。或者這么說(shuō),直到worker處理完之前的message并發(fā)送ack,才給worker下一個(gè)message。否則,Rabbitmq會(huì)將message發(fā)送給其它不忙的worker。
int prefetchCount = 1; channel.basicQos(prefetchCount);
注意queue的大小。如果所有的worker都處于忙碌狀態(tài),queue可能會(huì)被裝滿(mǎn)。必須監(jiān)控queue深度,可能要開(kāi)啟更多的worker,或者采取其他的措施。開(kāi)始執(zhí)行
NewTask.java的最終版本
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = String.join(" ", argv); channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); System.out.println(" [x] Sent "" + message + """); } } }
Worker.java的最終版本
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received "" + message + """); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } private static void doWork(String task) { for (char ch : task.toCharArray()) { if (ch == ".") { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } }
使用message ack和prefetchCount,來(lái)設(shè)定work queue。持久化選項(xiàng)則在RabbitMQ重啟后能讓任務(wù)得以恢復(fù)。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/73901.html
摘要:發(fā)布訂閱模式在之前的文章里,創(chuàng)建了。我們稱(chēng)之為發(fā)布訂閱模式。其實(shí)我們是用到了默認(rèn)的,用空字符串來(lái)標(biāo)識(shí)。空字符串代表了沒(méi)有名字的被路由到了由指定名字的。和這種關(guān)系的建立我們稱(chēng)之為從現(xiàn)在開(kāi)始這個(gè)就會(huì)將推向我們的隊(duì)列了。 發(fā)布訂閱模式 在之前的文章里,創(chuàng)建了work queue。work queue中,每一個(gè)task都會(huì)派發(fā)給一個(gè)worker。在本章中,我們會(huì)完成完全不一樣的事情 - 我們會(huì)...
摘要:如果涉及返回值,就要用到本章提到的了。方法發(fā)送請(qǐng)求,并阻塞知道結(jié)果返回。當(dāng)有消息時(shí),進(jìn)行計(jì)算并通過(guò)指定的發(fā)送給客戶(hù)端。當(dāng)接收到,則檢查。如果和之前的匹配,則將消息返回給應(yīng)用進(jìn)行處理。 RPC模式 在第二章中我們學(xué)習(xí)了如何使用Work模式在多個(gè)worker之間派發(fā)時(shí)間敏感的任務(wù)。這種情況是不涉及到返回值的,worker執(zhí)行任務(wù)就好。如果涉及返回值,就要用到本章提到的RPC(Remote ...
摘要:這樣的消息分發(fā)機(jī)制稱(chēng)作輪詢(xún)。在進(jìn)程掛了之后,所有的未被確認(rèn)的消息會(huì)被重新分發(fā)。忘記確認(rèn)這是一個(gè)普遍的錯(cuò)誤,丟失。為了使消息不會(huì)丟失,兩件事情需要確保,我們需要持久化隊(duì)列和消息。 工作隊(duì)列 showImg(https://segmentfault.com/img/remote/1460000008229494?w=332&h=111); 在第一篇中,我們寫(xiě)了一個(gè)程序從已經(jīng)聲明的隊(duì)列中收發(fā)...
摘要:主題模式在上一章我們改進(jìn)了我們的日志系統(tǒng),如果使用我們只能簡(jiǎn)單進(jìn)行廣播,而使用則允許消費(fèi)者可以進(jìn)行一定程度的選擇。為的會(huì)同時(shí)發(fā)布到這兩個(gè)。當(dāng)為時(shí),會(huì)接收所有的。當(dāng)中沒(méi)有使用通配符和時(shí),的行為和一致。 主題模式 在上一章我們改進(jìn)了我們的日志系統(tǒng),如果使用fanout我們只能簡(jiǎn)單進(jìn)行廣播,而使用direct則允許消費(fèi)者可以進(jìn)行一定程度的選擇。但是direct還是有其局限性,其路由不支持多個(gè)...
摘要:路由模式在之前的文章中我們建立了一個(gè)簡(jiǎn)單的日志系統(tǒng)。更形象的表示,如對(duì)中的感興趣。為了進(jìn)行說(shuō)明,像下圖這么來(lái)設(shè)置如圖,可以看到有兩個(gè)綁到了類(lèi)型為的上。如圖的設(shè)置中,一個(gè)為的就會(huì)同時(shí)發(fā)送到和。接收程序可以選擇要接收日志的嚴(yán)重性級(jí)別。 路由模式 在之前的文章中我們建立了一個(gè)簡(jiǎn)單的日志系統(tǒng)。我們可以通過(guò)這個(gè)系統(tǒng)將日志message廣播給很多接收者。 在本篇文章中,我們?cè)谶@之上,添加一個(gè)新的功...
閱讀 486·2019-08-30 15:44
閱讀 901·2019-08-30 10:55
閱讀 2731·2019-08-29 15:16
閱讀 930·2019-08-29 13:17
閱讀 2804·2019-08-26 13:27
閱讀 572·2019-08-26 11:53
閱讀 2123·2019-08-23 18:31
閱讀 1890·2019-08-23 18:23