摘要:可以參考源碼,項目支持網站,最新文章或實現會更新在上面前言在訂閱發布中我們建立了一個簡單的日志系統,從而將消息廣播給一些消費者。因此,發送到路由鍵的消息會發送給隊列,發送到路由鍵或者的消息會發送給,其它的消息將被丟棄。
推廣
https://segmentfault.com/l/15...
我們利用消息隊列實現了分布式事務的最終一致性解決方案,請大家圍觀。可以參考源碼:https://github.com/vvsuperman…,項目支持網站: http://rabbitmq.org.cn,最新文章或實現會更新在上面
前言在訂閱/發布中我們建立了一個簡單的日志系統,從而將log消息廣播給一些消費者。這章我們會在此基礎上加入一些新的特性-我們將有針對性的進行消息分發,比如,只把錯誤(error)消息保存到磁盤,與此同時,打印出所有的消息。
綁定我們在前面的例子中,綁定是這么來做的
channel.queueBind(queueName, EXCHANGE_NAME, "");
綁定是建立交換機和隊列之間的一種聯系:隊列會接受交換機中的消息。綁定可以用一個路由鍵來指明,為了與basic_publish區分開,我們稱之為綁定鍵(binding key):
channel.queueBind(queueName, EXCHANGE_NAME, "black");
綁定鍵跟路由器類型也有關系,我們之前用的廣播路由器,會忽略掉這個值
直達交換機(Direct Exchange)之前我們用的是廣播交換機,會將消息發送給所有的消費者。這里我們希望通過log的嚴重程度進行過濾,例如只有嚴重的錯誤才會寫入到磁盤,而warn和info消息就不用了,以此來節省磁盤空間
而廣播交換機沒法滿足這個需求-它只是無腦的發送消息。所以我們會使用直達交換機(Direct Exchange)- 消息會通過所綁定的鍵來發送給對應的隊列,可以看如下這幅圖
如上圖所示,直達交換機X綁定了兩個隊列,C1是通過orange來綁定,而C2是通過black和green綁定。因此,發送到路由鍵orange的消息會發送給隊列Q1,發送到路由鍵black或者green的消息會發送給Q2,其它的消息將被丟棄。
多項綁定
當然,多個隊列綁定到一個鍵上也是合法的,在這種情況下,直達交換機將會將消息發送給所有的隊列,就像廣播交換機一樣,如上圖所示,一個鍵為black的消息將會同時被發送給C1和C2.
我們首先需要創建一個直達路由器
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
并發送消息到這個路由器
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
上面我們是發送給"severity",簡單起見,假設有下列幾種日志類型"severity" ,"info", "warning", "error".
訂閱消息(Subscribing)接受消息跟之前一樣,但有一點不同,我們提供了一個binding key,
String queueName = channel.queueDeclare().getQueue(); for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); }整合
將上面的所有代碼整合到一起
EmitLogDirect.java
import com.rabbitmq.client.*; import java.io.IOException; public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String severity = getSeverity(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println(" [x] Sent "" + severity + "":"" + message + """); channel.close(); connection.close(); } //.. }
ReceiveLogsDirect.java
import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1){ System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]"); System.exit(1); } for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received "" + envelope.getRoutingKey() + "":"" + message + """); } }; channel.basicConsume(queueName, true, consumer); } }
編譯
javac -cp $CP ReceiveLogsDirect.java EmitLogDirect.java
只保存warning和error的消息到磁盤上
java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log
將所有的消息打印到頻幕上
java -cp $CP ReceiveLogsDirect info warning error # => [*] Waiting for logs. To exit press CTRL+C
最后,發送error消息
java -cp $CP EmitLogDirect error "Run. Run. Or it will explode." # => [x] Sent "error":"Run. Run. Or it will explode."
好了,這一章就到這兒,下一章我們將講述如何基于特定模式進行監聽
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/68120.html
摘要:推廣專題講座開源項目我們利用消息隊列實現了分布式事務的最終一致性解決方案,請大家圍觀。主題交換機也可以當成其它交換機來使用,假如隊列綁定到了那么它會接收所有的消息,就像廣播路由器一樣而如果未使用,那么就跟直達路由器一樣了。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現了分布式事務的最終一致性...
摘要:推廣專題講座開源項目我們利用消息隊列實現了分布式事務的最終一致性解決方案,請大家圍觀。因此一旦有有消息,消息會廣播到所有的消費者。如此一來路由器就能夠把消息發送給相應的隊列了。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現了分布式事務的最終一致性解決方案,請大家圍觀。可以參考源碼:https...
摘要:作為消息隊列的一個典型實踐,完全實現了標準,與的快快快不同,它追求的穩定可靠。同一個隊列不僅可以綁定多個生產者,而且能夠發送消息到多個消費者。消費者接受并消費消息。幾乎于完全類似是一個繼承了接口的類,方便我們來存儲消息隊列來的消息。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現了分布式事務的...
摘要:因為消費消息是在另外一個進程中,我們需要阻塞我們的進程直到結果返回,使用阻塞隊列是一種非常好的方式,這里我們使用了長度為的,的功能是檢查消息的的是不是我們之前所發送的,如果是,將返回值返回到。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現了分布式事務的最終一致性解決方案,請大家圍觀。可以參考...
摘要:路由模式在之前的文章中我們建立了一個簡單的日志系統。更形象的表示,如對中的感興趣。為了進行說明,像下圖這么來設置如圖,可以看到有兩個綁到了類型為的上。如圖的設置中,一個為的就會同時發送到和。接收程序可以選擇要接收日志的嚴重性級別。 路由模式 在之前的文章中我們建立了一個簡單的日志系統。我們可以通過這個系統將日志message廣播給很多接收者。 在本篇文章中,我們在這之上,添加一個新的功...
閱讀 1264·2021-10-18 13:32
閱讀 2333·2021-09-24 09:47
閱讀 1323·2021-09-23 11:22
閱讀 2463·2019-08-30 14:06
閱讀 571·2019-08-30 12:48
閱讀 1997·2019-08-30 11:03
閱讀 535·2019-08-29 17:09
閱讀 2462·2019-08-29 14:10