摘要:本文主要講述消息服務在中的使用。所以需要一個監聽容器工廠的概念,即接口,它會引用上面創建好的與的連接工廠,由它來負責接收消息以及將消息分發給指定的監聽器。為了消費消息,訂閱者必須保持運行的狀態。
JMS 在 SpringBoot 中的使用
當前環境摘要:本文屬于原創,歡迎轉載,轉載請保留出處:https://github.com/jasonGeng88/blog>
本文所有服務均采用docker容器化方式部署
Mac OS 10.11.x
docker 1.12.1
JDK 1.8
SpringBoot 1.5
前言基于之前一篇“一個故事告訴你什么是消息隊列”,了解了消息隊列的使用場景以及相關的特性。本文主要講述消息服務在 JAVA 中的使用。
市面上的有關消息隊列的技術選型非常多,如果我們的代碼框架要支持不同的消息實現,在保證框架具有較高擴展性的前提下,我們勢必要進行一定的封裝。
在 JAVA 中,大可不必如此。因為 JAVA 已經制定了一套標準的 JMS 規范。該規范定義了一套通用的接口和相關語義,提供了諸如持久、驗證和事務的消息服務,其最主要的目的是允許Java應用程序訪問現有的消息中間件。就和 JDBC 一樣。
基本概念在介紹具體的使用之前,先簡單介紹一下 JMS 的一些基本知識。這里我打算分為 3 部分來介紹,即 消息隊列(MQ)的連接、消息發送與消息接收。
這里我們的技術選型是 SpringBoot、JMS、ActiveMQ
為了更好的理解 JMS,這里沒有使用 SpringBoot 零配置來搭建項目
MQ 的連接使用 MQ 的第一步一定是先連接 MQ。因為這里使用的是 JMS 規范,對于任何遵守 JMS 規范的 MQ 來說,都會實現相應的ConnectionFactory接口,因此我們只需要創建一個ConnectionFactory工廠類,由它來實現 MQ 的連接,以及封裝一系列特性的 MQ 參數。
例子:這里我們以 ActiveMQ 為例,
maven 依賴:
org.springframework.boot spring-boot-starter-parent 1.5.3.RELEASE org.springframework.boot spring-boot-starter-activemq
創建 ActiveMQ 連接工廠:
@Bean public ConnectionFactory connectionFactory(){ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); connectionFactory.setBrokerURL(ActiveMQ_URL); connectionFactory.setUserName(ActiveMQ_USER); connectionFactory.setPassword(ActiveMQ_PASSWORD); return connectionFactory; }消息發送
關于消息的發送,是通過 JMS 核心包中的JmsTemplate類來實現的,它簡化了 JMS 的使用,因為在發送或同步接收消息時它幫我們處理了資源的創建和釋放。從它的作用也不難推測出,它需要引用我們上面創建的連接工廠,具體代碼如下:
@Bean public JmsTemplate jmsQueueTemplate(){ return new JmsTemplate(connectionFactory()); }
JmsTemplate創建完成后,我們就可以調用它的方法來發送消息了。這里有兩個概念需要注意:
消息會發送到哪里?-> 即需要指定發送隊列的目的地(Destination),是可以在 JNDI 中進行存儲和提取的 JMS 管理對象。
發送的消息體具體是什么?-> 實現了javax.jms.Message的對象,類似于 JAVA RMI 的 Remote 對象。
代碼示例:
@Autowired private JmsTemplate jmsQueueTemplate; /** * 發送原始消息 Message */ public void send(){ jmsQueueTemplate.send("queue1", new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { return session.createTextMessage("我是原始消息"); } }); }
優化:當然,我們不用每次都通過MessageCreator 匿名類的方式來創建Message對象,JmsTemplate類中提供了對象實體自動轉換為Message對象的方法,convertAndSend(String destinationName, final Object message)。
優化代碼示例:
/** * 發送消息自動轉換成原始消息 */ public void convertAndSend(){ jmsQueueTemplate.convertAndSend("queue1", "我是自動轉換的消息"); }
注:關于消息轉換,還可以通過實現MessageConverter接口來自定義轉換內容
消息接收講完了消息發送,我們最后來說說消息是如何接收的。消息既然是以Message對象的形式發送到指定的目的地,那么消息的接收勢必會去指定的目的地上去接收消息。這里采用的是監聽者的方式來監聽指定地點的消息,采用注解@JmsListener來設置監聽方法。
代碼示例:
@Component public class Listener1 { @JmsListener(destination = "queue1") public void receive(String msg){ System.out.println("監聽到的消息內容為: " + msg); } }
有了監聽的目標和方法后,監聽器還得和 MQ 關聯起來,這樣才能運作起來。這里的監聽器可能不止一個,如果每個都要和 MQ 建立連接,肯定不太合適。所以需要一個監聽容器工廠的概念,即接口JmsListenerContainerFactory,它會引用上面創建好的與 MQ 的連接工廠,由它來負責接收消息以及將消息分發給指定的監聽器。當然也包括事務管理、資源獲取與釋放和異常轉換等。
代碼示例:
@Bean public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory()); //設置連接數 factory.setConcurrency("3-10"); //重連間隔時間 factory.setRecoveryInterval(1000L); return factory; }場景
代碼地址:https://github.com/jasonGeng88/springboot-jms
對 JMS 有了基本的理解后,我們就來在具體的場景中使用一下。
首先,我們需要先啟動 ActiveMQ,這里我們以 Docker 容器化的方式進行啟動。
啟動命令:
docker run -d -p 8161:8161 -p 61616:61616 --name activemq webcenter/activemq
啟動成功后,在 ActiveMQ 可視化界面查看效果(http://localhost:8161):
點對點模式(單消費者)下面介紹消息隊列中最常用的一種場景,即點對點模式。基本概念如下:
每個消息只能被一個消費者(Consumer)進行消費。一旦消息被消費后,就不再在消息隊列中存在。
發送者和接收者之間在時間上沒有依賴性,也就是說當發送者發送了消息之后,不管接收者有沒有正在運行,它不會影響到消息被發送到隊列。
接收者在成功接收消息之后需向隊列應答成功。
代碼實現(為簡化代碼,部分代碼沿用上面所述):啟動文件(Application.java)
@SpringBootApplication @EnableJms public class Application { ... /** * JMS 隊列的模板類 * connectionFactory() 為 ActiveMQ 連接工廠 */ @Bean public JmsTemplate jmsQueueTemplate(){ return new JmsTemplate(connectionFactory()); } public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
注解@EnableJms設置在@Configuration類上,用來聲明對 JMS 注解的支持。
消息生產者(PtpProducer.java)
@Component public class PtpProducer { @Autowired private JmsTemplate jmsQueueTemplate; /** * 發送消息自動轉換成原始消息 */ public void convertAndSend(){ jmsQueueTemplate.convertAndSend("ptp", "我是自動轉換的消息"); } }
生產者調用類(PtpController.java)
@RestController @RequestMapping(value = "/ptp") public class PtpController { @Autowired private PtpProducer ptpProducer; @RequestMapping(value = "/convertAndSend") public Object convertAndSend(){ ptpProducer.convertAndSend(); return "success"; } }
消息監聽容器工廠
@SpringBootApplication @EnableJms public class Application { ... /** * JMS 隊列的監聽容器工廠 */ @Bean(name = "jmsQueueListenerCF") public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory()); //設置連接數 factory.setConcurrency("3-10"); //重連間隔時間 factory.setRecoveryInterval(1000L); return factory; } ... }
消息監聽器
@Component public class PtpListener1 { /** * 消息隊列監聽器 * destination 隊列地址 * containerFactory 監聽器容器工廠, 若存在2個以上的監聽容器工廠,需進行指定 */ @JmsListener(destination = "ptp", containerFactory = "jmsQueueListenerCF") public void receive(String msg){ System.out.println("點對點模式1: " + msg); } }演示
啟動項目啟動后,通過 REST 接口的方式來調用消息生產者發送消息,請求如下:
curl -XGET 127.0.0.1:8080/ptp/convertAndSend
消費者控制臺信息:
ActiveMQ 控制臺信息:
列表說明:
Name:隊列名稱。
Number Of Pending Messages:等待消費的消息個數。
Number Of Consumers:當前連接的消費者數目,因為我們采用的是連接池的方式連接,初始連接數為 3,所以顯示數字為 3。
Messages Enqueued:進入隊列的消息總個數,包括出隊列的和待消費的,這個數量只增不減。
Messages Dequeued:出了隊列的消息,可以理解為是已經消費的消息數量。
點對點模式(多消費者)基于上面一個消費者消費的模式,因為生產者可能會有很多,同時像某個隊列發送消息,這時一個消費者可能會成為瓶頸。所以需要多個消費者來分攤消費壓力(消費線程池能解決一定壓力,但畢竟在單機上,做不到分布式分布,所以多消費者是有必要的),也就產生了下面的場景。
代碼實現添加新的監聽器
@Component public class PtpListener2 { @JmsListener(destination = Constant.QUEUE_NAME, containerFactory = "jmsQueueListenerCF") public void receive(String msg){ System.out.println("點對點模式2: " + msg); } }演示
這里我們發起 10 次請求,來觀察消費者的消費情況:
這里因為監聽容器設置了線程池的緣故,在實際消費過程中,監聽器消費的順序會有所差異。
發布訂閱模式除了點對點模式,發布訂閱模式也是消息隊列中常見的一種使用。試想一下,有一個即時聊天群,你在群里發送一條消息。所有在這個群里的人(即訂閱了該群的人),都會收到你發送的信息。
基本概念:
每個消息可以有多個消費者。
發布者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須創建一個訂閱者之后,才能消費發布者的消息。
為了消費消息,訂閱者必須保持運行的狀態。
代碼實現修改 JmsTemplate 模板類,使其支持發布訂閱功能
@SpringBootApplication @EnableJms public class Application { ... @Bean public JmsTemplate jmsTopicTemplate(){ JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory()); jmsTemplate.setPubSubDomain(true); return jmsTemplate; } ... }
消息生產者(PubSubProducer.java)
@Component public class PtpProducer { @Autowired private JmsTemplate jmsTopicTemplate; public void convertAndSend(){ jmsTopicTemplate.convertAndSend("topic", "我是自動轉換的消息"); } }
生產者調用類(PubSubController.java)
@RestController @RequestMapping(value = "/pubsub") public class PtpController { @Autowired private PubSubProducer pubSubProducer; @RequestMapping(value = "/convertAndSend") public String convertAndSend(){ pubSubProducer.convertAndSend(); return "success"; } }
修改 DefaultJmsListenerContainerFactory 類,使其支持發布訂閱功能
@SpringBootApplication @EnableJms public class Application { ... /** * JMS 隊列的監聽容器工廠 */ @Bean(name = "jmsTopicListenerCF") public DefaultJmsListenerContainerFactory jmsTopicListenerContainerFactory() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory()); factory.setConcurrency("1"); factory.setPubSubDomain(true); return factory; } ... }
消息監聽器(這里設置2個訂閱者)
@Component public class PubSubListener1 { @JmsListener(destination = "topic", containerFactory = "jmsTopicListenerCF") public void receive(String msg){ System.out.println("訂閱者1 - " + msg); } } @Component public class PubSubListener2 { @JmsListener(destination = "topic", containerFactory = "jmsTopicListenerCF") public void receive(String msg){ System.out.println("訂閱者2 - " + msg); } }演示
curl -XGET 127.0.0.1:8080/pubSub/convertAndSend
消費者控制臺信息:
ActiveMQ 控制臺信息:
總結這里只是對 SpringBoot 與 JMS 集成的簡單說明與使用,詳細的介紹可以查看 Spring 的官方文檔,我這里也有幸參與 并發編程網 發起的 Spring 5 的翻譯工作,我主要翻譯了 Spring 5 的 JMS 章節,其內容對于上述 JMS 的基本概念,都有詳細的展開說明,有興趣的可以看一下,當然翻譯水平有限,英文好的建議看原文。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/67190.html
摘要:異步發送不會在受到的確認之前一直阻塞方法。方法成功返回意味著所有的持久消息都以被寫到二級存儲中。總結默認情況,非持久化消息事務內的消息均采用異步發送對于持久化消息采用同步發送。 ActiveMq事務 ActiveMq事務的作用就是在發送、接收處理消息過程中,如果出現問題,可以回滾。 ActiveMq異步/同步發送 以下摘抄自https://blog.csdn.net/songhai.....
摘要:介紹它是出品,最流行的,能力強勁的開源消息總線。是一個完全支持和規范的實現,盡管規范出臺已經是很久的事情了,但是在當今的應用中間仍然扮演著特殊的地位。相關文章整合使用整合使用關注我轉載請務必注明原創地址為安裝同之前一樣,直接在里面玩吧。 showImg(https://segmentfault.com/img/remote/1460000012996066?w=1920&h=1281)...
摘要:對提供了很好的支持,對其做了起步依賴。構架工程創建一個工程,在其文件加入添加配置在中填寫自己的郵箱密碼。啟用設置附件發送郵件郵件已發送測試已全部通過,沒有坑。 springboot對JMS提供了很好的支持,對其做了起步依賴。 構架工程 創建一個springboot工程,在其pom文件加入: org.springframework.boot spring-boot-st...
摘要:下班后閑著無聊看了下中的自動配置,把我的理解跟大家說下。上述的每一個自動配置類都有自動配置功能,也可在配置文件中自定義配置。 微信公眾號:一個優秀的廢人。如有問題,請后臺留言,反正我也不會聽。 前言 這個月過去兩天了,這篇文章才跟大家見面,最近比較累,大家見諒下。下班后閑著無聊看了下 SpringBoot 中的自動配置,把我的理解跟大家說下。 配置文件能寫什么? 相信接觸過 Sprin...
閱讀 3385·2021-11-24 09:38
閱讀 1385·2021-11-22 15:08
閱讀 1454·2021-09-29 09:35
閱讀 475·2021-09-02 15:11
閱讀 1304·2019-08-30 12:55
閱讀 384·2019-08-29 17:16
閱讀 492·2019-08-29 11:30
閱讀 415·2019-08-26 13:23