摘要:路由模式在之前的文章中我們建立了一個簡單的日志系統。更形象的表示,如對中的感興趣。為了進行說明,像下圖這么來設置如圖,可以看到有兩個綁到了類型為的上。如圖的設置中,一個為的就會同時發送到和。接收程序可以選擇要接收日志的嚴重性級別。
路由模式
在之前的文章中我們建立了一個簡單的日志系統。我們可以通過這個系統將日志message廣播給很多接收者。
在本篇文章中,我們在這之上,添加一個新的功能,即允許接收者訂閱message的一個子集。舉個例子,我們將日志分成多個級別,一個接收者接收錯誤日志將之保存到磁盤,另一個接收者接收所有日志將之打印到控制臺。
Bindings在前面的章節中,我們已經接觸過binding了,像下面的代碼這樣:
channel.queueBind(queueName,EXCHANGE_NAME,"");
binding將exchange和queue關聯在了一起。更形象的表示,如:queue對exchange中的message感興趣。
bindings可以攜帶一個routingKey參數。為了避免和basic_publish的參數弄混,我們稱之它為binding_key.我們像下面這樣創建一個binding
channel.queueBind(queueName,EXCHANGE_NAME,"black");
binding key的作用要看exchange的類型,對于fanout類型的exchange,binding key是直接忽略的。
Direct Exchange在之前的日志系統中,message會推送到所有的消費者去。我們想讓系統依據message的日志級別進行過濾。比如一個消費者只接收嚴重級別的日志。
fanout無法幫我們實現這樣的功能,它只是無腦的進行廣播。
我們使用direct類型的exchange,它的路由算法是非常簡單的 - 只要message的routing_key和bind的binding_key相同即進行轉發。
為了進行說明,像下圖這么來設置
如圖,可以看到有兩個queue綁到了類型為direct的exchange上。第一個queue綁定用了orange這個binding key,第二個則用了black和green兩個binding key。
那么結果就是有routing key為orange的message路由到了Q1.而routing key為black和green的message則路由到了Q2,其他的消息則被丟棄了。
Multiple Bindings
若使用相同的binding key將多個queue綁定到exchange上,就和fanout的行為一樣了,message會廣播到binding key相同的queue去。如圖的設置中,一個routing key為black的message就會同時發送到Q1和Q2。
我們將在我們的日志系統上應用這個模型,使用direct類型的exchange去替代fanout類型的exchange。提供日志的嚴重性作為routing key。接收程序可以選擇要接收日志的嚴重性級別。
首先我們創建exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
然后就是發送message
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
我們先假設severity取值 info | warning | error
Subscribing接收message和上一章沒什么區別,只是需要給各個severity創建新的binding。
String queueName = channel.queueDeclare().getQueue(); for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); }開始執行
EmitLogDirect.java代碼如下
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String severity = getSeverity(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent "" + severity + "":"" + message + """); } } //.. }
ReceiveLogsDirect.java代碼如下:
import com.rabbitmq.client.*; public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1) { System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]"); System.exit(1); } for (String severity : argv) { channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received "" + delivery.getEnvelope().getRoutingKey() + "":"" + message + """); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
編譯代碼
javac -cp $CP ReceiveLogsDirect.java EmitLogDirect.java
如果想把warning和error的日志保存到文件去,那么
java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log
如果想把所有的日志打印到控制臺,那么
java -cp $CP ReceiveLogsDirect info warning error
發送error日志
java -cp $CP EmitLogDirect error "Run.Run. Or it will explode"
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/73892.html
摘要:主題模式在上一章我們改進了我們的日志系統,如果使用我們只能簡單進行廣播,而使用則允許消費者可以進行一定程度的選擇。為的會同時發布到這兩個。當為時,會接收所有的。當中沒有使用通配符和時,的行為和一致。 主題模式 在上一章我們改進了我們的日志系統,如果使用fanout我們只能簡單進行廣播,而使用direct則允許消費者可以進行一定程度的選擇。但是direct還是有其局限性,其路由不支持多個...
摘要:發布訂閱模式在之前的文章里,創建了。我們稱之為發布訂閱模式。其實我們是用到了默認的,用空字符串來標識。空字符串代表了沒有名字的被路由到了由指定名字的。和這種關系的建立我們稱之為從現在開始這個就會將推向我們的隊列了。 發布訂閱模式 在之前的文章里,創建了work queue。work queue中,每一個task都會派發給一個worker。在本章中,我們會完成完全不一樣的事情 - 我們會...
摘要:每個消費者會得到平均數量的。為了確保不會丟失,采用確認機制。如果中斷退出了關閉了,關閉了,或是連接丟失了而沒有發送,會認為該消息沒有完整的執行,會將該消息重新入隊。該消息會被發送給其他的。當消費者中斷退出,會重新分派。 Work模式 原文地址showImg(https://segmentfault.com/img/bVbqlXr?w=694&h=252); 在第一章中,我們寫了通過一個...
摘要:如果涉及返回值,就要用到本章提到的了。方法發送請求,并阻塞知道結果返回。當有消息時,進行計算并通過指定的發送給客戶端。當接收到,則檢查。如果和之前的匹配,則將消息返回給應用進行處理。 RPC模式 在第二章中我們學習了如何使用Work模式在多個worker之間派發時間敏感的任務。這種情況是不涉及到返回值的,worker執行任務就好。如果涉及返回值,就要用到本章提到的RPC(Remote ...
摘要:生產者只能把消息發到交換器。是否要追加到一個特殊的隊列是否要追加到許多的隊列或者丟掉這條消息這些規則被定義為交換類型。有一點很關鍵,向不存在的交換器發布消息是被禁止的。如果仍然沒有隊列綁定交換器,消息會丟失。 發布與訂閱 (Publish/Subscribe) 在之前的章節中,我們創建了工作隊列,之前的工作隊列的假設是每個任務只被分發到一個worker。在這一節中,我們會做一些完全不一...
閱讀 1619·2021-11-11 10:59
閱讀 2624·2021-09-04 16:40
閱讀 3650·2021-09-04 16:40
閱讀 2979·2021-07-30 15:30
閱讀 1615·2021-07-26 22:03
閱讀 3164·2019-08-30 13:20
閱讀 2225·2019-08-29 18:31
閱讀 439·2019-08-29 12:21