摘要:學習消息隊列的使用之前,我們先來搞清。是操作消息的接口。消息生產者由創建,并用于將消息發送到。接收消息打印結果這是接收到的消息消費者啟動。。。。
通過上一篇文章 《消息隊列深入解析》,我們已經消息隊列是什么、使用消息隊列的好處以及常見消息隊列的簡單介紹。
這一篇文章,主要帶大家詳細了解一下消息隊列ActiveMQ的使用。
學習消息隊列ActiveMQ的使用之前,我們先來搞清JMS。
JMS 1. JMS基本概念JMS(JAVA Message Service,java消息服務)是java的消息服務,JMS的客戶端之間可以通過JMS服務進行異步的消息傳輸。JMS(JAVA Message Service,java消息服務)API是一個消息服務的標準或者說是規范,允許應用程序組件基于JavaEE平臺創建、發送、接收和讀取消息。它使分布式通信耦合度更低,消息服務更加可靠以及異步性。
2. JMS五種不同的消息正文格式JMS定義了五種不同的消息正文格式,以及調用的消息類型,允許你發送并接收以一些不同形式的數據,提供現有消息格式的一些級別的兼容性。
StreamMessage -- Java原始值的數據流
MapMessage--一套名稱-值對
TextMessage--一個字符串對象
ObjectMessage--一個序列化的 Java對象
BytesMessage--一個字節的數據流
3.JMS兩種消息模型 1 .點到點(P2P)模型
使用隊列(Queue)作為消息通信載體;滿足生產者與消費者模式,一條消息只能被一個消費者使用,未被消費的消息在隊列中保留直到被消費或超時。比如:我們生產者發送100條消息的話,兩個消費者來消費一般情況下兩個消費者會按照消息發送的順序各自消費一半(也就是你一個我一個的消費。)后面我們會通過代碼演示來驗證。
發布訂閱模型(Pub/Sub) 使用主題(Topic)作為消息通信載體,類似于廣播模式;發布者發布一條消息,該消息通過主題傳遞給所有的訂閱者,在一條消息廣播之后才訂閱的用戶則是收不到該條消息的。
ConnectionFactory:創建Connection對象的工廠,針對兩種不同的jms消息模型,分別有QueueConnectionFactory和TopicConnectionFactory兩種。可以通過JNDI來查找ConnectionFactory對象。
Connection:Connection表示在客戶端和JMS系統之間建立的鏈接(對TCP/IP socket的包裝)。Connection可以產生一個或多個Session。跟ConnectionFactory一樣,Connection也有兩種類型:QueueConnection和TopicConnection。
Session:Session是操作消息的接口。可以通過session創建生產者、消費者、消息等。Session提供了事務的功能。當需要使用session發送/接收多個消息時,可以將這些發送/接收動作放到一個事務中。同樣,也分QueueSession和TopicSession。
MessageProducer:消息生產者由Session創建,并用于將消息發送到Destination。同樣,消息生產者分兩種類型:QueueSender和TopicPublisher。可以調用消息生產者的方法(send或publish方法)發送消息。
MessageConsumer :消息消費者由Session創建,用于接收被發送到Destination的消息。兩種類型:QueueReceiver和TopicSubscriber。可分別通過session的createReceiver(Queue)或createSubscriber(Topic)來創建。當然,也可以session的creatDurableSubscriber方法來創建持久化的訂閱者。
Destination:Destination的意思是消息生產者的消息發送目標或者說消息消費者的消息來源。對于消息生產者來說,它的Destination是某個隊列(Queue)或某個主題(Topic);對于消息消費者來說,它的Destination也是某個隊列或主題(即消息來源)。
MessageListener: 消息監聽器。如果注冊了消息監聽器,一旦消息到達,將自動調用監聽器的onMessage方法。
參考:https://blog.csdn.net/shaobin...
消息隊列ActiveMQ 1.簡介ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現,盡管JMS規范出臺已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊的地位。2.簡單使用
安裝過程很簡單這里就不貼安裝過程了,可以自行google.
添加Maven依賴
2.1.測試點對點模型通信org.apache.activemq activemq-all 5.15.3
生產者發送消息測試方法:
@Test public void testQueueProducer() throws Exception { // 1、創建一個連接工廠對象,需要指定服務的ip及端口。 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.155:61616"); // 2、使用工廠對象創建一個Connection對象。 Connection connection = connectionFactory.createConnection(); // 3、開啟連接,調用Connection對象的start方法。 connection.start(); // 4、創建一個Session對象。 // 第一個參數:是否開啟事務。如果true開啟事務,第二個參數無意義。一般不開啟事務false。 // 第二個參數:應答模式。自動應答或者手動應答。一般自動應答。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5、使用Session對象創建一個Destination對象。兩種形式queue、topic,現在應該使用queue Queue queue = session.createQueue("test-queue"); // 6、使用Session對象創建一個Producer對象。 MessageProducer producer = session.createProducer(queue); // 7、創建一個Message對象,可以使用TextMessage。 for (int i = 0; i < 50; i++) { TextMessage textMessage = session.createTextMessage("第"+i+ "一個ActiveMQ隊列目的地的消息"); // 8、發送消息 producer.send(textMessage); } // 9、關閉資源 producer.close(); session.close(); connection.close(); }
消費者消費消息測試方法
@Test public void testQueueConsumer() throws Exception { // 創建一個ConnectionFactory對象連接MQ服務器 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.155:61616"); // 創建一個連接對象 Connection connection = connectionFactory.createConnection(); // 開啟連接 connection.start(); // 使用Connection對象創建一個Session對象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 創建一個Destination對象。queue對象 Queue queue = session.createQueue("test-queue"); // 使用Session對象創建一個消費者對象。 MessageConsumer consumer = session.createConsumer(queue); // 接收消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { // 打印結果 TextMessage textMessage = (TextMessage) message; String text; try { text = textMessage.getText(); System.out.println("這是接收到的消息:" + text); } catch (JMSException e) { e.printStackTrace(); } } }); // 等待接收消息 System.in.read(); // 關閉資源 consumer.close(); session.close(); connection.close(); }
我們開啟兩個消費者進程來監聽(運行兩次testQueueConsumer()方法)。
然后我們運行運行生產者測試方法發送消息.先發送消息還是先監聽消息一般不會不影響。
效果如下:
兩個消費者各自消費一半消息,而且還是按照消息發送到消息隊列的順序,這也驗證了我們上面的說法。
第一個消費者
第二個消費者
生產者發送消息測試方法:
@Test public void testTopicProducer() throws Exception { // 1、創建一個連接工廠對象,需要指定服務的ip及端口。 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.155:61616"); // 2、使用工廠對象創建一個Connection對象。 Connection connection = connectionFactory.createConnection(); // 3、開啟連接,調用Connection對象的start方法。 connection.start(); // 4、創建一個Session對象。 // 第一個參數:是否開啟事務。如果true開啟事務,第二個參數無意義。一般不開啟事務false。 // 第二個參數:應答模式。自動應答或者手動應答。一般自動應答。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5、使用Session對象創建一個Destination對象。兩種形式queue、topic,現在應該使用topic Topic topic = session.createTopic("test-topic"); // 6、使用Session對象創建一個Producer對象。 MessageProducer producer = session.createProducer(topic); // 7、創建一個Message對象,可以使用TextMessage。 for (int i = 0; i < 50; i++) { TextMessage textMessage = session.createTextMessage("第"+i+ "一個ActiveMQ隊列目的地的消息"); // 8、發送消息 producer.send(textMessage); } // 9、關閉資源 producer.close(); session.close(); connection.close(); }
消費者消費消息測試方法:
@Test public void testTopicConsumer() throws Exception { // 創建一個ConnectionFactory對象連接MQ服務器 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.155:61616"); // 創建一個連接對象 Connection connection = connectionFactory.createConnection(); // 開啟連接 connection.start(); // 使用Connection對象創建一個Session對象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 創建一個Destination對象。topic對象 Topic topic = session.createTopic("test-topic"); // 使用Session對象創建一個消費者對象。 MessageConsumer consumer = session.createConsumer(topic); // 接收消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { // 打印結果 TextMessage textMessage = (TextMessage) message; String text; try { text = textMessage.getText(); System.out.println("這是接收到的消息:" + text); } catch (JMSException e) { e.printStackTrace(); } } }); System.out.println("topic消費者啟動。。。。"); // 等待接收消息 System.in.read(); // 關閉資源 consumer.close(); session.close(); connection.close(); }
先運行兩個消費者進程(提前訂閱,不然收不到發送的消息),然后運行生產者測試方法發送消息。
結果是:
兩個消費者進程都可以接收到生產者發送過來的所有消息,我這里就不貼圖片了,
這樣驗證了我們上面的說法。
我們從上面代碼就可以看出,點對點通信和發布訂閱通信模式的區別就是創建生產者和消費者對象時提供的Destination對象不同,如果是點對點通信創建的Destination對象是Queue,發布訂閱通信模式通信則是Topic。
3.整合Spring使用整合spring除了我們上面依賴的Jar包還要依賴
org.springframework spring-jms 4.2.7.RELEASE org.springframework spring-context-support 4.2.7.RELEASE
比如我們在我們的系統中現在有兩個服務,第一個服務發送消息,第二個服務接收消息,我們下面看看這是如何實現的。
發送消息發送消息的配置文件:
spring-queue
發送消息的測試方法:
@Test public void testSpringActiveMq() throws Exception { //初始化spring容器 ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml"); //從spring容器中獲得JmsTemplate對象 JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class); //從spring容器中取Destination對象 Destination destination = (Destination) applicationContext.getBean("queueDestination"); //使用JmsTemplate對象發送消息。 jmsTemplate.send(destination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { //創建一個消息對象并返回 TextMessage textMessage = session.createTextMessage("spring activemq queue message"); return textMessage; } }); }
我們上面直接ApplicationContext的getBean方法獲取的對象,實際在項目使用依賴注入即可。
接收消息創建一個MessageListener的實現類。
public class MyMessageListener implements MessageListener { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; //取消息內容 String text = textMessage.getText(); System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }
接收消息的配置文件:
spring-queue
測試接收消息的代碼
@Test public void testQueueConsumer() throws Exception { //初始化spring容器 ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml"); //等待 System.in.read(); }
歡迎關注我的微信公眾號:"Java面試通關手冊"(一個有溫度的微信公眾號,期待與你共同進步~~~堅持原創,分享美文,分享各種Java學習資源):。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/69198.html
摘要:具體可以參考消息隊列之具體可以參考實戰之快速入門十分鐘入門阿里中間件團隊博客是一個分布式的可分區的可復制的基于發布訂閱的消息系統主要用于大數據領域當然在分布式系統中也有應用。目前市面上流行的消息隊列就是阿里借鑒的原理用開發而得。 我自己總結的Java學習的系統知識點以及面試問題,目前已經開源,會一直完善下去,歡迎建議和指導歡迎Star: https://github.com/Snail...
摘要:我在前面的文章中也提到了應該怎么做自我介紹與項目介紹,詳情可以查看這篇文章備戰春招秋招系列初出茅廬的程序員該如何準備面試。因此基于事件消息對象驅動的業務架構可以是一系列流程。 showImg(https://user-gold-cdn.xitu.io/2018/11/14/16711ac29c2ae52c?w=928&h=531&f=png&s=798562); 一 消息隊列MQ的...
摘要:時間年月日星期六說明本文部分內容均來自慕課網。這個時候,可以啟動多臺積分系統,來同時消費這個消息中間件里面的登錄消息,達到橫向擴展的作用。 時間:2017年07月22日星期六說明:本文部分內容均來自慕課網。@慕課網:http://www.imooc.com教學源碼:無學習源碼:https://github.com/zccodere/s... 第一章:課程介紹 1-1 課程安排 Java...
摘要:嵌入在一些項目中,單獨開啟一個,對于項目實施來說有時略顯繁瑣。待啟動后,選擇所在的進程。連接后選擇頁簽紅框的地方分別為已消費和已進入中的消息的條數。 ActiveMQ 嵌入Tomcat 在一些項目中,單獨開啟一個ActiveMQ,對于項目實施來說有時略顯繁瑣。所以我們將ActiveMQ內嵌到Tomcat,Tomcat啟動同時就順帶啟動了ActiveMQ。由此我們需要掌握三個個重要的知識...
閱讀 3723·2021-11-24 09:39
閱讀 1870·2021-11-16 11:45
閱讀 616·2021-11-16 11:45
閱讀 1028·2021-10-11 10:58
閱讀 2475·2021-09-09 11:51
閱讀 1941·2019-08-30 15:54
閱讀 687·2019-08-29 13:13
閱讀 3466·2019-08-26 12:18