国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

慕課網_《Java消息中間件》學習總結

twohappy / 3052人閱讀

摘要:時間年月日星期六說明本文部分內容均來自慕課網。這個時候,可以啟動多臺積分系統,來同時消費這個消息中間件里面的登錄消息,達到橫向擴展的作用。

時間:2017年07月22日星期六
說明:本文部分內容均來自慕課網。@慕課網:http://www.imooc.com
教學源碼:無
學習源碼:https://github.com/zccodere/s...

第一章:課程介紹 1-1 課程安排

Java消息中間件(入門篇)

</>復制代碼

  1. 為什么需要使用消息中間件
  2. 消息中間件概述
  3. JMS規范
  4. JMS代碼演練

Java消息中間件(拓展篇)

</>復制代碼

  1. ActiveMQ集群配置
  2. 消息中間件在大型系統中的最佳實踐
  3. 使用其它消息中間件
1-2 使用消息中間件原因

通過服務調用讓其它系統感知事件發生

</>復制代碼

  1. 系統之間高耦合
  2. 程序執行效率低

通過消息中間件解耦服務調用

生活中的案例

</>復制代碼

  1. 微信公眾號
  2. 老師在黑板上寫字
  3. 電視機
  4. 等等

消息中間件帶來的好處

</>復制代碼

  1. 解耦:系統解耦
  2. 異步:異步執行
  3. 橫向擴展
  4. 安全可靠
  5. 順序保證

橫向擴展解釋

</>復制代碼

  1. 當登錄系統,需要很多用戶登錄。這些消息全部需要告知積分系統,去增加積分,而增加積分這個處理過程可能比較麻煩、比較耗時。這個時候,可以啟動多臺積分系統,來同時消費這個消息中間件里面的登錄消息,達到橫向擴展的作用。
第二章:概述 2-1 消息中間件概述

什么是中間件

</>復制代碼

  1. 非底層操作系統軟件,非業務應用軟件,不是直接給最終用戶使用的,不能直接給客戶帶來價值的軟件統稱為中間件

什么是消息中間件

</>復制代碼

  1. 關注于數據的發送和接受,利用高效可靠的異步消息傳遞機制集成分布式系統

示意圖

什么是JMS

</>復制代碼

  1. Java消息服務(Java Message Service)應用程序接口,是一個Java平臺中關于面向消息中間件的API,用于在兩個應用程序之間,或分布式系統中發送消息,進行異步通信。Java消息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支持。

什么是AMQP

</>復制代碼

  1. AMQP(Advanced Message Queuing Protocol)是一個提供統一消息服務的應用層標準高級消息隊列協議,基于此協議的客戶端與消息中間件可傳遞消息,并不受客戶端/中間件不同產品,不同的開發語言等條件的限制。

JMS和AMQP對比

ActiveMQ

</>復制代碼

  1. ActiveMQApache出品,最流行的,能力強勁的開源消息總線。ActiveMQ是一個完全支持JMS1.1J2EE1.4規范的JMS Provider實現,盡管JMS規范出臺已經是很久的事情了,但是JMS在當今J2EE應用中間件仍然扮演者特殊的地位。

ActiveMQ特性

</>復制代碼

  1. 多種語言和協議編寫客戶端。
  2. 語言:Java、C、C++、C#、Ruby、Perl、Python、PHP
  3. 應用協議:OpenWire、Stomp、REST、WS Notification、XMPP、AMQP
  4. 完全支持JMS1.1J2EE1.4規范(持久化、XA消息、事務)
  5. 虛擬主題、組合目的、鏡像隊列

RabbitMQ

</>復制代碼

  1. RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫。用于在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。

RabbitMQ特性

</>復制代碼

  1. 支持多種客戶端
  2. 如:PythonRuby.NETJavaJMSCPHPActionScript
  3. AMQP的完整實現(vhost、Exchange、Binding、Routing Key等)
  4. 事務支持/發布確認
  5. 消息持久化

Kafka

</>復制代碼

  1. Kafka是一種高吞吐量的分布式發布訂閱消息系統,是一個分布式、分區的、可高的分布式日志存儲服務。它通過一種獨一無二的設計提供了一個消息系統的功能。

Kafka特性

</>復制代碼

  1. 通過O(1)的磁盤數據結構提供消息的持久化,
  2. 這種結構對于即使數以TB的消息存儲也能夠保持長時間的穩定性能
  3. 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒數百萬的消息
  4. Partition、Consumer Group

綜合評價

第三章:JMS規范 3-1 JMS規范

Java消息服務定義

</>復制代碼

  1. Java消息服務(Java Message Service)應用程序接口,是一個Java平臺中關于面向消息中間件的API,用于在兩個應用程序之間,或分布式系統中發送消息,進行異步通信。

JMS相關概念

</>復制代碼

  1. 提供者:實現JMS規范的消息中間件服務器
  2. 客戶端:發送或接收消息的應用程序
  3. 生產者/發布者:創建并發送消息的客戶端
  4. 消費者/訂閱者:接收并處理消息的客戶端
  5. 消息:應用程序之間傳遞的數據內容
  6. 消息模式:在客戶端之間傳遞消息的模式,JMS中定義了主題和隊列兩種模式

JMS消息模式:隊列模式

</>復制代碼

  1. 客戶端包括生產者和消費者
  2. 隊列中的消息只能被一個消費者消費
  3. 消費者可以隨時消費隊列中的消息

隊列模型示意圖

JMS消息模式:主題模型

</>復制代碼

  1. 客戶端包括發布者和訂閱者
  2. 主題中的消息被所有訂閱者消費
  3. 消費者不能消費訂閱之前就發送到主題中的消息

主題模型示意圖

JMS編碼接口

</>復制代碼

  1. ConnectionFactory:用于創建連接到消息中間件的連接工廠
  2. Connection:代表了應用程序和消息服務器之間的通信鏈路
  3. Destination:指消息發布和接收的地點,包括隊列和主題
  4. Session:表示一個單線程的上下文,用于發送和接收消息
  5. MessageConsumer:由會話創建,用戶接收發送到目標的消息
  6. MessageProducer:由會話創建,用于發送消息到目標
  7. Message:是在消費者和生產者之間傳送的對象,消息頭,一組消息屬性,一個消息體

JMS編碼接口之間的關系

第四章:使用ActiveMQ 4-1 Windows安裝ActiveMQ

在Windows安裝ActiveMQ

</>復制代碼

  1. 下載安裝包
  2. 直接啟動
  3. 使用服務啟動

安裝驗證

</>復制代碼

  1. 訪問地址:http://127.0.0.1:8161/
  2. 默認用戶:admin
  3. 默認密碼:admin
4-2 Linux安裝ActiveMQ

在Linux安裝ActiveMQ

</>復制代碼

  1. 下載并解壓安裝包
  2. 啟動

啟動驗證

</>復制代碼

  1. 進入到bin目錄,使用命令./activemq start啟動服務
  2. 使用命令ps -ef |grep activemq查看進程是否存在
  3. 使用命令./activemq stop關閉服務

安裝驗證

</>復制代碼

  1. 訪問地址:http://Linux主機IP:8161/
  2. 默認用戶:admin
  3. 默認密碼:admin
4-3 隊列模式的消息演示

使用JMS接口規范連接ActiveMQ

</>復制代碼

  1. 創建生產者
  2. 創建消費者
  3. 創建發布者
  4. 創建訂閱者

回顧JMS編碼接口之間的關系

代碼演示

1.編寫AppProducer類

</>復制代碼

  1. package com.myimooc.jms.queue;
  2. import javax.jms.Connection;
  3. import javax.jms.ConnectionFactory;
  4. import javax.jms.Destination;
  5. import javax.jms.JMSException;
  6. import javax.jms.MessageProducer;
  7. import javax.jms.Session;
  8. import javax.jms.TextMessage;
  9. import org.apache.activemq.ActiveMQConnectionFactory;
  10. /**
  11. * App 生產者-隊列模式
  12. * @author ZhangCheng on 2017-07-22
  13. *
  14. */
  15. public class AppProducer {
  16. /** 指定ActiveMQ服務的地址 */
  17. private static final String URL = "tcp://127.0.0.1:61616";
  18. /** 指定隊列的名稱 */
  19. private static final String QUEUE_NAME = "queue-test";
  20. public static void main(String[] args) throws JMSException {
  21. // 1.創建ConnectionFactory
  22. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
  23. // 2.創建Connection
  24. Connection connection = connectionFactory.createConnection();
  25. // 3.啟動連接
  26. connection.start();
  27. // 4.創建會話(第一個參數:是否在事務中處理)
  28. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  29. // 5. 創建一個目標
  30. Destination destination = session.createQueue(QUEUE_NAME);
  31. // 6.創建一個生產者
  32. MessageProducer producer = session.createProducer(destination);
  33. for (int i = 0; i < 100; i++) {
  34. // 7.創建消息
  35. TextMessage textMessage = session.createTextMessage("test" + i);
  36. // 8.發布消息
  37. producer.send(textMessage);
  38. System.out.println("消息發送:" + textMessage.getText());
  39. }
  40. // 9.關閉連接
  41. connection.close();
  42. }
  43. }

2.編寫AppConsumer類

</>復制代碼

  1. package com.myimooc.jms.queue;
  2. import javax.jms.Connection;
  3. import javax.jms.ConnectionFactory;
  4. import javax.jms.Destination;
  5. import javax.jms.JMSException;
  6. import javax.jms.Message;
  7. import javax.jms.MessageConsumer;
  8. import javax.jms.MessageListener;
  9. import javax.jms.Session;
  10. import javax.jms.TextMessage;
  11. import org.apache.activemq.ActiveMQConnectionFactory;
  12. /**
  13. * App 消費者-隊列模式
  14. * @author ZhangCheng on 2017-07-22
  15. *
  16. */
  17. public class AppConsumer {
  18. /** 指定ActiveMQ服務的地址 */
  19. private static final String URL = "tcp://127.0.0.1:61616";
  20. /** 指定隊列的名稱 */
  21. private static final String QUEUE_NAME = "queue-test";
  22. public static void main(String[] args) throws JMSException {
  23. // 1.創建ConnectionFactory
  24. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
  25. // 2.創建Connection
  26. Connection connection = connectionFactory.createConnection();
  27. // 3.啟動連接
  28. connection.start();
  29. // 4.創建會話(第一個參數:是否在事務中處理)
  30. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  31. // 5.創建一個目標
  32. Destination destination = session.createQueue(QUEUE_NAME);
  33. // 6.創建一個消費者
  34. MessageConsumer consumer = session.createConsumer(destination);
  35. // 7.創建一個監聽器
  36. consumer.setMessageListener(new MessageListener() {
  37. public void onMessage(Message message) {
  38. TextMessage textMessage = (TextMessage)message;
  39. try {
  40. System.out.println("接收消息:" + textMessage.getText());
  41. } catch (JMSException e) {
  42. System.out.println("接收消息異常:");
  43. e.printStackTrace();
  44. }
  45. }
  46. });
  47. // 8.關閉連接
  48. //connection.close();
  49. }
  50. }
4-4 主題模式的消息演示

代碼演示

1.編寫AppProducer類

</>復制代碼

  1. package com.myimooc.jms.topic;
  2. import javax.jms.Connection;
  3. import javax.jms.ConnectionFactory;
  4. import javax.jms.Destination;
  5. import javax.jms.JMSException;
  6. import javax.jms.MessageProducer;
  7. import javax.jms.Session;
  8. import javax.jms.TextMessage;
  9. import org.apache.activemq.ActiveMQConnectionFactory;
  10. /**
  11. * App 生產者-主題模式
  12. * @author ZhangCheng on 2017-07-22
  13. *
  14. */
  15. public class AppProducer {
  16. /** 指定ActiveMQ服務的地址 */
  17. private static final String URL = "tcp://127.0.0.1:61616";
  18. /** 指定主題的名稱 */
  19. private static final String TOPIC_NAME = "topic-test";
  20. public static void main(String[] args) throws JMSException {
  21. // 1.創建ConnectionFactory
  22. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
  23. // 2.創建Connection
  24. Connection connection = connectionFactory.createConnection();
  25. // 3.啟動連接
  26. connection.start();
  27. // 4.創建會話(第一個參數:是否在事務中處理)
  28. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  29. // 5. 創建一個目標
  30. Destination destination = session.createTopic(TOPIC_NAME);
  31. // 6.創建一個生產者
  32. MessageProducer producer = session.createProducer(destination);
  33. for (int i = 0; i < 100; i++) {
  34. // 7.創建消息
  35. TextMessage textMessage = session.createTextMessage("test" + i);
  36. // 8.發布消息
  37. producer.send(textMessage);
  38. System.out.println("消息發送:" + textMessage.getText());
  39. }
  40. // 9.關閉連接
  41. connection.close();
  42. }
  43. }

2.編寫AppConsumer類

</>復制代碼

  1. package com.myimooc.jms.topic;
  2. import javax.jms.Connection;
  3. import javax.jms.ConnectionFactory;
  4. import javax.jms.Destination;
  5. import javax.jms.JMSException;
  6. import javax.jms.Message;
  7. import javax.jms.MessageConsumer;
  8. import javax.jms.MessageListener;
  9. import javax.jms.Session;
  10. import javax.jms.TextMessage;
  11. import org.apache.activemq.ActiveMQConnectionFactory;
  12. /**
  13. * App 消費者-主題模式
  14. * @author ZhangCheng on 2017-07-22
  15. *
  16. */
  17. public class AppConsumer {
  18. /** 指定ActiveMQ服務的地址 */
  19. private static final String URL = "tcp://127.0.0.1:61616";
  20. /** 指定主題的名稱 */
  21. private static final String TOPIC_NAME = "topic-test";
  22. public static void main(String[] args) throws JMSException {
  23. // 1.創建ConnectionFactory
  24. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
  25. // 2.創建Connection
  26. Connection connection = connectionFactory.createConnection();
  27. // 3.啟動連接
  28. connection.start();
  29. // 4.創建會話(第一個參數:是否在事務中處理)
  30. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  31. // 5.創建一個目標
  32. Destination destination = session.createTopic(TOPIC_NAME);
  33. // 6.創建一個消費者
  34. MessageConsumer consumer = session.createConsumer(destination);
  35. // 7.創建一個監聽器
  36. consumer.setMessageListener(new MessageListener() {
  37. public void onMessage(Message message) {
  38. TextMessage textMessage = (TextMessage)message;
  39. try {
  40. System.out.println("接收消息:" + textMessage.getText());
  41. } catch (JMSException e) {
  42. System.out.println("接收消息異常:");
  43. e.printStackTrace();
  44. }
  45. }
  46. });
  47. // 8.關閉連接
  48. //connection.close();
  49. }
  50. }
4-5 Spring jms理論

使用Spring集成JMS連接ActiveMQ

</>復制代碼

  1. ConnectionFactory:用于管理連接的連接工廠
  2. JmsTemplate:用于發送和接收消息的模版類
  3. MessageListener:消息監聽器

ConnectionFactory

</>復制代碼

  1. 一個Spring為我們提供的連接池
  2. JmsTemplate每次發消息都會重新創建連接,會話和productor
  3. Spring中提供了SingleConnectFactory和CachingConnectionFactory

JmsTemplate

</>復制代碼

  1. 是Spring提供的,只需向Spring容器內注冊這個類就可以使用JmsTemplate方便的操作jms
  2. JmsTemplate類是線程安全的,可以在整個應用范圍使用

MessageListener

</>復制代碼

  1. 實現一個onMessage方法,該方法只接收一個Message參數
4-6 Spring jms演示

代碼演示

1.創建名為jmsspring的maven項目POM文件如下

</>復制代碼

  1. 4.0.0
  2. com.myimooc
  3. jmsspring
  4. 0.0.1-SNAPSHOT
  5. jar
  6. jmsspring
  7. http://maven.apache.org
  8. org.springframework.boot
  9. spring-boot-starter-parent
  10. 1.5.1.RELEASE
  11. UTF-8
  12. UTF-8
  13. org.springframework.boot
  14. spring-boot-starter
  15. org.springframework.boot
  16. spring-boot-starter-web
  17. org.springframework
  18. spring-jms
  19. org.apache.activemq
  20. activemq-all
  21. 5.9.0
  22. org.apache.maven.plugins
  23. maven-compiler-plugin
  24. 1.8
  25. 1.8

2.完成后的目錄結構如下

源碼請到我的github地址查看

3.測試

使用Postman向ProducerController發起請求,將消息發送出去

對應的ConsumerTopicMessageListener 和 ConsumerMessageListener接收到消息

第五章:大型系統最佳實踐 5-1 ActiceMQ集群

為什么要對消息中間件集群

</>復制代碼

  1. 實現高可用,以排除單點故障引起的服務中斷
  2. 實現負載均衡,以提升效率為更多客戶提供服務

集群方式

</>復制代碼

  1. 客戶端集群:讓多個消費者消費同一個隊列
  2. Broker cluster:多個Broker之間同步消息
  3. Master Slave:實現高可用

ActiveMQ失效轉移(failover)-客戶端配置

</>復制代碼

  1. 允許當其中一臺消息服務器宕機時,客戶端在傳輸層上重新連接到其它消息服務器
  2. 語法:failover:(uri1,…,uriN)?transportOptions

transportOptions參數說明

</>復制代碼

  1. randomize默認為true,表示在URI列表中選擇URI連接時是否采用隨機策略
  2. initialReconnectDelay默認為10,單位毫秒,表示第一次嘗試重連之間等待的時間
  3. maxReconnectDelay默認為30000,單位毫秒,最長重連的時間間隔

Broker cluster集群配置-原理

NetworkConnector(網絡連接器)

</>復制代碼

  1. 網絡連接器主要用于配置ActiveMQ服務器與服務器之間的網絡通訊方式,用于服務器透傳消息
  2. 網絡連接器分為靜態連接器和動態連接器

靜態連接器

動態連接器

5-2 ActiveMQ集群理論

ActiveMQ Master Slace集群方案

</>復制代碼

  1. Share nothing storage master/slave(已過時,5.8+后移除)
  2. Shared storage master/slave 共享存儲
  3. Replicated LevelDB Store基于負責的LevelDB Store

共享存儲集群的原理

基于復制的LevelDB Store的原理

兩種集群方式對比

三臺服務器的完美集群方案

5-3 ActiveMQ集群實踐

ActiveMQ集群配置方案

配置過程

1.節點準備

</>復制代碼

  1. mkdir activemq創建目錄
  2. cp -rf apache-activemq-5.15.0 activemq/activemq-a
  3. cp -rf apache-activemq-5.15.0 activemq/activemq-b
  4. cp -rf apache-activemq-5.15.0 activemq/activemq-c
  5. cd activemq
  6. mkdir kahadb

2.配置a節點

</>復制代碼

  1. cd activemq-a/
  2. cd conf/
  3. vim activemq.xml
  4. vim jetty.xml:配置管理端口號,a節點使用默認端口,無須配置

3.配置b節點

</>復制代碼

  1. vim activemq.xml
  2. 配置網絡連接器
  3. 配置持久化存儲路徑
  4. 配置服務端口
  5. vim jetty.xml
  6. 配置管理端口號

4.配置c節點

</>復制代碼

  1. vim activemq.xml
  2. 配置網絡連接器
  3. 配置持久化存儲路徑
  4. 配置服務端口
  5. vim jetty.xml
  6. 配置管理端口號

5.啟動服務
回到activemq目錄,分別啟動a,b,c三個節點

</>復制代碼

  1. ./activemq-a/bin/activemq start
  2. ./activemq-b/bin/activemq start
  3. ./activemq-c /bin/activemq start

檢查是否都啟動成功

</>復制代碼

  1. ps -ef |grep activemq

檢查是否對外提供服務,即端口是否被監聽(占用)

</>復制代碼

  1. netstat -anp |grep 61616
  2. netstat -anp |grep 61617
  3. netstat -anp |grep 61618

檢查發現61618即c節點沒有提供服務,但是c節點的進程是啟動成功了的。因為b節點和c點擊是master/slave配置,現在b節點獲取到了共享文件夾的所有權,所以c節點正在等待獲得資源,并且提供服務。即c節點在未獲得資源之前,是不提供服務的。

測試,把b節點殺掉,看c節點能不能提供61618的服務

</>復制代碼

  1. ./activemq-b/bin/activemq stop
  2. netstat -anp |grep 61618
  3. ./activemq-b/bin/activemq start
  4. netstat -anp |grep 61617

檢查發現,重新啟動b節點后,b節點61617端口并沒有提供服務,是因為現在b節點成為了slave節點,而c節點成為了master節點。所以,現在b節點啟動了,但是它并不對外提供服務。只有當c節點出現問題后,b節點才對外提供服務。

6.通過代碼測試集群配置是否生效

生產者

</>復制代碼

  1. package com.myimooc.jms.queue;
  2. import javax.jms.Connection;
  3. import javax.jms.ConnectionFactory;
  4. import javax.jms.Destination;
  5. import javax.jms.JMSException;
  6. import javax.jms.MessageProducer;
  7. import javax.jms.Session;
  8. import javax.jms.TextMessage;
  9. import org.apache.activemq.ActiveMQConnectionFactory;
  10. /**
  11. * App 生產者-隊列模式-集群配置測試
  12. * @author ZhangCheng on 2017-07-25
  13. *
  14. */
  15. public class AppProducerTest {
  16. /** failover 為狀態轉移的存在部分
  17. * 因a節點只作為消費者使用,所以這里不配置61616節點了。
  18. * */
  19. private static final String URL = "failover:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true";
  20. /** 指定隊列的名稱 */
  21. private static final String QUEUE_NAME = "queue-test";
  22. public static void main(String[] args) throws JMSException {
  23. // 1.創建ConnectionFactory
  24. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
  25. // 2.創建Connection
  26. Connection connection = connectionFactory.createConnection();
  27. // 3.啟動連接
  28. connection.start();
  29. // 4.創建會話(第一個參數:是否在事務中處理)
  30. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  31. // 5. 創建一個目標
  32. Destination destination = session.createQueue(QUEUE_NAME);
  33. // 6.創建一個生產者
  34. MessageProducer producer = session.createProducer(destination);
  35. for (int i = 0; i < 100; i++) {
  36. // 7.創建消息
  37. TextMessage textMessage = session.createTextMessage("test" + i);
  38. // 8.發布消息
  39. producer.send(textMessage);
  40. System.out.println("消息發送:" + textMessage.getText());
  41. }
  42. // 9.關閉連接
  43. connection.close();
  44. }
  45. }

消費者

</>復制代碼

  1. package com.myimooc.jms.queue;
  2. import javax.jms.Connection;
  3. import javax.jms.ConnectionFactory;
  4. import javax.jms.Destination;
  5. import javax.jms.JMSException;
  6. import javax.jms.Message;
  7. import javax.jms.MessageConsumer;
  8. import javax.jms.MessageListener;
  9. import javax.jms.Session;
  10. import javax.jms.TextMessage;
  11. import org.apache.activemq.ActiveMQConnectionFactory;
  12. /**
  13. * App 消費者-隊列模式-集群配置測試
  14. * @author ZhangCheng on 2017-07-22
  15. *
  16. */
  17. public class AppConsumerTest {
  18. /** failover 為狀態轉移的存在部分
  19. * */
  20. private static final String URL = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true";
  21. /** 指定隊列的名稱 */
  22. private static final String QUEUE_NAME = "queue-test";
  23. public static void main(String[] args) throws JMSException {
  24. // 1.創建ConnectionFactory
  25. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
  26. // 2.創建Connection
  27. Connection connection = connectionFactory.createConnection();
  28. // 3.啟動連接
  29. connection.start();
  30. // 4.創建會話(第一個參數:是否在事務中處理)
  31. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  32. // 5.創建一個目標
  33. Destination destination = session.createQueue(QUEUE_NAME);
  34. // 6.創建一個消費者
  35. MessageConsumer consumer = session.createConsumer(destination);
  36. // 7.創建一個監聽器
  37. consumer.setMessageListener(new MessageListener() {
  38. public void onMessage(Message message) {
  39. TextMessage textMessage = (TextMessage)message;
  40. try {
  41. System.out.println("接收消息:" + textMessage.getText());
  42. } catch (JMSException e) {
  43. System.out.println("接收消息異常:");
  44. e.printStackTrace();
  45. }
  46. }
  47. });
  48. // 8.關閉連接
  49. //connection.close();
  50. }
  51. }

運行生產者,然后到管理界面查看消息發送到了那里

</>復制代碼

  1. http://127.0.0.1:8161
  2. http://127.0.0.1:8162
  3. http://127.0.0.1:8163

查看發現,8162無法訪問,是因為b節點是slave節點,不提供服務,消息都發送到了c節點

把8163即c節點宕掉后,運行消費者,查看消息是否能夠使用

</>復制代碼

  1. ./activemq-c/bin/activemq stop
5-4 企業級系統中的最佳實踐

實際業務場景分析

實際業務場景特點

</>復制代碼

  1. 子業務系統都有集群的可能性
  2. 同一個消息會廣播給關注該類消息的所有子業務系統
  3. 同一類消息在集群中被負載消費
  4. 業務的發生和消息的發布最終一致性

需要解決的問題

</>復制代碼

  1. 不同業務系統分別處理同一個消息,同一業務系統負載處理同類消息
  2. 解決消息發送時的一致性問題
  3. 解決消息處理的冪等性問題
  4. 基于消息機制建立事件總線

集群系統處理消息方案-使用JMS級聯的解決方案

集群系統處理消息方案-使用ActiveMQ的虛擬主題解決方案

</>復制代碼

  1. 發布者:將消息發布到一個主題中,主題名以VirtualTopic開頭,如VirtualTopic.TEST
  2. 消費者:從隊列中獲取消息,在隊列名中表名自己身份,如Consumer.A.VirtualTopic.TEST

解決消息發送時的一致性問題-使用JMS中XA系列接口保證強一致性

</>復制代碼

  1. 引入分布式事務
  2. 要求業務操作必須支持XA協議

解決消息發送時的一致性問題-使用消息表的本地事務解決方案

解決消息發送時的一致性問題-使用內存日志的解決方案

解決消息處理的冪等性問題

</>復制代碼

  1. 所謂冪等性問題,是指多次執行所產生的影響(結果)與一次執行所產生的影響(結果)一樣。比如:支付成功后,支付寶會發起多次通知給業務系統,要求業務系統能夠處理這些重復的消息,但是又不重復處理訂單。如果在消息處理系統中保證冪等性,會增加系統復雜度,我們可以統一處理冪等性后,再將消息發送給消息處理系統。

解決消息處理的冪等性問題-使用消息表的本地事務解決方案

解決消息處理的冪等性問題-使用內存日志的解決方案

基于消息機制的事件總線-什么是事件驅動架構

</>復制代碼

  1. 事件驅動架構(Event Driven Architecture,EDA)定義了一個設計和實現一個應用系統的方法學,在這個系統里事件可傳輸于松散耦合的組件和服務之間。特點:有事我叫你,沒事別煩我

事件驅動架構模型

該教師正在開發該事件總線的框架,github地址https://github.com/jovezhao/nest。

第六章:使用其它消息中間件 6-1 使用其它消息中間件

分析需要做的事

</>復制代碼

  1. 解決各業務系統集群處理同一條消息
  2. 實現自己的消息提供者

常用消息中間件

</>復制代碼

  1. ActiveMQ
  2. RabbitMQ
  3. Kafka

集成RabbitMQ

</>復制代碼

  1. RabbitMQ:使用交換器綁定到隊列

示意圖

RabbitMQ消息提供者源碼解析

</>復制代碼

  1. 創建ConnectionFactory
  2. 創建Connection
  3. 創建Channel
  4. 定義Exchange
  5. 定義Queue并且綁定隊列

集成Kafka

Kafka使用group.id分組消費者

</>復制代碼

  1. 配置消息者參數group.id相對時對消息進行負載處理
  2. 配置服務器partitions參數,控制同一個group.id下的consumer數量小于partitions
  3. Kafka只保證同一個partition下的消息是有序的
第七章:課程總結 7-1 課程總結

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/67478.html

相關文章

  • 課網_《RabbitMQ消息間件極速入門與實戰》學習總結

    摘要:慕課網消息中間件極速入門與實戰學習總結時間年月日星期三說明本文部分內容均來自慕課網。 慕課網《RabbitMQ消息中間件極速入門與實戰》學習總結 時間:2018年09月05日星期三 說明:本文部分內容均來自慕課網。@慕課網:https://www.imooc.com 教學源碼:無 學習源碼:https://github.com/zccodere/s... 第一章:RabbitM...

    mykurisu 評論0 收藏0
  • 課網_Java實現Base64加密》學習總結

    摘要:時間年月日星期一說明本文部分內容均來自慕課網。多用于網絡加密。散列函數函數或消息摘要函數主要作用散列函數用來驗證數據的完整性。 時間:2017年4月10日星期一說明:本文部分內容均來自慕課網。@慕課網:http://www.imooc.com教學示例源碼:https://github.com/zccodere/s...個人學習源碼:https://github.com/zccodere...

    verano 評論0 收藏0
  • 課網_《初識Java微信公眾號開發》學習總結

    摘要:時間年月日星期五說明本文部分內容均來自慕課網。本套課程介紹微信公眾號開發,主要涉及公眾號介紹編輯模式介紹開發模式介紹等。慕課網是垂直的互聯網技能免費學習網站。 時間:2017年08月11日星期五說明:本文部分內容均來自慕課網。@慕課網:http://www.imooc.com教學源碼:https://github.com/zccodere/s...學習源碼:https://github...

    PrototypeZ 評論0 收藏0
  • 課網_Java實現消息摘要算法加密》學習總結

    時間:2017年4月10日星期一說明:本文部分內容均來自慕課網。@慕課網:http://www.imooc.com教學示例源碼:https://github.com/zccodere/s...個人學習源碼:https://github.com/zccodere/s... 第一章:概述 1-1 Java實現消息摘要算法加密 消息摘要算法 MD(Message Digest) SHA(Secure H...

    zengdongbao 評論0 收藏0
  • 課網_《Netty入門之WebSocket初體驗》學習總結

    時間:2018年04月11日星期三 說明:本文部分內容均來自慕課網。@慕課網:https://www.imooc.com 教學源碼:https://github.com/zccodere/s... 學習源碼:https://github.com/zccodere/s... 第一章:課程介紹 1-1 課程介紹 什么是Netty 高性能、事件驅動、異步非阻塞的IO Java開源框架 基于NIO的客戶...

    Noodles 評論0 收藏0

發表評論

0條評論

twohappy

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<