摘要:慕課網消息中間件極速入門與實戰學習總結時間年月日星期三說明本文部分內容均來自慕課網。
慕課網《RabbitMQ消息中間件極速入門與實戰》學習總結
時間:2018年09月05日星期三
說明:本文部分內容均來自慕課網。@慕課網:https://www.imooc.com
教學源碼:無
學習源碼:https://github.com/zccodere/s...
第一章:RabbitMQ起步 1-1 課程導航課程導航
RabbitMQ簡介及AMQP協議
RabbitMQ安裝與使用
RabbitMQ核心概念
與SpringBoot整合
保障100%的消息可靠性投遞方案落地實現
1-2 RabbitMQ簡介初識RabbitMQ
RabbitMQ是一個開源的消息代理和隊列服務器
用來通過普通協議在完全不同的應用之間共享數據
RabbitMQ是使用Erlang語言來編寫的
并且RabbitMQ是基于AMQP協議的
RabbitMQ簡介
目前很多互聯網大廠都在使用RabbitMQ
RabbitMQ底層采用Erlang語言進行編寫
開源、性能優秀,穩定性保障
與SpringAMQP完美的整合、API豐富
集群模式豐富,表達式配置,HA模式,鏡像隊列模型
保證數據不丟失的前提做到高可靠性、可用性
AMQP全稱:Advanced Message Queuing Protocol
AMQP翻譯:高級消息隊列協議
AMQP協議模型
1-3 RabbitMQ安裝學習筆記
0.安裝準備 官網地址:http://www.rabbitmq.com/ 安裝Linux必要依賴包1-4 RabbitMQ概念下載RabbitMQ安裝包 進行安裝,修改相關配置文件 vim /etc/hostname vim /etc/hosts 1.安裝Erlang wget https://packages.erlang-solutions.com/erlang-solutions_1.0_all.deb sudo dpkg -i erlang-solutions_1.0_all.deb sudo apt-get install erlang erlang-nox 2.安裝RabbitMQ echo "deb http://www.rabbitmq.com/debian/ testing main" | sudo tee /etc/apt/sources.list.d/rabbitmq.list wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add - sudo apt-get install rabbitmq-server 3.安裝RabbitMQ web管理插件 sudo rabbitmq-plugins enable rabbitmq_management sudo systemctl restart rabbitmq-server 訪問:http://localhost:15672 默認用戶名密碼:guest/guest 4.修改RabbitMQ配置 vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.5.7/ebin/rabbit.app 比如修改密碼、配置等等;例如:loopback_users中的<<"guest">>,只保留guest 服務啟動:rabbitmq-server start & 服務停止:rabbitmqctl app_stop
RabbitMQ的整體架構
RabbitMQ核心概念
Server:又稱Broker,接受客戶端的連接,實現AMQP實體服務
Connection:連接,應用程序與Broker的網絡連接
Channel:網絡信道
幾乎所有的操作都在Channel中進行
Channel是進行消息讀寫的通道
客戶端可建立多個Channel
每個Channel代表一個會話任務
Message:消息
服務器和應用程序之間傳送的數據,由Properties和Body組成
Properties可以對消息進行修飾,比如消息的優先級、延遲等高級特性
Body則就是消息體內容
Virtual host:虛擬機
用于進行邏輯隔離,最上層的消息路由
一個Virtual host里面可以有若干個Exchange和Queue
同一個Virtual host里面不能有相同名稱的Exchange或Queue
Exchange:交換機,接收消息,根據路由鍵轉發消息到綁定的隊列
Binding:Exchange和Queue之間的虛擬連接,binding中可以包含routing key
Routing key:一個路由規則,虛擬機可用它來確定如何路由一個特定消息
Queue:也稱為Message Queue,消息隊列,保存消息并將它們轉發給消費者
RabbitMQ消息的流轉過程
第二章:RabbitMQ使用 2-1 發送消息SpringBoot與RabbitMQ集成
引入相關依賴
對application.properties進行配置
創建名為rabbitmq-producer的maven工程pom如下
47-rabbitmq com.myimooc 1.0-SNAPSHOT 4.0.0 rabbitmq-producer 2.0.4.RELEASE org.springframework.boot spring-boot-parent ${spring.boot.version} pom import org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-amqp org.apache.commons commons-lang3 commons-io commons-io 2.5 com.alibaba fastjson 1.2.36 javax.servlet javax.servlet-api provided org.slf4j slf4j-api log4j log4j 1.2.17 org.springframework.boot spring-boot-starter-test test org.springframework.boot spring-boot-maven-plugin
1.編寫Order類
package com.myimooc.rabbitmq.entity; import java.io.Serializable; /** *
* 標題: 訂單實體
* 描述: 訂單實體
* 時間: 2018/09/06
* * @author zc */ public class Order implements Serializable{ private static final long serialVersionUID = 6771608755338249746L; private String id; private String name; /** * 存儲消息發送的唯一標識 */ private String messageId; public Order() { } public Order(String id, String name, String messageId) { this.id = id; this.name = name; this.messageId = messageId; } @Override public String toString() { return "Order{" + "id="" + id + """ + ", name="" + name + """ + ", messageId="" + messageId + """ + "}"; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getMessageId() { return messageId; } public void setMessageId(String messageId) { this.messageId = messageId; } }
2.編寫OrderSender類
package com.myimooc.rabbitmq.producer.producer; import com.myimooc.rabbitmq.entity.Order; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** *
* 標題: 訂單消息發送者
* 描述: 訂單消息發送者
* 時間: 2018/09/06
* * @author zc */ @Component public class OrderSender { private RabbitTemplate rabbitTemplate; @Autowired public OrderSender( RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } /** * 發送訂單 * * @param order 訂單 * @throws Exception 異常 */ public void send(Order order) throws Exception { CorrelationData correlationData = new CorrelationData(); correlationData.setId(order.getMessageId()); // exchange:交換機 // routingKey:路由鍵 // message:消息體內容 // correlationData:消息唯一ID this.rabbitTemplate.convertAndSend("order-exchange", "order.a", order, correlationData); } }
3.編寫application.properties類
# RabbitMQ配置 spring.rabbitmq.addresses=192.168.0.105:5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=15000 # Server配置 server.servlet.context-path=/ server.port=8080 spring.http.encoding.charset=UTF-8 spring.jackson.date-format=yyyy-MM-dd HH:mm:ss spring.jackson.time-zone=GMT+8 spring.jackson.default-property-inclusion=NON_NULL
4.編寫Application類
package com.myimooc.rabbitmq.producer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** *
* 標題: 啟動類
* 描述: 啟動類
* 時間: 2018/09/06
* * @author zc */ @SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
5.編寫OrderSenderTest類
package com.myimooc.rabbitmq.producer.producer; import com.myimooc.rabbitmq.entity.Order; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.UUID; /** *2-2 處理消息
* 標題: 訂單消息發送者測試
* 描述: 訂單消息發送者測試
* 時間: 2018/09/06
* * @author zc */ @RunWith(SpringRunner.class) @SpringBootTest public class OrderSenderTest { @Autowired private OrderSender orderSender; @Test public void testSend1() throws Exception { Order order = new Order(); order.setId("201809062009010001"); order.setName("測試訂單1"); order.setMessageId(System.currentTimeMillis() + "$" + UUID.randomUUID().toString().replaceAll("-","")); this.orderSender.send(order); } }
創建名為rabbitmq-consumer的maven工程pom如下
47-rabbitmq com.myimooc 1.0-SNAPSHOT 4.0.0 rabbitmq-consumer 2.0.4.RELEASE org.springframework.boot spring-boot-parent ${spring.boot.version} pom import org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-amqp org.apache.commons commons-lang3 commons-io commons-io 2.5 com.alibaba fastjson 1.2.36 javax.servlet javax.servlet-api provided org.slf4j slf4j-api log4j log4j 1.2.17 org.springframework.boot spring-boot-starter-test test org.springframework.boot spring-boot-maven-plugin
1.編寫Order類
package com.myimooc.rabbitmq.entity; import java.io.Serializable; /** *
* 標題: 訂單實體
* 描述: 訂單實體
* 時間: 2018/09/06
* * @author zc */ public class Order implements Serializable{ private static final long serialVersionUID = 6771608755338249746L; private String id; private String name; /** * 存儲消息發送的唯一標識 */ private String messageId; public Order() { } public Order(String id, String name, String messageId) { this.id = id; this.name = name; this.messageId = messageId; } @Override public String toString() { return "Order{" + "id="" + id + """ + ", name="" + name + """ + ", messageId="" + messageId + """ + "}"; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getMessageId() { return messageId; } public void setMessageId(String messageId) { this.messageId = messageId; } }
2.編寫OrderReceiver類
package com.myimooc.rabbitmq.consumer.consumer; import com.rabbitmq.client.Channel; import com.myimooc.rabbitmq.entity.Order; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import java.util.Map; /** *
* 標題: 訂單接收者
* 描述: 訂單接收者
* 時間: 2018/09/06
* * @author zc */ @Component public class OrderReceiver { /** * 接收消息 * * @param order 消息體內容 * @param headers 消息頭內容 * @param channel 網絡信道 * @throws Exception 異常 */ @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "order-queue",durable = "true"), exchange = @Exchange(name = "order-exchange",type = "topic"), key = "order.*" )) @RabbitHandler public void onOrderMessage(@Payload Order order, @Headers Mapheaders, Channel channel) throws Exception { // 消費者操作 System.out.println("收到消息:"); System.out.println("訂單信息:" + order.toString()); // 手動簽收消息 Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); channel.basicAck(deliveryTag, false); } }
3.編寫application.properties類
# RabbitMQ連接配置 spring.rabbitmq.addresses=192.168.0.105:5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=15000 # RabbitMQ消費配置 # 基本并發:5 spring.rabbitmq.listener.simple.concurrency=5 # 最大并發:10 spring.rabbitmq.listener.simple.max-concurrency=10 # 簽收模式:手動簽收 spring.rabbitmq.listener.simple.acknowledge-mode=manual # 限流策略:同一時間只有1條消息發送過來消費 spring.rabbitmq.listener.simple.prefetch=1 # Server配置 server.servlet.context-path=/ server.port=8082 spring.http.encoding.charset=UTF-8 spring.jackson.date-format=yyyy-MM-dd HH:mm:ss spring.jackson.time-zone=GMT+8 spring.jackson.default-property-inclusion=NON_NULL
4.編寫Application類
package com.myimooc.rabbitmq.consumer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** *第三章:可靠性投遞 3-1 設計方案
* 標題: 啟動類
* 描述: 啟動類
* 時間: 2018/09/06
* * @author zc */ @SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
保障100%消息投遞成功設計方案(一)
3-2 代碼詳解因篇幅限制,源碼請到github地址查看,這里僅展示核心關鍵類
1.編寫OrderSender類
package com.myimooc.rabbitmq.ha.producer; import com.myimooc.rabbitmq.entity.Order; import com.myimooc.rabbitmq.ha.constant.Constants; import com.myimooc.rabbitmq.ha.dao.mapper.BrokerMessageLogMapper; import com.myimooc.rabbitmq.ha.dao.po.BrokerMessageLogPO; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** *
* 標題: 訂單消息發送者
* 描述: 訂單消息發送者
* 時間: 2018/09/06
* * @author zc */ @Component public class OrderSender { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private BrokerMessageLogMapper brokerMessageLogMapper; /** * 回調方法:confirm確認 */ private final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("correlationData:" + correlationData); String messageId = correlationData.getId(); if (ack) { // 如果confirm返回成功,則進行更新 BrokerMessageLogPO messageLogPO = new BrokerMessageLogPO(); messageLogPO.setMessageId(messageId); messageLogPO.setStatus(Constants.OrderSendStatus.SEND_SUCCESS); brokerMessageLogMapper.changeBrokerMessageLogStatus(messageLogPO); } else { // 失敗則進行具體的后續操作:重試或者補償等 System.out.println("異常處理..."); } } }; /** * 發送訂單 * * @param order 訂單 */ public void send(Order order) { // 設置回調方法 this.rabbitTemplate.setConfirmCallback(confirmCallback); // 消息ID CorrelationData correlationData = new CorrelationData(order.getMessageId()); // 發送消息 this.rabbitTemplate.convertAndSend("order-exchange", "order.a", order, correlationData); } }
2.編寫OrderService類
package com.myimooc.rabbitmq.ha.service; import com.myimooc.rabbitmq.entity.Order; import com.myimooc.rabbitmq.ha.constant.Constants; import com.myimooc.rabbitmq.ha.dao.mapper.BrokerMessageLogMapper; import com.myimooc.rabbitmq.ha.dao.mapper.OrderMapper; import com.myimooc.rabbitmq.ha.dao.po.BrokerMessageLogPO; import com.myimooc.rabbitmq.ha.producer.OrderSender; import com.myimooc.rabbitmq.ha.util.FastJsonConvertUtils; import org.apache.commons.lang3.time.DateUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.Date; /** *
* 標題: 訂單服務
* 描述: 訂單服務
* 時間: 2018/09/07
* * @author zc */ @Service public class OrderService { @Autowired private OrderMapper orderMapper; @Autowired private BrokerMessageLogMapper brokerMessageLogMapper; @Autowired private OrderSender orderSender; /** * 創建訂單 * * @param order 訂單 */ public void create(Order order) { // 當前時間 Date orderTime = new Date(); // 業務數據入庫 this.orderMapper.insert(order); // 消息日志入庫 BrokerMessageLogPO messageLogPO = new BrokerMessageLogPO(); messageLogPO.setMessageId(order.getMessageId()); messageLogPO.setMessage(FastJsonConvertUtils.convertObjectToJson(order)); messageLogPO.setTryCount(0); messageLogPO.setStatus(Constants.OrderSendStatus.SENDING); messageLogPO.setNextRetry(DateUtils.addMinutes(orderTime, Constants.ORDER_TIMEOUT)); this.brokerMessageLogMapper.insert(messageLogPO); // 發送消息 this.orderSender.send(order); } }
3.編寫RetryMessageTask類
package com.myimooc.rabbitmq.ha.task; import com.myimooc.rabbitmq.entity.Order; import com.myimooc.rabbitmq.ha.constant.Constants; import com.myimooc.rabbitmq.ha.dao.mapper.BrokerMessageLogMapper; import com.myimooc.rabbitmq.ha.dao.po.BrokerMessageLogPO; import com.myimooc.rabbitmq.ha.producer.OrderSender; import com.myimooc.rabbitmq.ha.util.FastJsonConvertUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.List; /** *
* 標題: 重發消息定時任務
* 描述: 重發消息定時任務
* 時間: 2018/09/07
* * @author zc */ @Component public class RetryMessageTask { private Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private OrderSender orderSender; @Autowired private BrokerMessageLogMapper brokerMessageLogMapper; /** * 啟動完成3秒后開始執行,每隔10秒執行一次 */ @Scheduled(initialDelay = 3000, fixedDelay = 10000) public void retrySend() { logger.debug("重發消息定時任務開始"); // 查詢 status = 0 和 timeout 的消息日志 Listpos = this.brokerMessageLogMapper.listSendFailureAndTimeoutMessage(); for (BrokerMessageLogPO po : pos) { logger.debug("處理消息日志:{}",po); if (po.getTryCount() >= Constants.MAX_RETRY_COUNT) { // 更新狀態為失敗 BrokerMessageLogPO messageLogPO = new BrokerMessageLogPO(); messageLogPO.setMessageId(po.getMessageId()); messageLogPO.setStatus(Constants.OrderSendStatus.SEND_FAILURE); this.brokerMessageLogMapper.changeBrokerMessageLogStatus(messageLogPO); } else { // 進行重試,重試次數+1 this.brokerMessageLogMapper.updateRetryCount(po); Order reSendOrder = FastJsonConvertUtils.convertJsonToObject(po.getMessage(), Order.class); try { this.orderSender.send(reSendOrder); } catch (Exception ex) { // 異常處理 logger.error("消息發送異常:{}", ex); } } } logger.debug("重發消息定時任務結束"); } }
4.編寫ApplicationTest類
package com.myimooc.rabbitmq.ha; import com.myimooc.rabbitmq.entity.Order; import com.myimooc.rabbitmq.ha.service.OrderService; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.UUID; /** *
* 標題: 訂單創建測試
* 描述: 訂單創建測試
* 時間: 2018/09/07
* * @author zc */ @RunWith(SpringRunner.class) @SpringBootTest public class ApplicationTest { @Autowired private OrderService orderService; @Test public void testCreateOrder(){ Order order = new Order(); order.setId(String.valueOf(System.currentTimeMillis())); order.setName("測試創建訂單"); order.setMessageId(System.currentTimeMillis() + "$" + UUID.randomUUID().toString().replaceAll("-","")); this.orderService.create(order); } }
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/77035.html
摘要:時間年月日星期六說明本文部分內容均來自慕課網。這個時候,可以啟動多臺積分系統,來同時消費這個消息中間件里面的登錄消息,達到橫向擴展的作用。 時間:2017年07月22日星期六說明:本文部分內容均來自慕課網。@慕課網:http://www.imooc.com教學源碼:無學習源碼:https://github.com/zccodere/s... 第一章:課程介紹 1-1 課程安排 Java...
時間:2018年04月11日星期三 說明:本文部分內容均來自慕課網。@慕課網:https://www.imooc.com 教學源碼:https://github.com/zccodere/s... 學習源碼:https://github.com/zccodere/s... 第一章:課程介紹 1-1 課程介紹 什么是Netty 高性能、事件驅動、異步非阻塞的IO Java開源框架 基于NIO的客戶...
摘要:時間年月日星期六說明本文部分內容均來自慕課網。必填用于執行命令,當執行完畢后,將產生一個新的文件層。可選指定此鏡像啟動時默認執行命令。可選用于指定需要暴露的網絡端口號。可選向鏡像中掛載一個卷組。 時間:2017年09月16日星期六說明:本文部分內容均來自慕課網。@慕課網:http://www.imooc.com 教學源碼:無 學習源碼:無 第一章:課程簡介 1-1 課程介紹 Docke...
摘要:入門篇學習總結時間年月日星期三說明本文部分內容均來自慕課網。主要的功能是日志記錄,性能統計,安全控制,事務處理,異常處理等等。 《Spring入門篇》學習總結 時間:2017年1月18日星期三說明:本文部分內容均來自慕課網。@慕課網:http://www.imooc.com教學示例源碼:https://github.com/zccodere/s...個人學習源碼:https://git...
閱讀 2560·2023-04-26 01:44
閱讀 2565·2021-09-10 10:50
閱讀 1416·2019-08-30 15:56
閱讀 2269·2019-08-30 15:44
閱讀 517·2019-08-29 11:14
閱讀 3422·2019-08-26 11:56
閱讀 3022·2019-08-26 11:52
閱讀 910·2019-08-26 10:27