摘要:推廣專題講座開源項目我們利用消息隊列實現了分布式事務的最終一致性解決方案,請大家圍觀。主題交換機也可以當成其它交換機來使用,假如隊列綁定到了那么它會接收所有的消息,就像廣播路由器一樣而如果未使用,那么就跟直達路由器一樣了。
推廣
https://segmentfault.com/l/15...
我們利用消息隊列實現了分布式事務的最終一致性解決方案,請大家圍觀??梢詤⒖荚创a:https://github.com/vvsuperman…,項目支持網站: http://rabbitmq.org.cn,最新文章或實現會更新在上面
前言在之前的建立路由中我們改進了日志系統。我們摒棄無腦發送消息的廣播路由器,而使用能夠根據綁定鍵(binding key)來發送消息的,從而能有有選擇的后去logs.
盡管使用直達路由器大大的改進了我們系統,但也存在局限性 - 無法加入更多條件。比如我們希望能夠加入更多的維度,我們希望不僅是基于嚴重程度,而且是基于來源,如果你對linux tool工具有了解的話,它不僅僅是基于嚴重程度(info/warn/crit...) 而且有來源(auth/cron/kern...),這個給到我們更大的靈活性-我們需要監聽所有來自"cron"的errors消息,以及來自"kern"的所有log。所以我們需要的是一個更復雜的主題交換機
主題交換機發送到主題交換機的消息并不會有一個確定的路由鍵-而是一長串字符列表,以"."來分割,而這個字符串列表表明了路由信息,比如"stock.usd.nyse","nyse.vmw","quick.orange.rabbit",字符串的最大長度限制在255bytes。
同時,在隊列綁定交換機時也需要指定模式,而符合模式的消息將會被發送至該隊列,模式可以由通配符組成:
"*" 可以表示一個詞
"#" 表示0個或多個詞
可以通過如下的例子來說明
請看例子,以發送動物的消息為例,我們會發送包含三個詞的路由鍵(兩個".")。第一個是速度,第二個是顏色,而第三個是種族
同時,我們建立了三個綁定,Q1綁定了鍵".orange.",Q2綁定了鍵"..rabbit"以及"lazy.#"??梢宰鋈缦碌慕忉?,Q1用來接受所有orange的動物,Q2用來接受所有rabbits,以及lazy的動物
一個路由為"quick.orange.rabbit"的消息將會被同時發送給這兩個隊列,消息"lazy.orange.elephant"也會被同時發給它們;"quick.orange.fox"只會發給第一個隊列;"lazy.brown.fox"會發到第二個;"lazy.pink.rabbit"將只會發送給第二個;"quick.brown.fox"會被丟棄因為匹配不上任何一個。
如果我們發送四個詞的呢?比如"oragne"或者"quick.orange.male.rabbit"?這些沒有任何匹配的隊列將會丟失。但比如"quick.orange.male.rabbit"會匹配到第二個隊列。
主題交換機也可以當成其它交換機來使用,假如隊列綁定到了 "#",那么它會接收所有的消息,就像廣播路由器一樣;而如果未使用"*","#",那么就跟直達路由器一樣了。
整合所有的代碼我們用主題交換機替換掉之前的直達交換機,用如同"
import com.rabbitmq.client.*; import java.io.IOException; public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String routingKey = getRouting(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); System.out.println(" [x] Sent "" + routingKey + "":"" + message + """); connection.close(); } //... }
ReceiveLogsTopic.java的代碼片段
import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1) { System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); System.exit(1); } for (String bindingKey : argv) { channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received "" + envelope.getRoutingKey() + "":"" + message + """); } }; channel.basicConsume(queueName, true, consumer); } }
編譯這段代碼
javac -cp $CP ReceiveLogsTopic.java EmitLogTopic.java
接受所有的logs
java -cp $CP ReceiveLogsTopic "#"
接受來自"kern"的消息
java -cp $CP ReceiveLogsTopic "kern.*"
接受來自"critical"的消息
java -cp $CP ReceiveLogsTopic "*.critical"
創建多個綁定
java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"
發送消息
java -cp $CP EmitLogTopic "kern.critical" "A critical kernel error"
你可以嘗試更多的參數,以此來熟悉這個知識
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/68117.html
摘要:推廣專題講座開源項目我們利用消息隊列實現了分布式事務的最終一致性解決方案,請大家圍觀。因此一旦有有消息,消息會廣播到所有的消費者。如此一來路由器就能夠把消息發送給相應的隊列了。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現了分布式事務的最終一致性解決方案,請大家圍觀。可以參考源碼:https...
摘要:前提必讀本教程假設是安裝在標準端口上運行。這些詞可以是任何東西,但通常它們指定連接到消息的某些特性。如果我們違背合同,用一個或四個詞,如或那么,這些消息將不匹配任何綁定并將丟失。代碼與前面的教程幾乎相同。 (using php-amqplib) 前提必讀 本教程假設RabbitMQ是安裝在標準端口上運行(5672)。如果您使用不同的主機、端口或憑據,則連接設置需要調整。 在哪里得到幫助...
摘要:主題模式在上一章我們改進了我們的日志系統,如果使用我們只能簡單進行廣播,而使用則允許消費者可以進行一定程度的選擇。為的會同時發布到這兩個。當為時,會接收所有的。當中沒有使用通配符和時,的行為和一致。 主題模式 在上一章我們改進了我們的日志系統,如果使用fanout我們只能簡單進行廣播,而使用direct則允許消費者可以進行一定程度的選擇。但是direct還是有其局限性,其路由不支持多個...
摘要:概述概述消息隊列,是分布式系統中重要的組件,是一種進程間通信或者是同一進程的不同線程的通信方式。消息隊列的使用場景消息隊列的使用場景異步處理流量控制應用解耦應用解耦應用解耦消息隊列的一個作用就是實現系統應用之間的解耦。概述消息隊列(Message Queue),是分布式系統中重要的組件,是一種進程間通信或者是同一進程的不同線程的通信方式。和 http 同步協議不同的是,消息隊列是一種異步的通...
摘要:消息隊列,用于存儲還未被消費者消費的消息。由在與時指定,而由發送時指定,兩者的匹配方式由決定。需要為每一個創建,協議規定只有通過才能執行的命令。建議客戶端線程之間不要共用,至少要保證共用的線程發送消息必須是串行的,但是建議盡量共用。 安裝 rabbitmq 在 mac 下可以直接用 brew 安裝默認安裝在 /usr/local/Cellar/下命令被軟連接加入到了/usr/local...
閱讀 3146·2021-11-22 12:01
閱讀 3767·2021-08-30 09:46
閱讀 784·2019-08-30 13:48
閱讀 3209·2019-08-29 16:43
閱讀 1657·2019-08-29 16:33
閱讀 1848·2019-08-29 13:44
閱讀 1410·2019-08-26 13:45
閱讀 2228·2019-08-26 11:44