摘要:前提通過前面兩篇文章可以簡單的了解和安裝,今天就將和整合起來使用。然后我運行之前的整合項目,查看監控信息如下總結整篇文章講述了與整合和監控平臺的搭建。
前提
通過前面兩篇文章可以簡單的了解 RocketMQ 和 安裝 RocketMQ ,今天就將 SpringBoot 和 RocketMQ 整合起來使用。
1、SpringBoot Kafka 整合使用
2、SpringBoot RabbitMQ 整合使用
3、SpringBoot ActiveMQ 整合使用
4、Kafka 安裝及快速入門
5、SpringBoot RabbitMQ 整合進階版
6、RocketMQ 初探
7、RocketMQ 安裝及快速入門
關注我轉載請務必注明原創地址為:http://www.54tianzhisheng.cn/2018/02/07/SpringBoot-RocketMQ/
創建項目在 IDEA 創建一個 SpringBoot 項目,項目結構如下:
pom 文件引入 RocketMQ 的一些相關依賴,最后的 pom 文件如下:
配置文件4.0.0 com.zhisheng rocketmq 0.0.1-SNAPSHOT jar rocketmq Demo project for Spring Boot RocketMQ org.springframework.boot spring-boot-starter-parent 1.5.9.RELEASE UTF-8 UTF-8 1.8 org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-test test org.apache.rocketmq rocketmq-common 4.2.0 org.apache.rocketmq rocketmq-client 4.2.0 org.springframework.boot spring-boot-maven-plugin
application.properties 中如下:
# 消費者的組名 apache.rocketmq.consumer.PushConsumer=PushConsumer # 生產者的組名 apache.rocketmq.producer.producerGroup=Producer # NameServer地址 apache.rocketmq.namesrvAddr=localhost:9876生產者
package com.zhisheng.rocketmq.client; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.springframework.util.StopWatch; import javax.annotation.PostConstruct; /** * Created by zhisheng_tian on 2018/2/6 */ @Component public class RocketMQClient { /** * 生產者的組名 */ @Value("${apache.rocketmq.producer.producerGroup}") private String producerGroup; /** * NameServer 地址 */ @Value("${apache.rocketmq.namesrvAddr}") private String namesrvAddr; @PostConstruct public void defaultMQProducer() { //生產者的組名 DefaultMQProducer producer = new DefaultMQProducer(producerGroup); //指定NameServer地址,多個地址以 ; 隔開 producer.setNamesrvAddr(namesrvAddr); try { /** * Producer對象在使用之前必須要調用start初始化,初始化一次即可 * 注意:切記不可以在每次發送消息時,都調用start方法 */ producer.start(); //創建一個消息實例,包含 topic、tag 和 消息體 //如下:topic 為 "TopicTest",tag 為 "push" Message message = new Message("TopicTest", "push", "發送消息----zhisheng-----".getBytes(RemotingHelper.DEFAULT_CHARSET)); StopWatch stop = new StopWatch(); stop.start(); for (int i = 0; i < 10000; i++) { SendResult result = producer.send(message); System.out.println("發送響應:MsgId:" + result.getMsgId() + ",發送狀態:" + result.getSendStatus()); } stop.stop(); System.out.println("----------------發送一萬條消息耗時:" + stop.getTotalTimeMillis()); } catch (Exception e) { e.printStackTrace(); } finally { producer.shutdown(); } } }消費者
package com.zhisheng.rocketmq.server; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; /** * Created by zhisheng_tian on 2018/2/6 */ @Component public class RocketMQServer { /** * 消費者的組名 */ @Value("${apache.rocketmq.consumer.PushConsumer}") private String consumerGroup; /** * NameServer 地址 */ @Value("${apache.rocketmq.namesrvAddr}") private String namesrvAddr; @PostConstruct public void defaultMQPushConsumer() { //消費者的組名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup); //指定NameServer地址,多個地址以 ; 隔開 consumer.setNamesrvAddr(namesrvAddr); try { //訂閱PushTopic下Tag為push的消息 consumer.subscribe("TopicTest", "push"); //設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費 //如果非第一次啟動,那么按照上次消費的位置繼續消費 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> { try { for (MessageExt messageExt : list) { System.out.println("messageExt: " + messageExt);//輸出消息內容 String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET); System.out.println("消費響應:msgId : " + messageExt.getMsgId() + ", msgBody : " + messageBody);//輸出消息內容 } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再試 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費成功 }); consumer.start(); } catch (Exception e) { e.printStackTrace(); } } }啟動類
package com.zhisheng.rocketmq; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class RocketmqApplication { public static void main(String[] args) { SpringApplication.run(RocketmqApplication.class, args); } }RocketMQ
代碼已經都寫好了,接下來我們需要將與 RocketMQ 有關的啟動起來。
啟動 Name Server在前面文章中已經寫過怎么啟動,http://www.54tianzhisheng.cn/2018/02/06/RocketMQ-install/#%E5%90%AF%E5%8A%A8-NameServer
進入到目錄 :
cd distribution/target/apache-rocketmq
啟動:
nohup sh bin/mqnamesrv & tail -f ~/logs/rocketmqlogs/namesrv.log //通過日志查看是否啟動成功啟動 Broker
nohup sh bin/mqbroker -n localhost:9876 & tail -f ~/logs/rocketmqlogs/broker.log //通過日志查看是否啟動成功
然后運行啟動類,運行效果如下:
監控RocketMQ有一個對其擴展的開源項目 ocketmq-console ,如今也提交給了 Apache ,地址在:[https://github.com/apache/roc...]() ,官方也給出了其支持的功能的中文文檔:[https://github.com/apache/roc...]() , 那么該如何安裝?
Docker 安裝1、獲取 Docker 鏡像
docker pull styletang/rocketmq-console-ng
2、運行,注意將你自己的 NameServer 地址替換下面的 127.0.0.1
docker run -e "JAVA_OPTS=-Drocketmq.namesrv.addr=127.0.0.1:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 -t styletang/rocketmq-console-ng非 Docker 安裝
我們 git clone 一份代碼到本地:
git clone https://github.com/apache/rocketmq-externals.git cd rocketmq-externals/rocketmq-console/
需要 jdk 1.7 以上。 執行以下命令:
mvn spring-boot:run
或者
mvn clean package -Dmaven.test.skip=true java -jar target/rocketmq-console-ng-1.0.0.jar
注意:
1、如果你下載依賴緩慢,你可以重新設置 maven 的 mirror 為阿里云的鏡像
alimaven aliyun maven http://maven.aliyun.com/nexus/content/groups/public/ central
2、如果你使用的 RocketMQ 版本小于 3.5.8,如果您使用 rocketmq < 3.5.8,請在啟動 rocketmq-console-ng 時添加 -Dcom.rocketmq.sendMessageWithVIPChannel = false(或者您可以在 ops 頁面中更改它)
3、更改 resource / application.properties 中的 rocketmq.config.namesrvAddr(或者可以在ops頁面中更改它)
錯誤解決方法1、Docker 啟動項目報錯
org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to
將 Docker 啟動命令改成如下以后:
docker run -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=127.0.0.1:9876 -Drocketmq.config.isVIPChannel=false" -p 8080:8080 -t styletang/rocketmq-console-ng
報錯信息改變了,新的報錯信息如下:
ERROR op=global_exception_handler_print_error org.apache.rocketmq.console.exception.ServiceException: This date have"t data!
看到網上有人也遇到這個問題,他們都通過自己的方式解決了,但是方法我都試了,不適合我。不得不說,阿里,你能再用心點嗎?既然把 RocketMQ 捐給 Apache 了,這些文檔啥的都必須更新啊,不要還滯后著呢,不然少不了被吐槽!
搞了很久這種方法沒成功,暫時放棄!mmp
2、非 Docker 安裝,只好把源碼編譯打包了。
1) 注意需要修改如下圖中的配置:
rocketmq.config.namesrvAddr=localhost:9876 //注意替換你自己的ip #如果你 rocketmq 版本小于 3.5.8 才需設置 `rocketmq.config.isVIPChannel` 為 false,默認是 true, 這個可以在源碼中可以看到的 rocketmq.config.isVIPChannel=
2) 執行以下命令:
mvn clean package -Dmaven.test.skip=true
編譯成功:
可以看到已經打好了 jar 包:
運行:
java -jar rocketmq-console-ng-1.0.0.jar
成功,不報錯了,開心?,訪問 http://localhost:8080/
整個監控大概就是這些了。
然后我運行之前的 SpringBoot 整合項目,查看監控信息如下:
總結整篇文章講述了 SpringBoot 與 RocketMQ 整合和 RocketMQ 監控平臺的搭建。
參考文章1、[http://www.ymq.io/2018/02/02/...]()
2、GitHub 官方 README
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/70963.html
摘要:前提好幾周沒更新博客了,對不斷支持我博客的童鞋們說聲抱歉了。熟悉我的人都知道我寫博客的時間比較早,而且堅持的時間也比較久,一直到現在也是一直保持著更新狀態。 showImg(https://segmentfault.com/img/remote/1460000014076586?w=1920&h=1080); 前提 好幾周沒更新博客了,對不斷支持我博客的童鞋們說聲:抱歉了!。自己這段時...
摘要:什么是是一個管理和監控你的應用程序的應用程序。這些應用程序通過通過注冊或者使用例如發現。剛才首頁的應用列表后面有個紅色的,我們可以將注冊上去的應用移除,但是只要你不把程序停掉,它立馬又會注冊上去。 showImg(http://ww3.sinaimg.cn/large/006tNc79ly1g5h6jqpgs9j30u00gwdhe.jpg); 什么是 SpringBoot Admin...
摘要:背景最近來了個實習僧小弟,安排他實現對目標網站連通性檢測的小功能簡單講就是將下邊的腳本換成代碼來實現百度平臺狀態不正常,請注意功能實現使用開始執行定時任務,檢測百度網站連通性請求百度成功,返回報文請求異常百度執行檢測百度網站連通 背景 最近來了個實習僧小弟,安排他實現對目標網站 連通性檢測的小功能,簡單講就是將下邊的shell 腳本換成Java 代碼來實現 1#!/bin/bash ...
摘要:注意一定要親自自己安裝實踐,接下來我們將這兩個進行整合。創建項目項目整體架構使用創建項目,這個很簡單了,這里不做過多的講解。 showImg(http://ww4.sinaimg.cn/large/006tNc79gy1g5iatph25rj30u00gw0yj.jpg); 前提 假設你了解過 SpringBoot 和 Kafka。 1、SpringBoot 如果對 SpringBoo...
摘要:可以在地址看到如何使用講解下上面命令行表示控制臺端口號,可以在瀏覽器中通過控制臺來執行的相關操作。同時從控制臺可以看到發送的速率多線程測試性能開了個線程,每個線程發送條消息。 showImg(http://ww2.sinaimg.cn/large/006tNc79ly1g5jjb62t88j30u00gwdi2.jpg); 前提 上次寫了篇文章,《SpringBoot Kafka 整合...
閱讀 1648·2021-09-22 15:21
閱讀 2868·2021-09-09 09:32
閱讀 2693·2021-09-02 09:52
閱讀 3308·2019-08-30 14:02
閱讀 2222·2019-08-26 13:25
閱讀 1456·2019-08-26 13:24
閱讀 1607·2019-08-26 10:31
閱讀 1559·2019-08-26 10:16