摘要:生產者只能把消息發到交換器。是否要追加到一個特殊的隊列是否要追加到許多的隊列或者丟掉這條消息這些規則被定義為交換類型。有一點很關鍵,向不存在的交換器發布消息是被禁止的。如果仍然沒有隊列綁定交換器,消息會丟失。
發布與訂閱 (Publish/Subscribe)
在之前的章節中,我們創建了工作隊列,之前的工作隊列的假設是每個任務只被分發到一個worker。在這一節中,我們會做一些完全不一樣的事--把一條消息發送給多個消費者,這個模式叫做“發布/訂閱”(publish/subscribe)。
舉個例子,我們要構建一個簡易的日志系統。由兩個程序組成---一個來發出日志消息,另一個接收并把消息顯示出來。
在我們的日志系統當中,每一個正在運行的接收程序都會收到消息。這樣,我們可以運行一個receiver并把log定向到磁盤,然后再跑一個receiver,看看它是否會在屏幕上顯示日志。
事實上,被發布的消息會被廣播到所有的receiver那里。
交換器(Exchanges)在之前的引導中,我們從一個隊列中做了收發的操作。是時候介紹在Rabbit中的全部的消息模型了。
讓我們先快速地回顧一下之前學習的,
producer 是一個發送消息的應用
queue 是一個存儲消息的buffer
consumer 是一個接收消息的應用
RabbitMQ中,消息模型的核心思想是生產者絕不會把消息直接發到隊列。實際上,生產者通常不知道一條消息是否已經被發送到任意一個隊列中。
生產者只能把消息發到交換器。交換器是個簡單的東西。一方面接收從生產者那邊來的消息,另一方面把他們push到隊列中。交換器一定要知道當它們接收到消息之后要如何處理。是否要追加到一個特殊的隊列?是否要追加到許多的隊列?或者丟掉這條消息?這些規則被定義為交換類型。
以下是可以使用的交換類型:direct, topic, header, fanout。我們介紹一下最后一個--fanout。讓我們先創建一個fanout類型的交換器“logs”:
ch.assertExchange("logs", "fanout", {durable: false})
fanout類型的交換器非常簡單,我們可以從單單從名字上猜測,它就是把它接收到的消息廣播給所有已知的隊列。這也就是我們的logger所需要的。
列出所有的交換器(Listing exchanges)
你可以使用rabbitmqctl
$sudo rabbitmqctl list_exchanges Listing exchanges ... direct amq.direct direct amq.fanout fanout amq.headers headers amq.match headers amq.rabbitmq.log topic amq.rabbitmq.trace topic amq.topic topic logs fanout ...done.
在列表中,一些amq.*的交換器和一些默認的(未命名的),都是被默認創建的,但是可能是你用不到的
未命名的交換器(Nameless exchange)
在之前的章節中我們未提過交換器,但是我們仍然能夠把消息傳到隊列中,這就是我們使用了默認的交換器,因為我們使用了空的字符串("")。
之前我們是這樣發布一條消息的
ch.sendToQueue("hello", new Buffer("Hello World!"));
這里我們使用默認的或者未命名的交換器,如果第一個參數存在的話,消息會被路由到這個參數名的隊列。
現在,我們可以使用我們定義好的交換器
ch.publish("logs", "", new Buffer("Hello World!"));
第二個參數為空的話代表我們不想把消息推到指定的隊列,只是想發布到logs的交換器中。
臨時隊列 (Temporary queues)你還記得我們之前用的聲明過的隊列(hello 和 task_queue)嗎?。能夠指明一個隊列的名字對我們來說是重要的--我們需要把workers指到相同的隊列。
當你想要分享給消費者和生產者隊列的時候,給隊列起一個名字很重要。
但著不是我們logger這個程序需要的,我們想監聽所有的log消息,不是一部分log消息。同樣的,我們對正在流動的消息也感興趣(not in the old ones).我們需要完成兩件事情:
第一,不管我們什么時候連接Rabbit,都需要一個新的,空的隊列。我們可以創建一個隨機的隊列名字,或者讓服務器為我們隨機選擇一個隊列名字。
第二,不管我們什么時候斷開與消費者的連接,隊列需要自動銷毀。
在amqp.node的客戶端中,當我們傳入字符串的時候,可以創建一個帶有名字的未持久化的隊列
ch.assertQueue("", {exclusive: true});
這個方法返回一個帶有隨機名字的隊列實例,比如amq.gen-JzTY20BRgKO-HjmUJj0wLg。
當連接被斷開的時候,這個隊列會被銷毀,因為我們在聲明的時候{exclusive:true}
我們已經創建了一個fanout類型的交換器和一個隊列,現在我們需要告訴交換器把消息發送給隊列,隊列與交換器之間的關系我們稱之為綁定。
ch.bindQueue(queue_name, "logs", "");
現在開始,logs的交換器為追加消息到我們的隊列
Listing bindings:
你可以列出已經存在的綁定關系,你應該猜到。rabbitmqctl list_bindings。
整合(Putting it all together)生產者的程序,用來發出log消息,和之前章節沒有太多的不同,最重要的改變就是現在我們是把消息發布到我們的logs的交換器中,而不是之前的在未聲明的情況下使用。發送的時候我們需要提供一個路由鍵,但是在fanout類型當中,這個可以忽略。下面是emit_log.js的代碼
#!/usr/bin/env node var amqp = require("amqplib/callback_api"); amqp.connect("amqp://localhost", function(err, conn) { conn.createChannel(function(err, ch) { var ex = "logs"; var msg = process.argv.slice(2).join(" ") || "Hello World!"; ch.assertExchange(ex, "fanout", {durable: false}); ch.publish(ex, "", new Buffer(msg)); console.log(" [x] Sent %s", msg); }); setTimeout(function() { conn.close(); process.exit(0) }, 500); });
(emit_log.js 源碼)
正如你所見,在與交換器建立連接之后。有一點很關鍵,向不存在的交換器發布消息是被禁止的。
如果仍然沒有隊列綁定交換器,消息會丟失。但是對我們來說還好,如果仍然沒有消費者監聽,我們可以安全地丟棄這些消息。
receive_logs.js的代碼
#!/usr/bin/env node var amqp = require("amqplib/callback_api"); amqp.connect("amqp://localhost", function(err, conn) { conn.createChannel(function(err, ch) { var ex = "logs"; ch.assertExchange(ex, "fanout", {durable: false}); ch.assertQueue("", {exclusive: true}, function(err, q) { console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue); ch.bindQueue(q.queue, ex, ""); ch.consume(q.queue, function(msg) { console.log(" [x] %s", msg.content.toString()); }, {noAck: true}); }); }); });
(receive_logs,js源碼)
如果你想要保存log,你可以打開控制臺輸入
$ ./receive_logs.js > logs_from_rabbit.log
如果你想在屏幕上看到log,再打開一個控制臺
$ ./receive_logs.js
當然,需要發出logs
$ ./emit_log.js
使用rabbitmqctl list_bindings,你可以確定剛才的代碼確實創建了交換器和隊列,有兩個receive_logs.js的程序在運行。
$ sudo rabbitmqctl list_bindings Listing bindings ... logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] ...done.
這個結果的簡要解釋:數據從logs交換器到兩個服務器分配的隊列。這也是我們想要的結果。
要如何監聽一部分的消息?讓我們移到下一章。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/88110.html
摘要:這樣的消息分發機制稱作輪詢。在進程掛了之后,所有的未被確認的消息會被重新分發。忘記確認這是一個普遍的錯誤,丟失。為了使消息不會丟失,兩件事情需要確保,我們需要持久化隊列和消息。 工作隊列 showImg(https://segmentfault.com/img/remote/1460000008229494?w=332&h=111); 在第一篇中,我們寫了一個程序從已經聲明的隊列中收發...
摘要:一個表示編譯器檢測到一個無效的引用值。在實際情況中,往往是在獲取一個未被賦值的引用時被拋出。任何一個函數上下文都有一個被稱為活動對象的變量對象。沒有找到的話,就會認為引用名沒有基礎值并拋出的錯誤。下沒有下的屬性僅存在于被啟動的情況下。 和其他語言相比,javascript中的對于undefined的理解還是有點讓人困惑的。特別是試著理解ReferenceErrors錯誤(x is no...
摘要:概述技術棧錯誤詳情報警機器人經常有如下警告過程確定報錯位置有日志就很好辦首先看日志在哪里打的從三個地方入手我們自己的代碼沒有的代碼從上下來沒有的代碼在容器中執行 bug概述 技術棧 nginx uwsgi bottle 錯誤詳情 報警機器人經常有如下警告: 1 2018-xx-xxT06:59:03.038Z 660ece0ebaad admin/admin 14 - - Sock...
摘要:允許接收和轉發消息。一個等待接收消息的程序是一個消費者。發送者會先連接到發送一條消息,然后退出。注意這里的是要和之前的名稱一致。翻譯日期另因為想入門第一次想著翻譯,第一次然后希望多多提出不足。 gitBook https://joursion.gitbooks.io/... Title: RabbitMQ tutorials ---- Hello World (Javascript) ...
閱讀 2420·2021-11-18 10:02
閱讀 687·2021-10-08 10:04
閱讀 2250·2021-09-03 10:51
閱讀 3540·2019-08-30 15:44
閱讀 2799·2019-08-29 14:09
閱讀 2464·2019-08-29 12:21
閱讀 2064·2019-08-26 13:45
閱讀 1800·2019-08-26 13:25