摘要:性能正在發布過多的消息系統性能,注意請使用單線程的萬條毫秒萬條毫秒萬毫秒萬條毫秒多線程的正在發布過多的消息問題異常信息正在進行過多的發布解決辦法消息發送發送限流用多帶帶的一個線程來完成消息的推送不用這個,使用就沒有事增加的值反思筆者出現這個錯
mqttclient性能&MQTT(32202): 正在發布過多的消息
org.eclipse.paho.client.mqttv3
2.2 GHz Intel Core i7 mac系統
publish性能,注意請使用單線程的 mqttclinet
1萬條 341毫秒
4萬條 1163毫秒
5萬 1450毫秒
10萬條 2700毫秒
多線程的 mqttclinet?MQTT(32202): 正在發布過多的消息 問題
異常信息[15:07:21]: publish failed, message: aaaa 正在進行過多的發布 (32202) at org.eclipse.paho.client.mqttv3.internal.ClientState.send(ClientState.java:496) at org.eclipse.paho.client.mqttv3.internal.ClientComms.internalSend(ClientComms.java:132) at org.eclipse.paho.client.mqttv3.internal.ClientComms.sendNoWait(ClientComms.java:156) at org.eclipse.paho.client.mqttv3.MqttAsyncClient.publish(MqttAsyncClient.java:1027) at org.eclipse.paho.client.mqttv3.MqttClient.publish(MqttClient.java:399) at io.communet.ichater.emq.util.MqttUtil.publishMsg(MqttUtil.java:171) at io.communet.ichater.emq.util.MqttUtil.publishMsg(MqttUtil.java:161) at io.communet.ichater.emq.sub.MqttSendMsgEventSubscribe.onEvent(MqttSendMsgEventSubscribe.java:28) at java.lang.reflect.Method.invoke(Native Method) at java.lang.reflect.Method.invoke(Method.java:372) at org.greenrobot.eventbus.EventBus.invokeSubscriber(EventBus.java:507) at org.greenrobot.eventbus.EventBus.invokeSubscriber(EventBus.java:501) at org.greenrobot.eventbus.AsyncPoster.run(AsyncPoster.java:46) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587) at java.lang.Thread.run(Thread.java:818)解決辦法
消息發送發送限流
用多帶帶的一個線程來完成 MQ 消息的推送 (不用這個MqttAsyncClient ,使用MqttClient 就沒有事)
options.setMaxInflight(1000)?增加?actualInFlight?的值;
反思筆者出現這個錯誤是因為使用 EventBus, 之前使用多帶帶線程的 Handler 是沒有問題的, 調查發現, 使用 EventBus 是新建線程運行的, 而 Handler 是多帶帶一個線程.
所以當發送大量消息的時候, EventBus 幾乎是同一個點發出去, 就會造成這個錯誤
根據堆棧信息找到報錯地方
if (actualInFlight >= this.maxInflight) { //@TRACE 613= sending {0} msgs at max inflight window log.fine(CLASS_NAME, methodName, "613", new Object[]{new Integer(actualInFlight)}); throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT); }
其中?actualInFlight?如下
// processed until the inflight window has space. if (actualInFlight < this.maxInflight) { // The in flight window is not full so process the // first message in the queue result = (MqttWireMessage)pendingMessages.elementAt(0); pendingMessages.removeElementAt(0); actualInFlight++; //@TRACE 623=+1 actualInFlight={0} log.fine(CLASS_NAME,methodName,"623",new Object[]{new Integer(actualInFlight)}); }
從?pendingMessages?中取出消息時,?actualInFlight?加 1,?maxInflight?可以自己設定, 默認值為 10.
public class ClientState { ... volatile private Vector pendingMessages; ... }
在?ClientState?中:
public void send(MqttWireMessage message, MqttToken token) throws MqttException { ... if (message instanceof MqttPublish) { synchronized (queueLock) { if (actualInFlight >= this.maxInflight) { //@TRACE 613= sending {0} msgs at max inflight window log.fine(CLASS_NAME, methodName, "613", new Object[]{new Integer(actualInFlight)}); throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT); } MqttMessage innerMessage = ((MqttPublish) message).getMessage(); //@TRACE 628=pending publish key={0} qos={1} message={2} log.fine(CLASS_NAME,methodName,"628", new Object[]{new Integer(message.getMessageId()), new Integer(innerMessage.getQos()), message}); switch(innerMessage.getQos()) { case 2: outboundQoS2.put(new Integer(message.getMessageId()), message); persistence.put(getSendPersistenceKey(message), (MqttPublish) message); break; case 1: outboundQoS1.put(new Integer(message.getMessageId()), message); persistence.put(getSendPersistenceKey(message), (MqttPublish) message); break; } tokenStore.saveToken(token, message); pendingMessages.addElement(message); queueLock.notifyAll(); } } else { ... } }
可以看到?pendingMessages?中添加元素的時候并沒有做?qos?類型的判斷
private void decrementInFlight() { final String methodName = "decrementInFlight"; synchronized (queueLock) { actualInFlight--; //@TRACE 646=-1 actualInFlight={0} log.fine(CLASS_NAME,methodName,"646",new Object[]{new Integer(actualInFlight)}); if (!checkQuiesceLock()) { queueLock.notifyAll(); } } }
當收到消息反饋時?actualInFlight?減 1.
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/75369.html
摘要:現在很多網站都通過服務來實現消息推送及數據即時同步功能,即時通訊組件逐漸成為產品的標配。目前國內有很多成熟穩定的第三方即時通訊服務廠家,比如融云。 現在很多網站、APP都通過IM服務來實現消息推送及數據即時同步功能,即時通訊組件逐漸成為產品的標配。目前國內有很多成熟穩定的第三方即時通訊服務廠家,比如:融云。使用這些專業的服務可以提高開發效率而且服務穩定有保障。 如果自己DIY或者需要在...
摘要:本文是其中的一個解決方案。地址客戶端服務端前端網頁介紹,消息隊列遙測傳輸是開發的一個即時通訊協議,有可能成為物聯網的重要組成部分。必須用于在頂層分隔符之后,除了當自己指定時。 1. 問題描述 最近,本實驗室大量上馬云測量,云監控方面的項目,大概是屬于物聯網應用的一個分支。老板也有將舊有儀器改造的想法,所以要實現儀器設備的云控制。本文是其中的一個解決方案。 2. 技術選型 消息隊列:M...
摘要:超簡單深度睡眠模式下遠程采集溫濕度信息項目背景相關技術深度睡眠模式溫濕度采集數據收發前后端實現后端前端項目背景自己用收納箱做了一個用于存放打印耗材的干燥箱,想用閑置的開發板和溫濕度傳感器做一個遠程溫濕度監測的小項目。 ...
摘要:前言前些日子了解到這樣一個協議,可以在上達到即時通訊的效果,但網上并不能很方便地找到一篇目前版本的在下正確實現這個協議的博客。 前言 前些日子了解到mqtt這樣一個協議,可以在web上達到即時通訊的效果,但網上并不能很方便地找到一篇目前版本的在node下正確實現這個協議的博客。 自己搗鼓了一段時間,理解不深刻,但也算是基本能夠達到使用目的。 本文目的為對MQTT有需求的學習者提供一定程...
閱讀 954·2019-08-30 15:55
閱讀 550·2019-08-26 13:56
閱讀 2079·2019-08-26 12:23
閱讀 3295·2019-08-26 10:29
閱讀 600·2019-08-26 10:17
閱讀 2867·2019-08-23 16:53
閱讀 697·2019-08-23 15:55
閱讀 2813·2019-08-23 14:25