摘要:如果你注定要成為厲害的人那問題的答案就深藏在你的血脈里。本篇文章主要講述與的整合。有想了解重構的朋友,我之前也有對重構一書的解讀,出門左轉就能看到。
如果你注定要成為厲害的人, 那問題的答案就深藏在你的血脈里。
本篇文章主要講述Spring Boot與RabbitMQ的整合。因為我們公司的云服務用到了RabbitMQ 技術,之前都是自己封裝,正好我們也正在往SpringBoot轉變,這個技術正好用到,看來代碼又要重構咯。
有想了解重構的朋友,我之前也有對《重構》一書的解讀,出門左轉就能看到。
導包:
消息生產者org.springframework.boot spring-boot-starter-amqp
ConnectionFactory配置
創建AmqpConfig文件AmqpConfig.java(后期的配置都在該文件中)
package cn.usr.springbootrabbitmq; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; /** * @program: Learn-SpringBootRabbitmq * @author: Rock 【shizhiyuan@usr.cn】 * @Date: 2018/2/23 0023 */ @Configuration public class AmqpConfig { public static final String EXCHANGE = "spring-boot-exchange2"; public static final String ROUTINGKEY = "spring-boot-routingKey2"; @Bean public CachingConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses("127.0.0.1"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); // 這里需要顯示調用才能進行消息的回調 必須要設置 connectionFactory.setPublisherConfirms(true); return connectionFactory; }RabbitTemplate
@Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; }
這里設置為原型,具體的原因在后面會講到,在發送消息時通過調用RabbitTemplate中的如下方法:
一會調用的時候用:
public void convertAndSend(String exchange, String routingKey, Object object, CorrelationData correlationData)Producer
調用啦:
package cn.usr.springbootrabbitmq; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.UUID; /** * @program: Learn-SpringBootRabbitmq * @author: Rock 【shizhiyuan@usr.cn】 * @Date: 2018/2/23 0023 */ @Component public class Producer implements RabbitTemplate.ConfirmCallback { private RabbitTemplate rabbitTemplate; /** * 構造方法注入 */ @Autowired public Producer(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; //這是是設置回調能收到發送到響應,confirm()在下面解釋 rabbitTemplate.setConfirmCallback(this); } public void sendMsg(String content) { CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); //convertAndSend(exchange:交換機名稱,routingKey:路由關鍵字,object:發送的消息內容,correlationData:消息ID) rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE, AmqpConfig.ROUTINGKEY, content, correlationId); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println(" 回調id:" + correlationData); if (ack) { System.out.println("消息成功消費"); } else { System.out.println("消息消費失敗:" + cause); } } }
如果需要在生產者需要消息發送后的回調,需要對rabbitTemplate設置ConfirmCallback對象,由于不同的生產者需要對應不同的ConfirmCallback,如果rabbitTemplate設置為單例bean,則所有的rabbitTemplate實際的ConfirmCallback為最后一次申明的ConfirmCallback。
消息消費者還是在AmqpConfig.class里面
步驟就是
聲明交換機
聲明隊列
綁定RoutingKey
/** * 針對消費者配置 * 1. 設置交換機類型 * 2. 將隊列綁定到交換機 **
* FanoutExchange: 將消息分發到所有的綁定隊列,無routingkey的概念 * HeadersExchange :通過添加屬性key-value匹配 * DirectExchange:按照routingkey分發到指定隊列 * TopicExchange:多關鍵字匹配 */ @Bean public DirectExchange defaultExchange() { return new DirectExchange(EXCHANGE); } @Bean public Queue queue() { return new Queue("spring-boot-queue", true);//隊列持久 } @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY); } @Bean public SimpleMessageListenerContainer messageContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory()); container.setQueues(queue()); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); // 設置確認模式手工確認 container.setAcknowledgeMode(AcknowledgeMode.MANUAL); container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { byte[] body = message.getBody(); System.out.println("receive msg : " + new String(body)); //確認消息成功消費 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }); return container; }
下面是完整的配置:
package cn.usr.springbootrabbitmq; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; /** * @program: Learn-SpringBootRabbitmq * @author: Rock 【shizhiyuan@usr.cn】 * @Date: 2018/2/23 0023 */ @Configuration public class AmqpConfig { public static final String EXCHANGE = "spring-boot-exchange2"; public static final String ROUTINGKEY = "spring-boot-routingKey2"; @Bean public CachingConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses("127.0.0.1"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); // 這里需要顯示調用才能進行消息的回調 必須要設置 connectionFactory.setPublisherConfirms(true); return connectionFactory; } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; } /** * 針對消費者配置 * 1. 設置交換機類型 * 2. 將隊列綁定到交換機 **
* FanoutExchange: 將消息分發到所有的綁定隊列,無routingkey的概念 * HeadersExchange :通過添加屬性key-value匹配 * DirectExchange:按照routingkey分發到指定隊列 * TopicExchange:多關鍵字匹配 */ @Bean public DirectExchange defaultExchange() { return new DirectExchange(EXCHANGE); } @Bean public Queue queue() { return new Queue("spring-boot-queue", true); } @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY); } @Bean public SimpleMessageListenerContainer messageContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory()); container.setQueues(queue()); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); // 設置確認模式手工確認 container.setAcknowledgeMode(AcknowledgeMode.MANUAL); container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { byte[] body = message.getBody(); System.out.println("receive msg : " + new String(body)); //確認消息成功消費 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }); return container; } }
到這里我就能完成SpringBoot整合RabbitMQ的數據收發了。
結果:
receive msg : ceshi-----? 回調id:CorrelationData [id=dfe3b3d1-f5a3-42d9-a514-a73729e009d5] 消息成功消費
點贊收藏關注不迷路。么么噠
參考:http://blog.csdn.net/liaokail...
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/68539.html
摘要:可以在地址看到如何使用講解下上面命令行表示控制臺端口號,可以在瀏覽器中通過控制臺來執行的相關操作。同時從控制臺可以看到發送的速率多線程測試性能開了個線程,每個線程發送條消息。 showImg(http://ww2.sinaimg.cn/large/006tNc79ly1g5jjb62t88j30u00gwdi2.jpg); 前提 上次寫了篇文章,《SpringBoot Kafka 整合...
摘要:創建消息監聽,并發送一條消息在程序中,提供了發送消息和接收消息的所有方法。 這篇文章帶你了解怎么整合RabbitMQ服務器,并且通過它怎么去發送和接收消息。我將構建一個springboot工程,通過RabbitTemplate去通過MessageListenerAdapter去訂閱一個POJO類型的消息。 準備工作 15min IDEA maven 3.0 在開始構建項目之前,機器需...
摘要:介紹它是出品,最流行的,能力強勁的開源消息總線。是一個完全支持和規范的實現,盡管規范出臺已經是很久的事情了,但是在當今的應用中間仍然扮演著特殊的地位。相關文章整合使用整合使用關注我轉載請務必注明原創地址為安裝同之前一樣,直接在里面玩吧。 showImg(https://segmentfault.com/img/remote/1460000012996066?w=1920&h=1281)...
摘要:引入了新的環境和概要信息,是一種更揭秘與實戰六消息隊列篇掘金本文,講解如何集成,實現消息隊列。博客地址揭秘與實戰二數據緩存篇掘金本文,講解如何集成,實現緩存。 Spring Boot 揭秘與實戰(九) 應用監控篇 - HTTP 健康監控 - 掘金Health 信息是從 ApplicationContext 中所有的 HealthIndicator 的 Bean 中收集的, Spring...
摘要:慕課網消息中間件極速入門與實戰學習總結時間年月日星期三說明本文部分內容均來自慕課網。 慕課網《RabbitMQ消息中間件極速入門與實戰》學習總結 時間:2018年09月05日星期三 說明:本文部分內容均來自慕課網。@慕課網:https://www.imooc.com 教學源碼:無 學習源碼:https://github.com/zccodere/s... 第一章:RabbitM...
閱讀 3329·2021-11-22 12:04
閱讀 2713·2019-08-29 13:49
閱讀 485·2019-08-26 13:45
閱讀 2246·2019-08-26 11:56
閱讀 1002·2019-08-26 11:43
閱讀 596·2019-08-26 10:45
閱讀 1271·2019-08-23 16:48
閱讀 2161·2019-08-23 16:07