背景:
在一些應用場景中,程序并不需要同步執行,例如用戶注冊之后的郵件或者短信通知提醒。這種場景的實現則是在當前線程,開啟一個新線 程,當前線程在開啟新線程之后會繼續往下執行,無需等待新線程執行完成。 但例如一些需要延時的場景則不只是開啟新線程執行如此簡單了。譬如提交訂單后在15分鐘內沒有完成支付,訂單需要關閉,這種情 況,是否只開啟一個異步線程就不適用了呢。
那么就單單實現在提交訂單后的15分鐘內,如果沒有完成支付,系統關閉訂單。有哪些可行的方案呢。
方案:
使用定時任務輪詢訂單表,查詢出訂單創建了15分鐘以上并且未支付的訂單,如果有查詢出此類訂單則執行關閉。
缺點:假設每1分鐘輪詢一次,則會存在秒級誤差,如果秒級輪詢,則會極其消耗性能,影響程序的健壯性。
提交訂單時開啟一個新線程,而新線程直接休眠15分鐘,休眠結束后開始執行訂單關閉
缺點:如果在線程休眠時,重啟了整個服務,那么會怎樣呢?
使用延時消息隊列
缺點:需要額外部署消息中間件
綜上考慮:使用延時消息隊列則成為最佳選擇,消息延時發布之后,保存在消息中間件中,在15分鐘后才會正式發布至隊列,延時隊列監聽器在15分鐘后監聽到消息時,才開始執行,而這期間,即使項目重啟也沒有關系。
以springboot為基礎框架,集成rabbitmq實現延時隊列注意:這里不采用網上流傳的死信隊列轉發,而是采用rabbitmq3.7+版本的延時隊列插件,所以務必安裝3.7+版本并啟用延時隊列插件。增加amqp依賴
修改application.yml文件,配置rabbitmq,并且開啟消息的手動應答org.springframework.boot spring-boot-starter-parent 1.5.4.RELEASE org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-amqp
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: admin password: admin listener: direct: acknowledge-mode: MANUAL simple: acknowledge-mode: MANUAL配置隊列,路由,交換機
package cn.rongyuan.config; import java.util.HashMap; import java.util.Map; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @title rabbitmq配置類 * @author zengzp * @time 2018年8月20日 上午10:46:43 * @Description */ @Configuration public class RabbitConfig { // 支付超時延時交換機 public static final String Delay_Exchange_Name = "delay.exchange"; // 超時訂單關閉隊列 public static final String Timeout_Trade_Queue_Name = "close_trade"; @Bean public Queue delayPayQueue() { return new Queue(RabbitConfig.Timeout_Trade_Queue_Name, true); } // 定義廣播模式的延時交換機 無需綁定路由 @Bean FanoutExchange delayExchange(){ Map在提交訂單時發布消息至延時隊列并且指定延時時長args = new HashMap (); args.put("x-delayed-type", "direct"); FanoutExchange topicExchange = new FanoutExchange(RabbitConfig.Delay_Exchange_Name, true, false, args); topicExchange.setDelayed(true); return topicExchange; } // 綁定延時隊列與交換機 @Bean public Binding delayPayBind() { return BindingBuilder.bind(delayPayQueue()).to(delayExchange()); } // 定義消息轉換器 @Bean Jackson2JsonMessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); } // 定義消息模板用于發布消息,并且設置其消息轉換器 @Bean RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) { final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(jsonMessageConverter()); return rabbitTemplate; } @Bean RabbitAdmin rabbitAdmin(final ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } }
@Autowired RabbitTemplate rabbitTemplate; // 通過廣播模式發布延時消息 延時30分鐘 持久化消息 消費后銷毀 這里無需指定路由,會廣播至每個綁定此交換機的隊列 rabbitTemplate.convertAndSend(RabbitConfig.Delay_Exchange_Name, "", trade.getTradeCode(), message ->{ message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); message.getMessageProperties().setDelay(30 * (60*1000)); // 毫秒為單位,指定此消息的延時時長 return message; });消費端監聽延時隊列
package cn.rongyuan.mq.consumer; import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.rabbitmq.client.Channel; import cn.rongyuan.config.RabbitConfig; import cn.rongyuan.service.TradeService; import cn.rongyuan.util.ExceptionUtil; /** * @title 消息消費端 * @author zengzp * @time 2018年8月20日 上午11:00:26 * @Description */ @Component public class PayTimeOutConsumer { @Autowired TradeService tradeService; private Logger logger = LoggerFactory.getLogger(getClass()); @RabbitListener(queues = RabbitConfig.Timeout_Trade_Queue_Name) public void process(String tradeCode, Message message, Channel channel) throws IOException{ try { logger.info("開始執行訂單[{}]的支付超時訂單關閉......", tradeCode); tradeService.cancelTrade(tradeCode); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); logger.info("超時訂單處理完畢"); } catch (Exception e) { logger.error("超時訂單處理失敗:{}", ExceptionUtil.getMessage(e)); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } } }
參考資料:
1、spring amqp 官方文檔:https://docs.spring.io/spring-amqp/docs/2.0.0.M2/reference/htmlsingle/#delayed-message-exchange 2、rabbitmq 官方文檔:http://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/76788.html
摘要:另一種就是用中的位于包下,本質是由和實現的阻塞優先級隊列。表明了一條消息可在隊列中存活的最大時間。當某條消息被設置了或者當某條消息進入了設置了的隊列時,這條消息會在時間后死亡成為。 SpringBoot 是為了簡化 Spring 應用的創建、運行、調試、部署等一系列問題而誕生的產物,自動裝配的特性讓我們可以更好的關注業務本身而不是外部的XML配置,我們只需遵循規范,引入相關的依賴就可...
摘要:可以在地址看到如何使用講解下上面命令行表示控制臺端口號,可以在瀏覽器中通過控制臺來執行的相關操作。同時從控制臺可以看到發送的速率多線程測試性能開了個線程,每個線程發送條消息。 showImg(http://ww2.sinaimg.cn/large/006tNc79ly1g5jjb62t88j30u00gwdi2.jpg); 前提 上次寫了篇文章,《SpringBoot Kafka 整合...
摘要:集成實例個人在學習時發現網上很少有系統性介紹和如何集成的,其他人總結的都片段化,所以結合個人調研過程,整理此篇文章。 springboot 集成rabbitmq 實例 個人在學習rabbitmq時發現網上很少有系統性介紹springboot和rabbitmq如何集成的,其他人總結的都片段化,所以結合個人調研過程,整理此篇文章。 本文章共分為以下部分: rabbitmq簡介 sprin...
摘要:本文將會講解如何使用實現延時重試和失敗消息隊列,實現可靠的消息消費,消費失敗后,自動延時將消息重新投遞,當達到一定的重試次數后,將消息投遞到失敗消息隊列,等待人工介入處理。 RabbitMQ是一款使用Erlang開發的開源消息隊列。本文假設讀者對RabbitMQ是什么已經有了基本的了解,如果你還不知道它是什么以及可以用來做什么,建議先從官網的 RabbitMQ Tutorials 入門...
摘要:本文將會講解如何使用實現延時重試和失敗消息隊列,實現可靠的消息消費,消費失敗后,自動延時將消息重新投遞,當達到一定的重試次數后,將消息投遞到失敗消息隊列,等待人工介入處理。 RabbitMQ是一款使用Erlang開發的開源消息隊列。本文假設讀者對RabbitMQ是什么已經有了基本的了解,如果你還不知道它是什么以及可以用來做什么,建議先從官網的 RabbitMQ Tutorials 入門...
閱讀 3981·2021-11-22 15:31
閱讀 2518·2021-11-18 13:20
閱讀 3098·2021-11-15 11:37
閱讀 6957·2021-09-22 15:59
閱讀 736·2021-09-13 10:27
閱讀 3767·2021-09-09 09:33
閱讀 1434·2019-08-30 15:53
閱讀 2562·2019-08-29 15:37