摘要:嵌入在一些項(xiàng)目中,多帶帶開啟一個,對于項(xiàng)目實(shí)施來說有時略顯繁瑣。待啟動后,選擇所在的進(jìn)程。連接后選擇頁簽紅框的地方分別為已消費(fèi)和已進(jìn)入中的消息的條數(shù)。
ActiveMQ 嵌入Tomcat
在一些項(xiàng)目中,多帶帶開啟一個ActiveMQ,對于項(xiàng)目實(shí)施來說有時略顯繁瑣。所以我們將ActiveMQ內(nèi)嵌到Tomcat,Tomcat啟動同時就順帶啟動了ActiveMQ。由此我們需要掌握三個個重要的知識點(diǎn)
ActiveMQ中的BrokerService
自啟動Servlet配置
使用jconsole了解嵌入ActiveMQ運(yùn)行狀態(tài)
一、開啟BrokerService在pom.xml添加ActiveMQ依賴,本次代碼實(shí)例采用5.7版本,記住只需要activemq-core就行。
org.apache.activemq activemq-core 5.7.0
在編寫BrokerService代碼部分,主要注意三個點(diǎn)
是否需要在jconsole中顯示監(jiān)控信息 broker.setUseJmx(true)
設(shè)置連接用戶名和密碼,如何使用驗(yàn)證插件
是否持久化,存儲位置設(shè)置,持久化配置
所以需要啟動一個連接地址 tcp://localhost:61616,用戶名為admin,密碼為admin,需要持久化,持久化數(shù)據(jù)文件存儲地址為 /activemq ,需要啟動jconsole監(jiān)控的BrokerService的代碼如下:
// author:herbert qq:464884492 BrokerService broker = new BrokerService(); broker.setUseJmx(true); // 開啟監(jiān)控 broker.setPersistent(true); // 持久化 broker.setBrokerName("Test"); SimpleAuthenticationPlugin sap = new SimpleAuthenticationPlugin(); AuthenticationUser au = new AuthenticationUser("admin", "admin","users"); ArrayList二、生產(chǎn)者和消費(fèi)者d = new ArrayList (); d.add(au); sap.setUsers(d); // 用戶驗(yàn)證 broker.setPlugins(new BrokerPlugin[] { sap }); String mqDataPath = "/activemq"; // 存儲位置 broker.getPersistenceAdapter().setDirectory(new File(mqDataPath)); broker.addConnector("tcp://localhost:61616"); // 連接地址 broker.start();
ActiveMQ中,通用的消息傳遞方式有兩種
隊(duì)列,支持消息持久化,未消費(fèi)的消息,在重啟后依然存在。若有多個消費(fèi)者,在每次提取一條消息的前提下,所有消費(fèi)均分隊(duì)列中的消息
主題,不支持消息持久化,未消費(fèi)的消息,在重啟后消息丟失。若有多個消費(fèi),每個消費(fèi)者依次消費(fèi)主題中所有消息
不管是生產(chǎn)者還是消費(fèi)者代碼編寫,主要是4個步驟
建立連接,采用failover:()方式,自動斷線重連
建立Session,獲取發(fā)送或接收目標(biāo)Destination ,指定是隊(duì)列(session.createQueue(queueName)),還是主題(session.createTopic(topicName))
通過Session獲取生產(chǎn)者或消費(fèi)者
生產(chǎn)或消費(fèi)消息
我們現(xiàn)在編寫一個生產(chǎn)者的代碼,并循環(huán)產(chǎn)生10條消息
// author:herbert qq:464884492 String mqConnUrl = "tcp://localhost:61616"; String connUrl = "failover:(" + mqConnUrl.trim()+ ")?initialReconnectDelay=1000&maxReconnectDelay=30000"; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin", connUrl); javax.jms.Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("system"); MessageProducer messageProducer = session.createProducer(destination); for (int i = 0; i < 10; i++) { javax.jms.TextMessage message = session.createTextMessage("ActiveMQ 發(fā)送的消息" + i); System.out.println("發(fā)送消息:" + "ActiveMQ 發(fā)送的消息" + i); messageProducer.send(message); }
編寫一個消費(fèi),消費(fèi)上邊的10條消息
// author:herbert qq:464884492 String mqConnUrl = "tcp://localhost:61616"; String connUrl = "failover:(" + mqConnUrl.trim()+ ")?initialReconnectDelay=1000&maxReconnectDelay=30000"; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", connUrl); javax.jms.Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("system"); MessageConsumer messageConsumer = session.createConsumer(destination); messageConsumer.setMessageListener(new MessageListener() { @Override public void onMessage(javax.jms.Message message) { ActiveMQTextMessage m = (ActiveMQTextMessage) message; try { System.out.println("接收到:" + m.getText()); } catch (JMSException e) { e.printStackTrace(); } } });
運(yùn)行效果
可見,我們生產(chǎn)者,產(chǎn)生的10條消息,已成功被消費(fèi)者處理了。
三、監(jiān)控嵌入的ActiveMQ對于嵌入的ActiveMQ,在BrokerService啟動前需要設(shè)置 broker.setUseJmx(true);然后找到你的JAVA_HOME,切換到bin,輸入jconsole命令。
待jconsole啟動后,選擇ActiveMQ所在的進(jìn)程。連接后選擇Mbean頁簽
紅框的地方分別為已消費(fèi)和已進(jìn)入MQ中的消息的條數(shù)。選擇操作,找到那個SendTextMessage還可以想此隊(duì)列發(fā)送消息。
四、Selvelt跟隨Tomcat啟動對于Tomcat7.x版本之后Tomcat,Selvelt都可以通過直接在代碼中通過注解的方式配置URl連接,一起是否自啟動loadOnStartup 這個值>=0表示需要自啟動,值越小優(yōu)先級越高
// author:herbert qq:464884492 @WebServlet(urlPatterns = "/initmq", loadOnStartup = 1) public class InitMqServlet extends HttpServlet { @Override public void init(ServletConfig config) throws ServletException { super.init(config); // 這里編寫啟動ActiveMQ代碼 } }五、總結(jié)
這次以ActiveMQ作為消息隊(duì)列使用切入點(diǎn),總體上說還比較順利。其中唯一出現(xiàn)問題的地方就是對于activeMQ依賴過多,多依賴了jaractivemq-broker,導(dǎo)致消息能連接,但不能發(fā)送消息。后邊直接換成 activemq-all,有出現(xiàn)slf4j日志沖突,使用exclusions依然不能解決問題。最終只依賴 activemq-core,完美解決所有問題。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/76600.html
摘要:第步配置環(huán)境變量在中使用即可,不用寫第步輸入進(jìn)入中,輸入檢驗(yàn)是否安裝成功。安裝服務(wù)啟動服務(wù)停止服務(wù)切換到目錄下運(yùn)行參考安裝解壓,配置環(huán)境變量,下開啟和關(guān)閉窗口方式啟動安裝服務(wù)啟動協(xié)助服務(wù)命令校驗(yàn)是否啟動輸入?yún)⒖? 1.安裝jdk 第1步:在官網(wǎng)下載和自己電腦位數(shù)相同的安裝包 第2步:選擇安裝路徑,默認(rèn)在C盤,建議自定義選擇其他盤符,避免c盤重裝系統(tǒng)后,需要重新安裝。 第3步:安裝過程中...
摘要:構(gòu)建系統(tǒng)強(qiáng)烈建議你選擇一個支持依賴管理的構(gòu)建系統(tǒng),并且可以使用發(fā)布到中心存儲庫的工件。例如,要升級到另一個,你可以將以下元素添加到你的在前面的示例中,我們指定了一個,但是任何依賴類型都可以以相同的方式被覆蓋。 13. 構(gòu)建系統(tǒng) 強(qiáng)烈建議你選擇一個支持依賴管理的構(gòu)建系統(tǒng),并且可以使用發(fā)布到Maven中心存儲庫的工件。我們建議你選擇Maven或Gradle,可以讓Spring Boot與其...
摘要:不同的應(yīng)用場景,不同的架構(gòu),不同的需求,都會對優(yōu)化設(shè)置有不同要求。在這里我所記述的只是我自己在一些應(yīng)用中所設(shè)置的優(yōu)化項(xiàng),以備不時之需,并不是放之四海而皆準(zhǔn)的準(zhǔn)則。為了消除這些警告,還需要根治。 一千個人眼中就有一千個哈姆雷特。——偽西方諺語 關(guān)于Tomcat的優(yōu)化點(diǎn)之多,我估計沒有上萬,也有成千。不同的應(yīng)用場景,不同的架構(gòu),不同的需求,都會對優(yōu)化設(shè)置有不同要求。在這里我所記述的只是我...
摘要:還自動配置發(fā)送和接收消息所需的基礎(chǔ)設(shè)施。支持是一個輕量級的可靠的可伸縮的可移植的消息代理,基于協(xié)議,使用通過協(xié)議進(jìn)行通信。 32. 消息傳遞 Spring框架為與消息傳遞系統(tǒng)集成提供了廣泛的支持,從使用JmsTemplate簡化的JMS API到使用完整的基礎(chǔ)設(shè)施異步接收消息,Spring AMQP為高級消息隊(duì)列協(xié)議提供了類似的特性集。Spring Boot還為RabbitTempla...
閱讀 712·2021-10-14 09:42
閱讀 1966·2021-09-22 15:04
閱讀 1571·2019-08-30 12:44
閱讀 2134·2019-08-29 13:29
閱讀 2730·2019-08-29 12:51
閱讀 542·2019-08-26 18:18
閱讀 698·2019-08-26 13:43
閱讀 2803·2019-08-26 13:38