摘要:推廣專題講座開源項目我們利用消息隊列實現了分布式事務的最終一致性解決方案,請大家圍觀。因此一旦有有消息,消息會廣播到所有的消費者。如此一來路由器就能夠把消息發送給相應的隊列了。
推廣
https://segmentfault.com/l/15...
我們利用消息隊列實現了分布式事務的最終一致性解決方案,請大家圍觀。可以參考源碼:https://github.com/vvsuperman…,項目支持網站: http://rabbitmq.org.cn,最新文章或實現會更新在上面
前言在第二章中我們描述了任務隊列,在任務隊列中一個消息只會發送給一個消費者。而在這一章中我們將消息發送給許多個消費者,我們稱之為“發布/訂閱”
為了更好的闡述這個模式,我們會建立一個新的簡單的logging系統,包含2個步驟-第一步發送log信息,第二步能夠接受并將信息打印出來,而且在第二步中所有的消費者都會接受到同樣的消息,比如一個消費者用來將log信息寫到磁盤,另外一個接受信息并顯示在屏幕上。因此一旦有有消息,消息會廣播到所有的消費者。
交換機(Exchanges)前面的章節中我們是直接通過queue來處理消息,現在我們來介紹一種更完善的模式
讓我們迅速瀏覽一遍前面的主題:
生產者是一個客戶端程序,用來發送消息
隊列是一個緩沖,用來存儲消息
消費者是一個客戶端程序,用來接受消息
RabbitMQ的核心思想是生產者不會將消息直接發送給隊列,意味著生產者是完全看不到隊列的。反之,生產者只能將消息發送給路由器(Exchange),再由路由器來決定該如何來處理消息,是將消息發送給一個隊列呢,還是發送給許多個隊列,或者直接無視,具體的規則是根據路由器的類型而定的。
路由器的類型有這樣幾種:直連路由器(dirct), 主題路由器(topic),頭部路由器(headers),以及多廣播路由器(fanout)
channel.exchangeDeclare("logs", "fanout");
廣播路由器聽起來就很簡單,它會將消息廣播到所有的它所知道的隊列,而這正是我們所需要的。
默認路由器在前面的章節中雖然沒有設置任何路由器,但依然能夠將消息發送到隊列,這是因為我們的是默認路由器:使用空字符串("")來做的定義:
channel.basicPublish("", "hello", null, message.getBytes());
第一個參數是exchange的名稱,在這里是空字符串,消息會通過路由健(routingKey)發送到該鍵所對應的隊列。
然而現在,我們有了確認的路由器
channel.basicPublish( "logs", "", null, message.getBytes());
臨時隊列
我們之前隊列都有名字(Hello隊列和task_queue隊列),給隊列起名字非常重要-需要將消費者綁定到特定的queue上面,以及需要把消息從生產者發送給特定的消費者。
但對于日志來說,消息會發送到所有的消費者,而并非個別,We"re also interested only in currently flowing messages not in the old ones.為了滿足當前需求我們可以做兩件事
一旦連接上RabbitMQ,需要一個新的空隊列來接受消息,我們可以隨機起個名字,甚至根本不起名,而讓RabbitMQ來命名它。
一旦消費者斷開連接,這個隊列就能被刪除掉
我們可以這樣定義一個不需要持久化、獨立的、能夠被自動刪除的隊列
String queueName = channel.queueDeclare().getQueue();
這個名稱是RabbitMQ隨機分配的,比如amq.gen-JzTY20BRgKO-HjmUJj0wLg.
綁定我們已經聲明了一個廣播路由器,現在需要告訴這個路由器需要把信息發送給哪些隊列,路由器和隊列間的這個關系就稱之為綁定。
channel.queueBind(queueName, "logs", "");
如此一來路由器就能夠把消息發送給相應的隊列了。
整合發送者與我們之前的代碼基本相同,最重大的區別我們現在是發送給帶名稱的路由器了,同時我們也需要一個路由鍵,但這里也不需要,因為廣播路由器會忽略這個值,這是我們EmitLog.java的代碼
import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent "" + message + """); channel.close(); connection.close(); } //... }
可以看到,一旦我們建立的連接立即定義了一個路由器,這個步驟對我們非常重要,因為是嚴禁將消息發送給并不存在的路由的。
同時,如果路由器沒有綁定隊列,消息也會丟失掉,但這對于我們來說是ok的:如果并沒有消費者在監聽,我們可以直接丟棄掉這個消息。
ReciveLogs.java代碼如下:
import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received "" + message + """); } }; channel.basicConsume(queueName, true, consumer); } }
編譯代碼
javac -cp $CP EmitLog.java ReceiveLogs.java
如果你希望將log存儲到本機上
java -cp $CP ReceiveLogs > logs_from_rabbit.log
如果你希望在屏幕上顯示log信息,打開一個新的終端:
java -cp $CP ReceiveLogs
發送消息
java -cp $CP EmitLog
如此一來,就能夠存儲消息的同時進行打印了。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/68123.html
摘要:可以參考源碼,項目支持網站,最新文章或實現會更新在上面前言在訂閱發布中我們建立了一個簡單的日志系統,從而將消息廣播給一些消費者。因此,發送到路由鍵的消息會發送給隊列,發送到路由鍵或者的消息會發送給,其它的消息將被丟棄。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現了分布式事務的最終一致性解決...
摘要:發布訂閱模式在之前的文章里,創建了。我們稱之為發布訂閱模式。其實我們是用到了默認的,用空字符串來標識。空字符串代表了沒有名字的被路由到了由指定名字的。和這種關系的建立我們稱之為從現在開始這個就會將推向我們的隊列了。 發布訂閱模式 在之前的文章里,創建了work queue。work queue中,每一個task都會派發給一個worker。在本章中,我們會完成完全不一樣的事情 - 我們會...
摘要:本文將會講解如何使用實現延時重試和失敗消息隊列,實現可靠的消息消費,消費失敗后,自動延時將消息重新投遞,當達到一定的重試次數后,將消息投遞到失敗消息隊列,等待人工介入處理。 RabbitMQ是一款使用Erlang開發的開源消息隊列。本文假設讀者對RabbitMQ是什么已經有了基本的了解,如果你還不知道它是什么以及可以用來做什么,建議先從官網的 RabbitMQ Tutorials 入門...
摘要:本文將會講解如何使用實現延時重試和失敗消息隊列,實現可靠的消息消費,消費失敗后,自動延時將消息重新投遞,當達到一定的重試次數后,將消息投遞到失敗消息隊列,等待人工介入處理。 RabbitMQ是一款使用Erlang開發的開源消息隊列。本文假設讀者對RabbitMQ是什么已經有了基本的了解,如果你還不知道它是什么以及可以用來做什么,建議先從官網的 RabbitMQ Tutorials 入門...
摘要:性能調優筆記避免雷區要避免流控機制觸發服務端默認配置是當內存使用達到,磁盤空閑空間小于,即啟動內存報警,磁盤報警報警后服務端觸發流控機制。最佳線程生產者使用多線程發送數據到三到五個線程性能發送最佳,超過它也不能提高生產的發送速率。 RabbitMq 性能調優筆記 [TOC] 避免雷區 要避免流控機制觸發 服務端默認配置是當內存使用達到40%,磁盤空閑空間小于50M,即啟動內存報警,磁...
閱讀 1608·2021-11-23 09:51
閱讀 1178·2019-08-30 13:57
閱讀 2257·2019-08-29 13:12
閱讀 2011·2019-08-26 13:57
閱讀 1193·2019-08-26 11:32
閱讀 978·2019-08-23 15:08
閱讀 699·2019-08-23 14:42
閱讀 3079·2019-08-23 11:41