摘要:這樣的消息分發(fā)機(jī)制稱作輪詢。在進(jìn)程掛了之后,所有的未被確認(rèn)的消息會(huì)被重新分發(fā)。忘記確認(rèn)這是一個(gè)普遍的錯(cuò)誤,丟失。為了使消息不會(huì)丟失,兩件事情需要確保,我們需要持久化隊(duì)列和消息。
工作隊(duì)列
在第一篇中,我們寫了一個(gè)程序從已經(jīng)聲明的隊(duì)列中收發(fā)消息,在這篇中,我們會(huì)創(chuàng)建一個(gè)工作隊(duì)列(Work Queue)來分發(fā)works里面的耗時(shí)任務(wù)。
其主要思想就是避免立即執(zhí)行耗資源的任務(wù),并等待它完成。相反的,我們要讓這些任務(wù)在稍后的一個(gè)時(shí)間執(zhí)行。我們把任務(wù)封裝成一個(gè)消息放到隊(duì)列中。
一個(gè)工作進(jìn)程會(huì)在后臺(tái)執(zhí)行,取出(Pop)任務(wù)并最終會(huì)完成這項(xiàng)任務(wù),當(dāng)你運(yùn)行多個(gè)work的時(shí)候,這些任務(wù)會(huì)在它們之間共享。
這個(gè)概念在web應(yīng)用中也是非常有用的,當(dāng)在一個(gè)http請求窗口中不可能完成一個(gè)復(fù)雜的任務(wù)時(shí)候。
準(zhǔn)備在之前的引導(dǎo)中,我們發(fā)送了一個(gè)’Hello World‘的消息。現(xiàn)在我們要發(fā)送一個(gè)字符串代表一個(gè)復(fù)雜的任務(wù),我們沒有像調(diào)整
圖片大小或者渲染一個(gè)pdf文件這樣的在真實(shí)場景中的任務(wù),所以我們使用setTimeout來模擬我們正處于忙碌狀態(tài)。我們把‘.’的數(shù)量代表這個(gè)字符串的復(fù)雜度;
每一個(gè)‘." 會(huì)消耗一秒鐘,例:一個(gè)模擬的任務(wù)"Hello..." 會(huì)消耗三秒鐘。
從之前的例子,我們稍稍修改一下send.js 的代碼,允許命令行可以發(fā)送任意的消息。這個(gè)程序在工作隊(duì)列中安排好任務(wù),所以我們稱它new_task.js
var q = "task_queue"; var msg = process.argv.slice(2).join(" ") || "Hello World!"; ch.assertQueue(q, {durable: true}); ch.sendToQueue(q, new Buffer(msg), {persistent: true}); console.log(" [x] Sent "%s"", msg);
我們的之前的receive.js同樣需要一些改變,需要對消息內(nèi)容中的每個(gè)"."模擬成一個(gè)會(huì)消耗一秒的任務(wù)。它要從隊(duì)列中取出一條消息并執(zhí)行這個(gè)任務(wù),我們把它稱作worker.js
ch.consume(q, function(msg) { var secs = msg.content.toString().split(".").length - 1; console.log(" [x] Received %s", msg.content.toString()); setTimeout(function() { console.log(" [x] Done"); }, secs * 1000); }, {noAck: true});
注意我們模擬的執(zhí)行時(shí)間
執(zhí)行我們的程序
shell1$ ./worker.js shell2$ ./new_task.js循環(huán)調(diào)度
使用任務(wù)隊(duì)列(Task Queue)的其中的一個(gè)優(yōu)勢是有簡化并行工作的能力。如果我們有很多堆積的未完成的任務(wù),我們只需添加更多的worker來進(jìn)行擴(kuò)展。
首先,我們嘗試同時(shí)啟動(dòng)兩個(gè)worker.js,他們都會(huì)從隊(duì)列中受到消息,但是實(shí)際上呢?我們來看看
你需要打開第三個(gè)命令行,兩個(gè)來運(yùn)行worker.js腳本,我們稱作C1,C2
shell1$ ./worker.js [*] Waiting for messages. To exit press CTRL+C
shell2$ ./worker.js [*] Waiting for messages. To exit press CTRL+C
在第三個(gè)命令行工具中,我們會(huì)發(fā)布新的任務(wù),一旦你啟動(dòng)消費(fèi)者,你可以發(fā)布一些消息:
shell3$ ./new_task.js First message. shell3$ ./new_task.js Second message.. shell3$ ./new_task.js Third message... shell3$ ./new_task.js Fourth message.... shell3$ ./new_task.js Fifth message.....
讓我們看看什么被分發(fā)到我們的worker
shell1$ ./worker.js [*] Waiting for messages. To exit press CTRL+C [x] Received "First message." [x] Received "Third message..." [x] Received "Fifth message....."
shell2$ ./worker.js [*] Waiting for messages. To exit press CTRL+C [x] Received "Second message.." [x] Received "Fourth message...."
默認(rèn)情況下,RabbitMQ會(huì)依次地把消息推送到下一個(gè)消費(fèi)者,平均每個(gè)消費(fèi)者會(huì)得到相同數(shù)量的消息。這樣的消息分發(fā)機(jī)制稱作輪詢。可以嘗試3個(gè)或更多的worker。
## 消息確認(rèn) (Message acknowledgment)
要完成一個(gè)任務(wù)需要一些事件,你可能會(huì)想,當(dāng)一個(gè)消費(fèi)者開始執(zhí)行一個(gè)長的任務(wù)但只執(zhí)行一部分就die了會(huì)發(fā)生什么。就我們當(dāng)前的代碼,一旦RabbitMQ 分發(fā)了一條消息到消費(fèi)者那邊,就會(huì)立即從存儲(chǔ)中移除這條消息。這樣的話,如果你殺掉了進(jìn)程,我們將會(huì)丟失這條正在被處理的消息。
我們也同樣丟失了我們發(fā)送給這個(gè)進(jìn)程的但還沒被處理的消息。
但是我們不想丟失任何的任務(wù),如果一個(gè)進(jìn)程掛掉,我們希望這個(gè)任務(wù)會(huì)被分發(fā)到其他的進(jìn)程。
為了確保每一條消息絕不會(huì)丟失,RabbitMQ支持 消息確認(rèn),一個(gè)ack標(biāo)志會(huì)從消費(fèi)者那邊返回去通知RabbitMQ當(dāng)前的這個(gè)消息已經(jīng)收到并且已經(jīng)完成,于是RabbitMQ就可以取刪掉這個(gè)任務(wù)了。
如果一個(gè)消費(fèi)者掛了(通道被關(guān)閉,連接關(guān)閉,或者TCP連接丟失)而沒有發(fā)送ack標(biāo)志,RabbitMQ會(huì)明白這條任務(wù)還沒被執(zhí)行完,并會(huì)重新放回隊(duì)列中,如果當(dāng)時(shí)有其他的消費(fèi)者在線,這個(gè)消息會(huì)被快速地發(fā)送給其他的消費(fèi)者。這樣的話你就可以保證沒有消息會(huì)遺失,即使進(jìn)程只是偶爾會(huì)掛掉。
不管消息處理是否超時(shí),RabbitMQ只會(huì)在消費(fèi)者掛掉的時(shí)候重新分發(fā)消息。這對于那些要處理很久很久的消息也是好的(add:不會(huì)被判定為noack,而重新分發(fā))
在之前的例子中,消息確認(rèn)是被關(guān)閉的,是時(shí)候打開它了,使用{noAck: false}(你也可以移除這個(gè)操作選項(xiàng))選項(xiàng),當(dāng)我們完成這個(gè)任務(wù)的時(shí)候發(fā)送一個(gè)正確的消息確認(rèn)。
ch.consume(q, function(msg) { var secs = msg.content.toString().split(".").length - 1; console.log(" [x] Received %s", msg.content.toString()); setTimeout(function() { console.log(" [x] Done"); ch.ack(msg); }, secs * 1000); }, {noAck: false});
使用這樣的代碼,你可以確定即使在它還在處理消息的時(shí)候你使用CTRL+C 殺掉進(jìn)程也不會(huì)有數(shù)據(jù)丟失。在進(jìn)程掛了之后,所有的未被確認(rèn)的消息會(huì)被重新分發(fā)。
## 忘記確認(rèn) 這是一個(gè)普遍的錯(cuò)誤,丟失ack。只是一個(gè)簡單的錯(cuò)誤,但結(jié)果確實(shí)很嚴(yán)重的。當(dāng)客戶端停止的時(shí)候,消息會(huì)被重新分發(fā)(像是被隨機(jī)分發(fā)),但是RabbitMQ會(huì)占用越來越多的內(nèi)存當(dāng)它不能取釋放掉任何未被確認(rèn)的消息。 為了調(diào)試這種類型的錯(cuò)誤,你可以使用`rabbitmqctl`來輸出未被確認(rèn)的消息字段: $ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged Listing queues ... hello 0 0 ...done.消息持久化
我們學(xué)習(xí)了確保在進(jìn)程掛掉仍保證任務(wù)不會(huì)被丟失,但我們的任務(wù)還是會(huì)在RabbitMQ服務(wù)停止的時(shí)候丟失。
當(dāng)RabbitMQ 退出或者崩潰,除非你叫它不要丟失,不然隊(duì)列和消息都會(huì)丟失。為了使消息不會(huì)丟失,兩件事情需要確保,我們需要持久化隊(duì)列和消息。
首先,我們要讓RabbitMQ 不會(huì)丟失隊(duì)列,為此,我們要先聲明
ch.assertQueue("hello", {durable: true});
盡管這樣的操作是對的,但是在我們現(xiàn)在的配置中是不起作用的,這是因?yàn)槲覀円呀?jīng)定義了一個(gè)未持久化的叫做hello的隊(duì)列,RabbitMQ不允許你改變一個(gè)已經(jīng)存在的隊(duì)列的參數(shù),如果你這樣做,程序?qū)?huì)返回錯(cuò)誤。
但是有一個(gè)快速的辦法 --- 讓我們定義一個(gè)新的隊(duì)列,叫做task_queue
ch.assertQueue("task_queue", {durable: true});
這個(gè)durable選項(xiàng),需要消費(fèi)者和生產(chǎn)者都去使用。
此時(shí)我們能保證task_queue隊(duì)列不會(huì)在RabbitMQ重啟的時(shí)候丟失,現(xiàn)在我們需要對消息進(jìn)行持久化 --- 使用presistent的Channel.sendToQueue選項(xiàng),
ch.sendToQueue(q, new Buffer(msg), {persistent: true});
注意:消息持久化 消息持久化,不能完全地保證消息不會(huì)丟失,盡管它告訴RabbitMQ要把消息存到磁盤當(dāng)中,總存在一個(gè)RabbitMQ接收到消息,但還未處理完的情況。另外,RabbitMQ并不是對每個(gè)消息做到幀同步,有可能只是被寫到緩存中,還沒被寫到磁盤。 消息持久化不能完全保證,但已經(jīng)遠(yuǎn)遠(yuǎn)滿足我們的簡單的工作隊(duì)列的需求,如果你需要更強(qiáng)的持久化的保證,你可以使用[publisher confirms](https://www.rabbitmq.com/confirms.html)。均衡調(diào)度(Fair dispatch)
你可能已經(jīng)注意到現(xiàn)在的調(diào)度并不是我們想要的,例:在有兩個(gè)worker的情況下,當(dāng)所有的奇數(shù)消息都是重的而偶數(shù)消息是輕量的,那會(huì)有一個(gè)worker會(huì)一直處于忙碌狀態(tài),而另一個(gè)worker幾乎不工作,
RabbitMQ,并不知道這些情況,只知道持續(xù)地均勻地分發(fā)消息。
這樣發(fā)生的原因是RabbitMQ只是在消息進(jìn)入隊(duì)列的時(shí)候進(jìn)行分發(fā)的工作,不管消費(fèi)者的未確認(rèn)的消息的數(shù)量,只是一味地分發(fā)第N條消息給第N個(gè)消費(fèi)者。
為了解決這樣的問題,我們使用方法prefetch,并設(shè)置值為1,表示RabbitMQ不會(huì)同時(shí)給一個(gè)worker超過一條消息,即,不會(huì)分發(fā)一條新的消息直到worker完成并且發(fā)送ack標(biāo)志。否則,RabbitMQ會(huì)把消息發(fā)送給下一個(gè)不在忙碌狀態(tài)的worker.
ch.prefetch(1);
注意隊(duì)列的大小 如果所有的worker都處于忙碌狀態(tài),你的隊(duì)列可以被填滿,你可能需要一個(gè)監(jiān)控,或者添加更多的worker,或者有其他的解決方案。整合
最后的new_task.js的代碼:
#!/usr/bin/env node var amqp = require("amqplib/callback_api"); amqp.connect("amqp://localhost", function(err, conn) { conn.createChannel(function(err, ch) { var q = "task_queue"; var msg = process.argv.slice(2).join(" ") || "Hello World!"; ch.assertQueue(q, {durable: true}); ch.sendToQueue(q, new Buffer(msg), {persistent: true}); console.log(" [x] Sent "%s"", msg); }); setTimeout(function() { conn.close(); process.exit(0) }, 500); });
new_task.js source
worker.js的代碼:
#!/usr/bin/env node var amqp = require("amqplib/callback_api"); amqp.connect("amqp://localhost", function(err, conn) { conn.createChannel(function(err, ch) { var q = "task_queue"; ch.assertQueue(q, {durable: true}); ch.prefetch(1); console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q); ch.consume(q, function(msg) { var secs = msg.content.toString().split(".").length - 1; console.log(" [x] Received %s", msg.content.toString()); setTimeout(function() { console.log(" [x] Done"); ch.ack(msg); }, secs * 1000); }, {noAck: false}); }); });
worker.js source
使用消息確認(rèn)和預(yù)處理,你可以建立一個(gè)工作隊(duì)列。持久化選項(xiàng)使得消息可以在RabbitMQ會(huì)重啟的情況下得以保留。
獲得更多的關(guān)于Channel的方法和消息的屬性,你可以瀏覽amqplib docs
翻譯:Joursion
日期 :2016/12/25
歡迎交流,學(xué)習(xí)。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/88111.html
摘要:生產(chǎn)者只能把消息發(fā)到交換器。是否要追加到一個(gè)特殊的隊(duì)列是否要追加到許多的隊(duì)列或者丟掉這條消息這些規(guī)則被定義為交換類型。有一點(diǎn)很關(guān)鍵,向不存在的交換器發(fā)布消息是被禁止的。如果仍然沒有隊(duì)列綁定交換器,消息會(huì)丟失。 發(fā)布與訂閱 (Publish/Subscribe) 在之前的章節(jié)中,我們創(chuàng)建了工作隊(duì)列,之前的工作隊(duì)列的假設(shè)是每個(gè)任務(wù)只被分發(fā)到一個(gè)worker。在這一節(jié)中,我們會(huì)做一些完全不一...
摘要:允許接收和轉(zhuǎn)發(fā)消息。一個(gè)等待接收消息的程序是一個(gè)消費(fèi)者。發(fā)送者會(huì)先連接到發(fā)送一條消息,然后退出。注意這里的是要和之前的名稱一致。翻譯日期另因?yàn)橄肴腴T第一次想著翻譯,第一次然后希望多多提出不足。 gitBook https://joursion.gitbooks.io/... Title: RabbitMQ tutorials ---- Hello World (Javascript) ...
摘要:每個(gè)消費(fèi)者會(huì)得到平均數(shù)量的。為了確保不會(huì)丟失,采用確認(rèn)機(jī)制。如果中斷退出了關(guān)閉了,關(guān)閉了,或是連接丟失了而沒有發(fā)送,會(huì)認(rèn)為該消息沒有完整的執(zhí)行,會(huì)將該消息重新入隊(duì)。該消息會(huì)被發(fā)送給其他的。當(dāng)消費(fèi)者中斷退出,會(huì)重新分派。 Work模式 原文地址showImg(https://segmentfault.com/img/bVbqlXr?w=694&h=252); 在第一章中,我們寫了通過一個(gè)...
摘要:發(fā)布訂閱模式在之前的文章里,創(chuàng)建了。我們稱之為發(fā)布訂閱模式。其實(shí)我們是用到了默認(rèn)的,用空字符串來標(biāo)識(shí)。空字符串代表了沒有名字的被路由到了由指定名字的。和這種關(guān)系的建立我們稱之為從現(xiàn)在開始這個(gè)就會(huì)將推向我們的隊(duì)列了。 發(fā)布訂閱模式 在之前的文章里,創(chuàng)建了work queue。work queue中,每一個(gè)task都會(huì)派發(fā)給一個(gè)worker。在本章中,我們會(huì)完成完全不一樣的事情 - 我們會(huì)...
摘要:平均每個(gè)消費(fèi)者將得到相同數(shù)量的消息。消息確認(rèn)完成任務(wù)可能需要幾秒鐘。為了確保消息不會(huì)丟失,支持消息確認(rèn)。沒有任何消息超時(shí)當(dāng)這個(gè)消費(fèi)者中止了,將會(huì)重新分配消息時(shí)。這是因?yàn)橹皇钦{(diào)度消息時(shí),消息進(jìn)入隊(duì)列。 showImg(https://segmentfault.com/img/bVXNuN?w=332&h=111); 介紹 在上一個(gè) Hello World 教程中,我們編寫了從指定隊(duì)列發(fā)送...
閱讀 3115·2023-04-25 15:02
閱讀 2806·2021-11-23 09:51
閱讀 2030·2021-09-27 13:47
閱讀 1984·2021-09-13 10:33
閱讀 957·2019-08-30 15:54
閱讀 2640·2019-08-30 15:53
閱讀 2853·2019-08-29 13:58
閱讀 881·2019-08-29 13:54