摘要:作為消息隊列的一個典型實踐,完全實現了標準,與的快快快不同,它追求的穩定可靠。同一個隊列不僅可以綁定多個生產者,而且能夠發送消息到多個消費者。消費者接受并消費消息。幾乎于完全類似是一個繼承了接口的類,方便我們來存儲消息隊列來的消息。
推廣
https://segmentfault.com/l/15...
我們利用消息隊列實現了分布式事務的最終一致性解決方案,請大家圍觀??梢詤⒖荚创a:https://github.com/vvsuperman…,項目支持網站: http://rabbitmq.org.cn,最新文章或實現會更新在上面
前言消息隊列想必大家都有一定了解:用來解耦,上級模塊不用關心下級模塊是否執行成功,最常見的比如說日志,核心系統并不關心日志是否成功,日志什么時候記錄。這種情形就可以用消息隊列來解耦。
RabbitMQ作為消息隊列的一個典型實踐,完全實現了AMQ標準,與Kafka的快快快不同,它追求的穩定、可靠。下面就來幾篇文章來詳細介紹下,均翻譯至RabbitMQ的官方文檔。
RabbitMQ是一個消息的中介(用來接受以及轉發消息),就像是一個非??煽康泥]局,當信件放到郵局時,信件就確保能到達,所以,RabbitMQ可以看成是郵箱、郵局、以及郵遞員的合體
RabbitMQ的一些重要概念 produceing(生產者):生產數據 queue(隊列):類似于郵箱,存在于RabbitMQ服務器的內部,用來存儲消息,并且消息只能存儲在隊列里面。隊列的大小只受RabbitMQ主機內存和硬盤的影響。同一個隊列不僅可以綁定多個生產者,而且能夠發送消息到多個消費者。
Consuming(消費者):接受并消費消息。 Hello World下面我們來寫我們的第一個“Hello World”,我們會使用Java的API來編寫一個生產者來生產消息,以及一個消費者來消費消息
P是我們的生產者,而C是我們的消費者。中間的box是我們的queue:作為消息緩沖,是RabbitMQ用來存儲轉發消息給消費者的。
Java客戶端庫RabbitMQ支持多重協議,這里我們會用AMQP 0-9-1來說明,它是一個消息隊列的通用協議。RabbitMQ同時也有多種語言的客戶端,我們在這里用Java來做說明。
首先請下載Java客戶端包以及它所依賴的SLF4J和SLF4J SIMPLE,將它們拷貝到自己的工作區。
引入RabbitMQ同樣也可以使用Maven來做依賴管理, groupId是com.rabbitmq 以及artifactId amqp-client
發送請求生產者會發送消息到MQ,然后退出
在Send.java中,首先我們import一些類
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;
設置我們的主類
public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException { ... } }
創建Connection
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
這里我們連接的是本地,你當然也可以連接到另一個服務器上,只需要指明服務器的名稱和ip地址。
下面我們要創建一個Channel,大家可以想象一些,消息的產生和發送都是通過這個Channel完成的。
當然,我們還需要頂一個一個Queue來接受消息
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent "" + message + """);
對于Queue的定義是冥等的,如果不存在才會創建,如果存在則不會再建新的。消息會被格式化成byte的數組,方便進行任意的轉換。
最后,我們關閉通道
channel.close(); connection.close();
完整的代碼可以看這個地方:send.java
接受請求消費者會從RabbitMQ接收到請求,消息是被推到消費者,而且消費者會一直監聽著消息隊列,一旦有有新的消息就會打印出來。
Recv.java幾乎于Send完全類似
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer;
Defaultconsumer是一個繼承了Consumer接口的類,方便我們來存儲消息隊列來的消息。建立消費者與我們建立生產者非常類似:
public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); ... } }
你可以注意到我們在消費者定義了一個Queue,因此我們是需要在生產者之前啟動消費者的,我們要確保我們在消費消息之前這個隊列是已經存在的。
然后我們需要告訴mq服務可以推送消息給我們。因為這個推送是異步的,因此我們可以提供一個回調方法,DefaultConsumer會暫時存儲這個消息,直到消費者以及準備好來處理接受到的消息了(消息會存儲在消費者中直到消費者有能力來消費它,可以想象一下數據庫等高IO操作)
Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received "" + message + """); } }; channel.basicConsume(QUEUE_NAME, true, consumer);
完整的Recv.java地址
跑起來我們可以先用javac來編譯程序
javac -cp amqp-client-4.0.2.jar Send.java Recv.java
而后來運行它,這需要我們在路徑加上它的依賴包,我們首先啟動的是消費者
java -cp .:amqp-client-4.0.2.jar:slf4j-api-1.7.21.jar:slf4j-simple-1.7.22.jar Recv
而后啟動發送者
java -cp .:amqp-client-4.0.2.jar:slf4j-api-1.7.21.jar:slf4j-simple-1.7.22.jar Send
消費者會持續等待,并打印從生產者哪里來的消息,你可以用(Ctrl-C)來停止它。所以你要另外開啟一個命令行窗口來運行生產者。
查看隊列也許你想知道RabbitMQ中到底有多少個消息,你可以使用rabbitmqctl工具:
sudo rabbitmqctl list_queues
在Windows中:
rabbitmqctl.bat list_queues
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/70889.html
摘要:推廣專題講座開源項目我們利用消息隊列實現了分布式事務的最終一致性解決方案,請大家圍觀。因此一旦有有消息,消息會廣播到所有的消費者。如此一來路由器就能夠把消息發送給相應的隊列了。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現了分布式事務的最終一致性解決方案,請大家圍觀??梢詤⒖荚创a:https...
摘要:推廣專題講座開源項目我們利用消息隊列實現了分布式事務的最終一致性解決方案,請大家圍觀。主題交換機也可以當成其它交換機來使用,假如隊列綁定到了那么它會接收所有的消息,就像廣播路由器一樣而如果未使用,那么就跟直達路由器一樣了。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現了分布式事務的最終一致性...
摘要:任務隊列最主要的功能就是解耦高耗時的操作,否則程序會一直等在那里,浪費大量資源。我們將任務封裝成一個消息發送給隊列,后臺的任務進程會得到這個任務并執行它,而且可以配置多個任務進程,進一步加大吞吐率。為了確保消息不丟失,支持消息確認。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現了分布式事務的...
摘要:可以參考源碼,項目支持網站,最新文章或實現會更新在上面前言在訂閱發布中我們建立了一個簡單的日志系統,從而將消息廣播給一些消費者。因此,發送到路由鍵的消息會發送給隊列,發送到路由鍵或者的消息會發送給,其它的消息將被丟棄。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現了分布式事務的最終一致性解決...
摘要:因為消費消息是在另外一個進程中,我們需要阻塞我們的進程直到結果返回,使用阻塞隊列是一種非常好的方式,這里我們使用了長度為的,的功能是檢查消息的的是不是我們之前所發送的,如果是,將返回值返回到。 推廣 RabbitMQ專題講座 https://segmentfault.com/l/15... CoolMQ開源項目 我們利用消息隊列實現了分布式事務的最終一致性解決方案,請大家圍觀??梢詤⒖?..
閱讀 2785·2021-10-14 09:42
閱讀 3608·2021-10-11 10:59
閱讀 2941·2019-08-30 11:25
閱讀 3074·2019-08-29 16:25
閱讀 3224·2019-08-26 17:40
閱讀 1225·2019-08-26 13:30
閱讀 1143·2019-08-26 11:46
閱讀 1329·2019-08-23 15:22