摘要:發(fā)布訂閱模式在之前的文章里,創(chuàng)建了。我們稱之為發(fā)布訂閱模式。其實我們是用到了默認(rèn)的,用空字符串來標(biāo)識??兆址砹藳]有名字的被路由到了由指定名字的。和這種關(guān)系的建立我們稱之為從現(xiàn)在開始這個就會將推向我們的隊列了。
發(fā)布訂閱模式
在之前的文章里,創(chuàng)建了work queue。work queue中,每一個task都會派發(fā)給一個worker。在本章中,我們會完成完全不一樣的事情 - 我們會派發(fā)一條message給多個消費者。我們稱之為發(fā)布訂閱模式。
為了更好來說明,我們將要構(gòu)建一個簡單的日志系統(tǒng)。會由兩部分代碼構(gòu)成,第一部分來發(fā)送日志message,第二部分會接受并打印日志。
在我們的日志系統(tǒng)中,每一個接收程序都會收到日志message。這種方式下,我們可以運行一個接收程序?qū)⑷罩颈4娴酱疟P,同時使用另外一個接收程序?qū)⑷罩敬蛴〉狡聊弧?/p>
本質(zhì)上來說,發(fā)布的日志message會廣播到所有運行的接收者。
Exchanges在之前的章節(jié)我們通過queue收發(fā)message。現(xiàn)在開始介紹Rabbit中的full messaging model。
首先讓我們快速的回憶一下之前的章節(jié)
producer是一個發(fā)送message的用戶程序。
queue是保存message的緩沖區(qū)
consumer是接收message的用戶程序
RabbitMQ的messaging model的核心思想是producer不會直接向queue發(fā)送message。實際上,很多時候producer也不知道m(xù)essage會發(fā)送到哪些queue。
這里,producer將message發(fā)送到exchange。exchange是一個非常簡單的東西。一方面它從producer側(cè)接收message,另一方面它把message推送到queue去。 exchange必須知道對接收到的message接著要去做什么。是轉(zhuǎn)發(fā)到特定的queue?還是轉(zhuǎn)發(fā)到多個queue?還是干脆丟棄掉。這個規(guī)則取決于定義時exchange的類型。
exchange有四種可選的類型:direct, topic, headers和fanout. 今天我們聚焦于最后一種-fanout。讓我們創(chuàng)建一個fanout類型的exchange,命名為logs
channel.exchangeDeclare("logs","fanout");
fanout類型的exchange是非常簡單的??梢詮拿稚洗蟾挪鲁銎溆猛?,它廣播所有的message到它所知道的queue去。這也正是日志應(yīng)用所期望的。
列出所有的exhange,可以使用rabbitmqctl命令 sudo rabbitmqctl list_exchanges,在列表總會出現(xiàn)一些amq.* 的exchange,和默認(rèn)的exchange。這些是默認(rèn)自動創(chuàng)建的,我們不會使用到它們。沒有名字的exchange。在之前的章節(jié)里我們沒有提到過exchanges,我們直接將message發(fā)送到queue。其實我們是用到了默認(rèn)的exchange,用空字符串”“來標(biāo)識?;叵胍幌?,我們像下面這樣發(fā)布message:
channel.basicPublish("","hello",null,message.getBytes()); 第一個參數(shù)就是exchange的名字。空字符串代表了沒有名字的exchange:message被路由到了由routingKey指定名字的queue。
現(xiàn)在,我們可以向有名字的exchange發(fā)布message。
channel.basicPublish("logs","",null,message.getBytes());Temporary Queue
之前我們使用queue時都會指定名字,如hello和task_queue。給一個queue命名是很重要的,因為我們要給worker指出相同的queue。當(dāng)需要在生產(chǎn)者和消費者間共享一個queue時,就必須給queue取好名字。
但是在我們?nèi)罩緫?yīng)用中,情況卻有所不同。我們需要接收到所有的log message。我們也關(guān)注當(dāng)前流動的message。我們需要搞定2個事情。
首先,當(dāng)連接到Rabbit時,我們需要一個全新的,空的queue。因此我們可以自己創(chuàng)建一個隨意名字的queue,或是由服務(wù)器選擇隨意的queue名字,這當(dāng)然是更好的選擇。
其次,當(dāng)我們斷開接收者時,該queue可以被自動刪除。
在java客戶端中,當(dāng)我們使用無參的queueDeclare()時,我們創(chuàng)建的是使用自動生成名字的一個不持久的,自動刪除queue:
String queueName = channel.queueDeclare().getQueue();
可以通過這里來學(xué)習(xí)到exclusive標(biāo)志和其他queue的相關(guān)屬性。
這時queue就具有一個隨機的名字,比如像amq.gen-JzTY20BRgKO-HjmUJj0wLg.
Bindings
我們已經(jīng)創(chuàng)建了一個fanout exchange和queue.現(xiàn)在我們要設(shè)置exchange,讓它把message發(fā)送到我們的queue。exchange和queue這種關(guān)系的建立我們稱之為binding.
channel.queueBind(queueName,"logs","");
從現(xiàn)在開始logs這個exchange就會將message推向我們的隊列了。
可以使用命令rabbitmqctl list_bindings 來列出當(dāng)前所有的binding。開始執(zhí)行
生產(chǎn)者程序,和之前章節(jié)的代碼變化不大,主要的變化是我們將message發(fā)送到exchange而不是一個queue。你發(fā)現(xiàn)我們在發(fā)送的時候會填上一個routingKey,這個值在fanout類型的exchange中是被忽略的。下面是生產(chǎn)者EmitLog.java的代碼
public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = argv.length < 1 ? "info: Hello World!" : String.join(" ", argv); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println(" [x] Sent "" + message + """); } } }
如你所見,在建立connection之后我們聲明了exchange.這一步是必要的,發(fā)布Message到一個不存在的exchange是不允許的。
如果沒有queue綁定到exchange的時候,發(fā)布的message是會丟失的,但在現(xiàn)在這個場景是OK的。下面是ReceiveLogs.java的代碼:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received "" + message + """); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
首先進行編譯
javac -cp $CP EmitLog.java ReceiveLog.java
如果要把日志保存到文件,則
java -cp $CP ReceiveLogs > logs_from_rabbit.log
如果要在控制臺看日志,在另一個終端
java -cp $CP ReceiveLogs
最后來發(fā)送日志
java -cp $CP EmitLog
使用rabbitmqctl list_bindings,來確認(rèn)程序創(chuàng)建了我們在代碼中指定的binding和queue. 運行兩個ReceiveLogs程序,你會看到像下面的輸出
sudo rabbitmqctl list_bindings # => Listing bindings ... # => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] # => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] # => ...done.
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/73895.html
摘要:主題模式在上一章我們改進了我們的日志系統(tǒng),如果使用我們只能簡單進行廣播,而使用則允許消費者可以進行一定程度的選擇。為的會同時發(fā)布到這兩個。當(dāng)為時,會接收所有的。當(dāng)中沒有使用通配符和時,的行為和一致。 主題模式 在上一章我們改進了我們的日志系統(tǒng),如果使用fanout我們只能簡單進行廣播,而使用direct則允許消費者可以進行一定程度的選擇。但是direct還是有其局限性,其路由不支持多個...
摘要:路由模式在之前的文章中我們建立了一個簡單的日志系統(tǒng)。更形象的表示,如對中的感興趣。為了進行說明,像下圖這么來設(shè)置如圖,可以看到有兩個綁到了類型為的上。如圖的設(shè)置中,一個為的就會同時發(fā)送到和。接收程序可以選擇要接收日志的嚴(yán)重性級別。 路由模式 在之前的文章中我們建立了一個簡單的日志系統(tǒng)。我們可以通過這個系統(tǒng)將日志message廣播給很多接收者。 在本篇文章中,我們在這之上,添加一個新的功...
摘要:生產(chǎn)者只能把消息發(fā)到交換器。是否要追加到一個特殊的隊列是否要追加到許多的隊列或者丟掉這條消息這些規(guī)則被定義為交換類型。有一點很關(guān)鍵,向不存在的交換器發(fā)布消息是被禁止的。如果仍然沒有隊列綁定交換器,消息會丟失。 發(fā)布與訂閱 (Publish/Subscribe) 在之前的章節(jié)中,我們創(chuàng)建了工作隊列,之前的工作隊列的假設(shè)是每個任務(wù)只被分發(fā)到一個worker。在這一節(jié)中,我們會做一些完全不一...
摘要:如果涉及返回值,就要用到本章提到的了。方法發(fā)送請求,并阻塞知道結(jié)果返回。當(dāng)有消息時,進行計算并通過指定的發(fā)送給客戶端。當(dāng)接收到,則檢查。如果和之前的匹配,則將消息返回給應(yīng)用進行處理。 RPC模式 在第二章中我們學(xué)習(xí)了如何使用Work模式在多個worker之間派發(fā)時間敏感的任務(wù)。這種情況是不涉及到返回值的,worker執(zhí)行任務(wù)就好。如果涉及返回值,就要用到本章提到的RPC(Remote ...
摘要:每個消費者會得到平均數(shù)量的。為了確保不會丟失,采用確認(rèn)機制。如果中斷退出了關(guān)閉了,關(guān)閉了,或是連接丟失了而沒有發(fā)送,會認(rèn)為該消息沒有完整的執(zhí)行,會將該消息重新入隊。該消息會被發(fā)送給其他的。當(dāng)消費者中斷退出,會重新分派。 Work模式 原文地址showImg(https://segmentfault.com/img/bVbqlXr?w=694&h=252); 在第一章中,我們寫了通過一個...
閱讀 1684·2021-08-30 09:45
閱讀 1751·2019-08-30 15:54
閱讀 1169·2019-08-30 14:02
閱讀 1925·2019-08-29 16:21
閱讀 1609·2019-08-29 13:47
閱讀 3193·2019-08-29 12:27
閱讀 698·2019-08-29 11:01
閱讀 2659·2019-08-26 14:04