摘要:消費(fèi)者,監(jiān)聽生產(chǎn)者往指定目的地發(fā)送消息后,接下來(lái)就是消費(fèi)者對(duì)指定目的地的消息進(jìn)行消費(fèi)了。它不會(huì)動(dòng)態(tài)的適應(yīng)運(yùn)行時(shí)需要和參與外部的事務(wù)管理。它很好的平衡了對(duì)提供者要求低先進(jìn)功能如事務(wù)參與和兼容環(huán)境。
深入淺出 JMS(七) - ActiveMQ 與 Spring 整合 一、與spring整合實(shí)現(xiàn)ptp的同步接收消息 (1)config.properties
## ActiveMQ Config activemq.brokerURL=tcp://127.0.0.1:61616 activemq.userName=admin activemq.password=password activemq.pool.maxConnection=10 activemq.queue=mailqueue activemq.topic=mailtopic(2)pom.xml
(3)spring-jms.xmlorg.springframework spring-jms 4.3.7.RELEASE org.apache.activemq activemq-pool 5.15.3
ConnectionFactory 是用于產(chǎn)生到 JMS 服務(wù)器的鏈接的,Spring 為我們提供了多個(gè) ConnectionFactory,有 SingleConnectionFactory 和 CachingConnectionFactory。
SingleConnectionFactory :對(duì)于建立 JMS 服務(wù)器鏈接的請(qǐng)求會(huì)一直返回同一個(gè)鏈接,并且會(huì)忽略 Connection 的 close 方法調(diào)用。
CachingConnectionFactory :繼承了 SingleConnectionFactory,所以它擁有 SingleConnectionFactory 的所有功能,同時(shí)它還新增了緩存功能,它可以緩存 Session、MessageProducer 和 MessageConsumer。這里我們使用 CachingConnectionFactory 來(lái)作為示例。
(4)生產(chǎn)者import com.alibaba.fastjson.JSONObject; import com.github.binarylei.jms.spring.core.Mail; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Service; /** * @author: leigang * @version: 2018-04-03 */ @Service("mqProducer") public class MQProducer { @Autowired private JmsTemplate jmsTemplate; public void sendMail(Mail mail) { jmsTemplate.send((session) -> { return session.createTextMessage(JSONObject.toJSONString(mail)); }); } }(5)消費(fèi)者
import com.alibaba.fastjson.JSONObject; import com.github.binarylei.jms.spring.core.Mail; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Service; import javax.jms.Destination; /** * @author: leigang * @version: 2018-04-03 */ @Service("mqCustumer") public class MQCustumer { @Autowired private JmsTemplate jmsTemplate; @Autowired private Destination destination; public void sendMail() { String json = (String) jmsTemplate.receiveAndConvert(destination); Mail mail = (Mail) JSONObject.parseObject(json, Mail.class); System.out.println(mail); } }二、PTP 的異步調(diào)用
我們?cè)?spring 中直接配置異步接收消息的監(jiān)聽器,這樣就相當(dāng)于在 spring 中配置了消費(fèi)者,在接受消息的時(shí)候就不必要啟動(dòng)消費(fèi)者了。
spring-jms.xml:
生產(chǎn)者往指定目的地 Destination 發(fā)送消息后,接下來(lái)就是消費(fèi)者對(duì)指定目的地的消息進(jìn)行消費(fèi)了。那么消費(fèi)者是如何知道有生產(chǎn)者發(fā)送消息到指定目的地 Destination了呢?這是通過(guò) Spring 為我們封裝的消息監(jiān)聽容器 MessageListenerContainer 實(shí)現(xiàn)的,它負(fù)責(zé)接收信息,并把接收到的信息分發(fā)給真正的 MessageListener 進(jìn)行處理。
每個(gè)消費(fèi)者對(duì)應(yīng)每個(gè)目的地都需要有對(duì)應(yīng)的 MessageListenerContainer。對(duì)于消息監(jiān)聽容器而言,除了要知道監(jiān)聽哪個(gè)目的地之外,還需要知道到哪里去監(jiān)聽,也就是說(shuō)它還需要知道去監(jiān)聽哪個(gè) JMS 服務(wù)器,這是通過(guò)在配置 MessageConnectionFactory 的時(shí)候往里面注入一個(gè) ConnectionFactory 來(lái)實(shí)現(xiàn)的。所以我們?cè)谂渲靡粋€(gè) MessageListenerContainer 的時(shí)候有三個(gè)屬性必須指定,一個(gè)是表示從哪里監(jiān)聽的 ConnectionFactory;一個(gè)是表示監(jiān)聽什么的 Destination;一個(gè)是接收到消息以后進(jìn)行消息處理的 MessageListener。
Spring 一共為我們提供了兩種類型的 MessageListenerContainer:
SimpleMessageListenerContainer :SimpleMessageListenerContainer 會(huì)在一開始的時(shí)候就創(chuàng)建一個(gè)會(huì)話 session 和消費(fèi)者 Consumer,并且會(huì)使用標(biāo)準(zhǔn)的 JMS MessageConsumer.setMessageListener() 方法注冊(cè)監(jiān)聽器讓 JMS 提供者調(diào)用監(jiān)聽器的回調(diào)函數(shù)。它不會(huì)動(dòng)態(tài)的適應(yīng)運(yùn)行時(shí)需要和參與外部的事務(wù)管理。兼容性方面,它非常接近于獨(dú)立的 JMS 規(guī)范,但一般不兼容 Java EE 的 JMS 限制。
DefaultMessageListenerContainer :在大多數(shù)情況下我們還是使用的 DefaultMessageListenerContainer,跟SimpleMessageListenerContainer 相比,DefaultMessageListenerContainer 會(huì)動(dòng)態(tài)的適應(yīng)運(yùn)行時(shí)需要,并且能夠參與外部的事務(wù)管理。它很好的平衡了對(duì) JMS 提供者要求低、先進(jìn)功能如事務(wù)參與和兼容 Java EE 環(huán)境。
消息監(jiān)聽器:
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.TextMessage; /** * @author: leigang * @version: 2018-04-03 */ public class MessageListener implements javax.jms.MessageListener { @Override public void onMessage(Message message) { TextMessage msg = (TextMessage) message; try { System.out.println("消息:" + msg.getText()); } catch (JMSException e) { e.printStackTrace(); } } }三、發(fā)布訂閱 同步接收
在 spring-jms.xml 中將 ActiveMQTopic 生成 Topic,其它沒什么變化:
每天用心記錄一點(diǎn)點(diǎn)。內(nèi)容也許不重要,但習(xí)慣很重要!
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/68917.html
摘要:介紹它是出品,最流行的,能力強(qiáng)勁的開源消息總線。是一個(gè)完全支持和規(guī)范的實(shí)現(xiàn),盡管規(guī)范出臺(tái)已經(jīng)是很久的事情了,但是在當(dāng)今的應(yīng)用中間仍然扮演著特殊的地位。相關(guān)文章整合使用整合使用關(guān)注我轉(zhuǎn)載請(qǐng)務(wù)必注明原創(chuàng)地址為安裝同之前一樣,直接在里面玩吧。 showImg(https://segmentfault.com/img/remote/1460000012996066?w=1920&h=1281)...
摘要:學(xué)習(xí)消息隊(duì)列的使用之前,我們先來(lái)搞清。是操作消息的接口。消息生產(chǎn)者由創(chuàng)建,并用于將消息發(fā)送到。接收消息打印結(jié)果這是接收到的消息消費(fèi)者啟動(dòng)。。。。 通過(guò)上一篇文章 《消息隊(duì)列深入解析》,我們已經(jīng)消息隊(duì)列是什么、使用消息隊(duì)列的好處以及常見消息隊(duì)列的簡(jiǎn)單介紹。 這一篇文章,主要帶大家詳細(xì)了解一下消息隊(duì)列ActiveMQ的使用。 學(xué)習(xí)消息隊(duì)列ActiveMQ的使用之前,我們先來(lái)搞清JMS。 J...
摘要:最近在研究的消息確認(rèn)機(jī)制,在與整合時(shí)遇到的了一個(gè)問(wèn)題。這時(shí)只需要把的值設(shè)置成自定義的類型即可。 最近在研究activemq的ack消息確認(rèn)機(jī)制,在activemq與spring整合時(shí)遇到的了一個(gè)問(wèn)題。JMS規(guī)范的ack消息確認(rèn)機(jī)制有一下四種,定于在session對(duì)象中:AUTO_ACKNOWLEDGE = 1 :自動(dòng)確認(rèn)CLIENT_ACKNOWLEDGE = 2:客戶端手動(dòng)確認(rèn) DU...
摘要:安裝到官方網(wǎng)站下載最新的的安裝包,并解壓到本地目錄下,下載鏈接如下。修改消費(fèi)者使用配置消費(fèi)者監(jiān)聽的隊(duì)列,其中是接收到的消息收到的報(bào)文為接收到的消息重新執(zhí)行 安裝ActiveMQ 到Apache官方網(wǎng)站下載最新的ActiveMQ的安裝包,并解壓到本地目錄下,下載鏈接如下:http://activemq.apache.org/do...。showImg(https://segmentfau...
閱讀 1446·2021-09-10 11:27
閱讀 2401·2019-08-30 15:53
閱讀 1317·2019-08-30 13:10
閱讀 2969·2019-08-30 11:09
閱讀 1075·2019-08-29 17:23
閱讀 664·2019-08-29 17:05
閱讀 2943·2019-08-29 15:10
閱讀 2339·2019-08-29 13:22