摘要:集成實例個人在學習時發現網上很少有系統性介紹和如何集成的,其他人總結的都片段化,所以結合個人調研過程,整理此篇文章。
springboot 集成rabbitmq 實例
個人在學習rabbitmq時發現網上很少有系統性介紹springboot和rabbitmq如何集成的,其他人總結的都片段化,所以結合個人調研過程,整理此篇文章。
本文章共分為以下部分:
rabbitmq簡介
springboot配置
rabbitmq生產者配置
rabbitmq消費者配置
問題補充
一、rabbitmq簡介目前流程的消息隊列主要有:ActivityMQ/kafka/redis/rabbitmq等,各有各自的應用場景,關于各個框架的介紹,大家可自行百度,網上很多文章介紹~其中rabbit因為其ack特性以及還算不錯的性能被大多數公司采用。
概念:生產者 消息的產生方,負責將消息推送到消息隊列
消費者 消息的最終接受方,負責監聽隊列中的對應消息,消費消息
隊列 消息的寄存器,負責存放生產者發送的消息
交換機 負責根據一定規則分發生產者產生的消息
綁定 完成交換機和隊列之間的綁定
模式:direct
直連模式,用于實例間的任務分發
topic
話題模式,通過可配置的規則分發給綁定在該exchange上的隊列
headers
適用規則復雜的分發,用headers里的參數表達規則
fanout
分發給所有綁定到該exchange上的隊列,忽略routing key
單機版安裝很簡單,大概步驟如下:
# 安裝erlang包 ? ? yum install erlang # 安裝socat ? ? yum install socat # 安裝rabbit? ? rpm -ivh rabbitmq-server-3.6.6-1.el6.noarch.rpm? # 啟動服務 rabbitmq-server start # 增加管理控制功能 rabbitmq-plugins?enable?rabbitmq_management # 增加用戶: ? ??sudo rabbitmqctl add_user root password ? ? rabbitmqctl?set_user_tags root?administrator? ? ? rabbitmqctl?set_permissions?-p?/ root ".*" ".*" ".*"
集群安裝,可參考以下博客:
? ? ?
rabbitmq集群安裝
廢話少說直接上代碼:
配置參數
application.yml:
spring: rabbitmq: addresses: 192.168.1.1:5672 username: username password: password publisher-confirms: true virtual-host: /
java config讀取參數
/** * RabbitMq配置文件讀取類 * * @author chenhf * @create 2017-10-23 上午9:31 **/ @Configuration @ConfigurationProperties(prefix = "spring.rabbitmq") public class RabbitMqConfig { @Value("${spring.rabbitmq.addresses}") private String addresses; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.publisher-confirms}") private Boolean publisherConfirms; @Value("${spring.rabbitmq.virtual-host}") private String virtualHost; // 構建mq實例工廠 @Bean public ConnectionFactory connectionFactory(){ CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses(addresses); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setPublisherConfirms(publisherConfirms); connectionFactory.setVirtualHost(virtualHost); return connectionFactory; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){ return new RabbitAdmin(connectionFactory); } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate rabbitTemplate(){ RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; } }三、rabbitmq生產者配置
主要配置了直連和話題模式,其中話題模式設置兩個隊列(queueTopicTest1、queueTopicTest2),此兩個隊列在和交換機綁定時分別設置不同的routingkey(.TEST.以及lazy.#)來驗證匹配模式。
/** * 用于配置交換機和隊列對應關系 * 新增消息隊列應該按照如下步驟 * 1、增加queue bean,參見queueXXXX方法 * 2、增加queue和exchange的binding * @author chenhf * @create 2017-10-23 上午10:33 **/ @Configuration @AutoConfigureAfter(RabbitMqConfig.class) public class RabbitMqExchangeConfig { /** logger */ private static final Logger logger = LoggerFactory.getLogger(RabbitMqExchangeConfig.class); /** * @Author:chenhf * @Description: 主題型交換機 * @Date:下午5:49 2017/10/23 * @param * @return */ @Bean TopicExchange contractTopicExchangeDurable(RabbitAdmin rabbitAdmin){ TopicExchange contractTopicExchange = new TopicExchange(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode()); rabbitAdmin.declareExchange(contractTopicExchange); logger.debug("完成主題型交換機bean實例化"); return contractTopicExchange; } /** * 直連型交換機 */ @Bean DirectExchange contractDirectExchange(RabbitAdmin rabbitAdmin) { DirectExchange contractDirectExchange = new DirectExchange(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode()); rabbitAdmin.declareExchange(contractDirectExchange); logger.debug("完成直連型交換機bean實例化"); return contractDirectExchange; } //在此可以定義隊列 @Bean Queue queueTest(RabbitAdmin rabbitAdmin){ Queue queue = new Queue(RabbitMqEnum.QueueName.TESTQUEUE.getCode()); rabbitAdmin.declareQueue(queue); logger.debug("測試隊列實例化完成"); return queue; } //topic 1 @Bean Queue queueTopicTest1(RabbitAdmin rabbitAdmin){ Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST1.getCode()); rabbitAdmin.declareQueue(queue); logger.debug("話題測試隊列1實例化完成"); return queue; } //topic 2 @Bean Queue queueTopicTest2(RabbitAdmin rabbitAdmin){ Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST2.getCode()); rabbitAdmin.declareQueue(queue); logger.debug("話題測試隊列2實例化完成"); return queue; } //在此處完成隊列和交換機綁定 @Bean Binding bindingQueueTest(Queue queueTest,DirectExchange exchange,RabbitAdmin rabbitAdmin){ Binding binding = BindingBuilder.bind(queueTest).to(exchange).with(RabbitMqEnum.QueueEnum.TESTQUEUE.getCode()); rabbitAdmin.declareBinding(binding); logger.debug("測試隊列與直連型交換機綁定完成"); return binding; } //topic binding1 @Bean Binding bindingQueueTopicTest1(Queue queueTopicTest1,TopicExchange exchange,RabbitAdmin rabbitAdmin){ Binding binding = BindingBuilder.bind(queueTopicTest1).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE1.getCode()); rabbitAdmin.declareBinding(binding); logger.debug("測試隊列與話題交換機1綁定完成"); return binding; } //topic binding2 @Bean Binding bindingQueueTopicTest2(Queue queueTopicTest2,TopicExchange exchange,RabbitAdmin rabbitAdmin){ Binding binding = BindingBuilder.bind(queueTopicTest2).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE2.getCode()); rabbitAdmin.declareBinding(binding); logger.debug("測試隊列與話題交換機2綁定完成"); return binding; } }
在這里用到枚舉類:RabbitMqEnum
/** * 定義rabbitMq需要的常量 * * @author chenhf * @create 2017-10-23 下午4:07 **/ public class RabbitMqEnum { /** * @param * @Author:chenhf * @Description:定義數據交換方式 * @Date:下午4:08 2017/10/23 * @return */ public enum Exchange { CONTRACT_FANOUT("CONTRACT_FANOUT", "消息分發"), CONTRACT_TOPIC("CONTRACT_TOPIC", "消息訂閱"), CONTRACT_DIRECT("CONTRACT_DIRECT", "點對點"); private String code; private String name; Exchange(String code, String name) { this.code = code; this.name = name; } public String getCode() { return code; } public String getName() { return name; } } /** * describe: 定義隊列名稱 * creat_user: chenhf * creat_date: 2017/10/31 **/ public enum QueueName { TESTQUEUE("TESTQUEUE", "測試隊列"), TOPICTEST1("TOPICTEST1", "topic測試隊列"), TOPICTEST2("TOPICTEST2", "topic測試隊列"); private String code; private String name; QueueName(String code, String name) { this.code = code; this.name = name; } public String getCode() { return code; } public String getName() { return name; } } /** * describe: 定義routing_key * creat_user: chenhf * creat_date: 2017/10/31 **/ public enum QueueEnum { TESTQUEUE("TESTQUEUE1", "測試隊列key"), TESTTOPICQUEUE1("*.TEST.*", "topic測試隊列key"), TESTTOPICQUEUE2("lazy.#", "topic測試隊列key"); private String code; private String name; QueueEnum(String code, String name) { this.code = code; this.name = name; } public String getCode() { return code; } public String getName() { return name; } } }
以上完成消息生產者的定義,下面封裝調用接口
測試時直接調用此工具類,testUser類需自己實現
rabbitMqSender.sendRabbitmqDirect("TESTQUEUE1",testUser); rabbitMqSender.sendRabbitmqTopic("lazy.1.2",testUser); rabbitMqSender.sendRabbitmqTopic("lazy.TEST.2",testUser);
/** * rabbitmq發送消息工具類 * * @author chenhf * @create 2017-10-26 上午11:10 **/ @Component public class RabbitMqSender implements RabbitTemplate.ConfirmCallback{ /** logger */ private static final Logger logger = LoggerFactory.getLogger(RabbitMqSender.class); private RabbitTemplate rabbitTemplate; @Autowired public RabbitMqSender(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; this.rabbitTemplate.setConfirmCallback(this); } @Override public void confirm(CorrelationData correlationData, boolean b, String s) { logger.info("confirm: " + correlationData.getId()); } /** * 發送到 指定routekey的指定queue * @param routeKey * @param obj */ public void sendRabbitmqDirect(String routeKey,Object obj) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); logger.info("send: " + correlationData.getId()); this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode(), routeKey , obj, correlationData); } /** * 所有發送到Topic Exchange的消息被轉發到所有關心RouteKey中指定Topic的Queue上 * @param routeKey * @param obj */ public void sendRabbitmqTopic(String routeKey,Object obj) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); logger.info("send: " + correlationData.getId()); this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode(), routeKey , obj, correlationData); } }四、rabbitmq消費者配置
springboot注解方式監聽隊列,無法手動指定回調,所以采用了實現ChannelAwareMessageListener接口,重寫onMessage來進行手動回調,詳見以下代碼,詳細介紹可以在spring的官網上找amqp相關章節閱讀
直連消費者
通過設置TestUser的name來測試回調,分別發兩條消息,一條UserName為1,一條為2,查看控制臺中隊列中消息是否被消費
/** * 消費者配置 * * @author chenhf * @create 2017-10-30 下午3:14 **/ @Configuration @AutoConfigureAfter(RabbitMqConfig.class) public class ExampleAmqpConfiguration { @Bean("testQueueContainer") public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("TESTQUEUE"); container.setMessageListener(exampleListener()); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); return container; } @Bean("testQueueListener") public ChannelAwareMessageListener exampleListener() { return new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody()); //通過設置TestUser的name來測試回調,分別發兩條消息,一條UserName為1,一條為2,查看控制臺中隊列中消息是否被消費 if ("2".equals(testUser.getUserName())){ System.out.println(testUser.toString()); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } if ("1".equals(testUser.getUserName())){ System.out.println(testUser.toString()); channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); } } }; } }
topic消費者1
/** * 消費者配置 * * @author chenhf * @create 2017-10-30 下午3:14 **/ @Configuration @AutoConfigureAfter(RabbitMqConfig.class) public class TopicAmqpConfiguration { @Bean("topicTest1Container") public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("TOPICTEST1"); container.setMessageListener(exampleListener1()); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); return container; } @Bean("topicTest1Listener") public ChannelAwareMessageListener exampleListener1(){ return new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody()); System.out.println("TOPICTEST1:"+testUser.toString()); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } }; } }
topic消費者2
/** * 消費者配置 * * @author chenhf * @create 2017-10-30 下午3:14 **/ @Configuration @AutoConfigureAfter(RabbitMqConfig.class) public class TopicAmqpConfiguration2 { @Bean("topicTest2Container") public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("TOPICTEST2"); container.setMessageListener(exampleListener()); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); return container; } @Bean("topicTest2Listener") public ChannelAwareMessageListener exampleListener() { return new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody()); System.out.println("TOPICTEST2:"+testUser.toString()); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } }; } }問題補充
使用過程中可能出現的坑參考此篇文章
https://segmentfault.com/a/11...
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/67921.html
摘要:而調用后端服務就應用了的高級特分布式配置管理平臺后端掘金輕量的分布式配置管理平臺。關于網絡深度解讀后端掘金什么是網絡呢總的來說,網絡中的容器們可以相互通信,網絡外的又訪問不了這些容器。 在 Java 路上,我看過的一些書、源碼和框架(持續更新) - 后端 - 掘金簡書 占小狼轉載請注明原創出處,謝謝!如果讀完覺得有收獲的話,歡迎點贊加關注 物有本末,事有終始,知所先后,則近道矣 ......
摘要:介紹死信隊列沒有被及時消費的消息存放的隊列,消息沒有被及時消費有以下幾點原因有消息被拒絕并且隊列達到最大長度消息過期場景小時進入初始隊列,等待分鐘后進入分鐘隊列消息等待分鐘后進入執行隊列執行失敗后重新回到分鐘隊列失敗次后,消息進入小時隊列消 介紹 死信隊列:沒有被及時消費的消息存放的隊列,消息沒有被及時消費有以下幾點原因:1.有消息被拒絕(basic.reject/ basic.nac...
摘要:引入了新的環境和概要信息,是一種更揭秘與實戰六消息隊列篇掘金本文,講解如何集成,實現消息隊列。博客地址揭秘與實戰二數據緩存篇掘金本文,講解如何集成,實現緩存。 Spring Boot 揭秘與實戰(九) 應用監控篇 - HTTP 健康監控 - 掘金Health 信息是從 ApplicationContext 中所有的 HealthIndicator 的 Bean 中收集的, Spring...
背景: 在一些應用場景中,程序并不需要同步執行,例如用戶注冊之后的郵件或者短信通知提醒。這種場景的實現則是在當前線程,開啟一個新線 程,當前線程在開啟新線程之后會繼續往下執行,無需等待新線程執行完成。 但例如一些需要延時的場景則不只是開啟新線程執行如此簡單了。譬如提交訂單后在15分鐘內沒有完成支付,訂單需要關閉,這種情 況,是否只開啟一個異步線程就不適用了呢。 那么就單單實現...
閱讀 3055·2021-11-25 09:43
閱讀 1033·2021-11-24 10:22
閱讀 1361·2021-09-22 15:26
閱讀 689·2019-08-30 15:44
閱讀 2468·2019-08-29 16:33
閱讀 3702·2019-08-26 18:42
閱讀 915·2019-08-23 18:07
閱讀 1837·2019-08-23 17:55