摘要:發現問題在使用中發現在設備頻繁上下線和兩個設備一樣相互頂替連接的情況下,的和的方法調用沒有先后順序,如果在這兩個方法里面來記錄設備上下線狀態,會造成狀態不對。因為相互頂替的情況并不多見,因此兩個也可以接受,在性能上并不會造成多大影響。
發現問題
在moquette使用中發現在設備頻繁上下線和兩個設備ClientId一樣相互頂替連接的情況下,InterceptHandler的onConnect和onConnectionLost的方法調用沒有先后順序,如果在這兩個方法里面來記錄設備上下線狀態,會造成狀態不對。
io.moquette.spi.impl.ProtocolProcessor中的processConnect(Channel channel, MqttConnectMessage msg)部分代碼如下
ConnectionDescriptor descriptor = new ConnectionDescriptor(clientId, channel, cleanSession); final ConnectionDescriptor existing = this.connectionDescriptors.addConnection(descriptor); if (existing != null) { LOG.info("Client ID is being used in an existing connection, force to be closed. CId={}", clientId); existing.abort(); this.connectionDescriptors.removeConnection(existing); this.connectionDescriptors.addConnection(descriptor); } initializeKeepAliveTimeout(channel, msg, clientId); storeWillMessage(msg, clientId); if (!sendAck(descriptor, msg, clientId)) { channel.close().addListener(CLOSE_ON_FAILURE); return; } m_interceptor.notifyClientConnected(msg);
可以看到existing.abort();后會m_interceptor.notifyClientConnected(msg); 先斷開原來的連接,然后接著通知上線。由于Netty本身就是異步的,再加上InterceptHandler相關方法的調用都是在線程池中進行的,因此nterceptHandler的onConnect和onConnectionLost的方法調用先后順序是無法保證的。
解決方法在ChannelHandler鏈中添加一個handler,專門處理設備上線事件,對于相同ClientId的連接已經存在時,連接斷開和連接事件強制加上時序。
@Sharable public class AbrotExistConnectionMqttHandler extends ChannelInboundHandlerAdapter { private static final Logger LOG = LoggerFactory.getLogger(AbrotExistConnectionMqttHandler.class); private final ProtocolProcessor m_processor; private static final ReentrantLock[] locks = new ReentrantLock[8]; static { for (int i = 0; i < locks.length; i++) { locks[i] = new ReentrantLock(); } } public AbrotExistConnectionMqttHandler(ProtocolProcessor m_processor) { this.m_processor = m_processor; } @Override public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception { MqttMessage msg = (MqttMessage) message; MqttMessageType messageType = msg.fixedHeader().messageType(); LOG.debug("Processing MQTT message, type: {}", messageType); if (messageType != MqttMessageType.CONNECT) { super.channelRead(ctx, message); return; } MqttConnectMessage connectMessage = (MqttConnectMessage) msg; String clientId = connectMessage.payload().clientIdentifier(); /** * 通過鎖和sleep來解決設備互頂出現的設備上線和下線回調時序錯亂的問題 * 目前解決的方式通過sleep不是太好 * 解決了多個連接互相頂替出現的問題(有一個連接先連接的情況) * */ ReentrantLock lock = locks[Math.abs(clientId.hashCode()) % locks.length]; lock.lock(); try { if (!m_processor.isConnected(clientId)) { super.channelRead(ctx, message); return; } m_processor.abortIfExist(clientId); Thread.sleep(50); super.channelRead(ctx, message); Thread.sleep(30); } catch (Exception ex) { ex.printStackTrace(); super.channelRead(ctx, message); } finally { lock.unlock(); } } }
解釋:
1.通過ReentrantLock lock = locks[Math.abs(clientId.hashCode()) % locks.length];來保證相同的ClientId的連接都會獲得同一個鎖
2.通過兩次Thread.sleep(50);將斷開連接和處理設備上線變成先后順序關系。
3.因為相互頂替的情況并不多見,因此兩個Thread.sleep()也可以接受,在性能上并不會造成多大影響。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/77220.html
摘要:整合到本文更加注重代碼實踐,對于配置相關的知識會一筆帶過,不做過多的詳解。筆者是上傳到私服,然后通過導入。接口是預留給開發者根據不同事件處理業務邏輯的接口。改造筆記二優化邏輯 Moquette簡介 Mqtt作為物聯網比較流行的協議現在已經被大范圍使用,其中也有很多開源的MQTT BROKEN。Moquette是用java基于netty實現的輕量級的MQTT BROKEN. Moquet...
摘要:和二級緩存影響狀態更新,縮短這兩個定時任務周期可減少滯后時間,例如配置更新周期更新周期服務提供者保證服務正常下線。服務提供者延遲下線。 引言 Eureka是Netflix開源的、用于實現服務注冊和發現的服務。Spring Cloud Eureka基于Eureka進行二次封裝,增加了更人性化的UI,使用更為方便。但是由于Eureka本身存在較多緩存,服務狀態更新滯后,最常見的狀況是:服務...
閱讀 3485·2023-04-25 22:45
閱讀 1288·2021-11-11 16:54
閱讀 2793·2019-08-30 15:44
閱讀 3193·2019-08-30 15:44
閱讀 1652·2019-08-30 13:55
閱讀 946·2019-08-29 18:45
閱讀 1200·2019-08-29 17:25
閱讀 1014·2019-08-29 12:59