摘要:主題模式在上一章我們改進了我們的日志系統,如果使用我們只能簡單進行廣播,而使用則允許消費者可以進行一定程度的選擇。為的會同時發布到這兩個。當為時,會接收所有的。當中沒有使用通配符和時,的行為和一致。
主題模式
在上一章我們改進了我們的日志系統,如果使用fanout我們只能簡單進行廣播,而使用direct則允許消費者可以進行一定程度的選擇。但是direct還是有其局限性,其路由不支持多個條件。
在我們的日志系統中,消費者程序可能不止是基于日志的severity,同時也想基于發送日志的源系統。你可能知道linux的syslog工具,它就是同時基于severity(info/warn/crit...)和功能(auth/cron/kern...).
這就提供了很大的靈活性-我們想接收來自cron的嚴重錯誤日志和kern的所有日志。
下面我們就使用更復雜的topic來改進我們的日志系統。
Topic exchange發送到topic類型exchange的message不可以具有模糊的routing_key,它必須具有以冒號分割的詞。就像"stock.usd.nyse","nyse.vmw","quick.orange.rabbit"等,限制長度255字節。
binding key也采用相似的形勢。topic exchange的邏輯和direct相似,通過比較message的routing key和bind的binding key,來匹配轉發的queue。但是topic的binding支持通配符:
” * “表示任何一個詞
” # “ 表示0或1個詞
通過上面圖示的場景來解釋會比較好理解。
例子中我們將發送描述動物的message。message會攜帶routing key(包含三個詞),第一個詞表示speed,第二個表示color,第三個表示species"
創建了三個綁定:Q1的binding key是”*.orange.*" Q2的binding key是“*.*.rabbit”和 "lazy.#".
以文字表述便是:
Q1 關心所有橘色的動物
Q2 關心所有的rabbit和所有的lazy動物
routing key為“quick.orange.rabbit"的message會同時發布到這兩個queue。
routing key為"lazy.orange.elephant"的message會同時發布到這兩個queue。
routing key為”quick.orange.fox“只會發布到第一個queue.
routing key為”lazy.brown.fox"的message只會發布到第二個queue.
routing key為"lazy.pink.rabbit"的message雖然滿足Q2的兩個條件,但也只會發布到Q2一次。
routing key為"quick.brown.fox"的message沒有任何匹配,就會被丟失。
如果我們發送的message只有一個word或者多余三個word,如"orange"或者"quick.orange.male.rabbit"會發生什么呢?這些message不會匹配任何binding key,均會被丟棄掉。
另外"lazy.orange.male.rabbit"雖然具有四個詞,但是會匹配最后的binding key,而被發送到第二個queue。
Topic exhange非常強大,同時可以模仿其他兩種類型的exchange。當binding key為 # 時,queue會接收所有的message。當binding key中沒有使用通配符(* 和 #)時,topic的行為和direct一致。開始執行
我們將在日志系統中使用topic exchange。我們的routding key采用兩個詞 "
EmitLogTopic.java的代碼如下:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String routingKey = getRouting(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent "" + routingKey + "":"" + message + """); } } //.. }
ReceiveLogsTopic.java的代碼如下:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1) { System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); System.exit(1); } for (String bindingKey : argv) { channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received "" + delivery.getEnvelope().getRoutingKey() + "":"" + message + """); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
編譯
javac -cp $CP ReceiveLogsTopic.java EmitLogTopic.java
接收所有日志
java -cp $CP ReceiveLogsTopic "#"
接收功能"kern"的日志
java -cp $CP ReceiveLogsTopic "kern.*"
接收嚴重級別日志
java -cp $CP ReceiveLogsTopic "*.critical"
接收者使用兩個綁定條件
java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"
發送日志message
java -cp $CP EmitLogTopic "kern.critical" "A critical kernal error"
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/73916.html
摘要:路由模式在之前的文章中我們建立了一個簡單的日志系統。更形象的表示,如對中的感興趣。為了進行說明,像下圖這么來設置如圖,可以看到有兩個綁到了類型為的上。如圖的設置中,一個為的就會同時發送到和。接收程序可以選擇要接收日志的嚴重性級別。 路由模式 在之前的文章中我們建立了一個簡單的日志系統。我們可以通過這個系統將日志message廣播給很多接收者。 在本篇文章中,我們在這之上,添加一個新的功...
摘要:推廣專題講座開源項目我們利用消息隊列實現了分布式事務的最終一致性解決方案,請大家圍觀。主題交換機也可以當成其它交換機來使用,假如隊列綁定到了那么它會接收所有的消息,就像廣播路由器一樣而如果未使用,那么就跟直達路由器一樣了。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現了分布式事務的最終一致性...
摘要:每個消費者會得到平均數量的。為了確保不會丟失,采用確認機制。如果中斷退出了關閉了,關閉了,或是連接丟失了而沒有發送,會認為該消息沒有完整的執行,會將該消息重新入隊。該消息會被發送給其他的。當消費者中斷退出,會重新分派。 Work模式 原文地址showImg(https://segmentfault.com/img/bVbqlXr?w=694&h=252); 在第一章中,我們寫了通過一個...
摘要:發布訂閱模式在之前的文章里,創建了。我們稱之為發布訂閱模式。其實我們是用到了默認的,用空字符串來標識。空字符串代表了沒有名字的被路由到了由指定名字的。和這種關系的建立我們稱之為從現在開始這個就會將推向我們的隊列了。 發布訂閱模式 在之前的文章里,創建了work queue。work queue中,每一個task都會派發給一個worker。在本章中,我們會完成完全不一樣的事情 - 我們會...
摘要:如果涉及返回值,就要用到本章提到的了。方法發送請求,并阻塞知道結果返回。當有消息時,進行計算并通過指定的發送給客戶端。當接收到,則檢查。如果和之前的匹配,則將消息返回給應用進行處理。 RPC模式 在第二章中我們學習了如何使用Work模式在多個worker之間派發時間敏感的任務。這種情況是不涉及到返回值的,worker執行任務就好。如果涉及返回值,就要用到本章提到的RPC(Remote ...
閱讀 685·2023-04-25 22:50
閱讀 1525·2021-10-08 10:05
閱讀 983·2021-09-30 09:47
閱讀 1913·2021-09-28 09:35
閱讀 815·2021-09-26 09:55
閱讀 3405·2021-09-10 10:51
閱讀 3426·2021-09-02 15:15
閱讀 3290·2021-08-05 09:57