摘要:在公開的方法中,為的設置了繼承于回調句柄。如此看來,如果想要異步通信完畢后,處理一些回調,則只需實現,并在適當的位置設置到的的里。在其保護方法里,創建了對象,并傳入了。
ActiveMQChannelHandler
NettyConnector在公開的start方法中,為Channel的pipeline設置了ActiveMQChannelHandler(繼承于io.netty.channel.ChannelDuplexHandler)回調句柄。
ActiveMQChannelHandler其構造函數定義如下:
ActiveMQChannelHandler(final ChannelGroup group, final BufferHandler handler, final BaseConnectionLifeCycleListener listener)
可見它接收了一個BufferHandler對象。在其channelRead這個callback方法中,調用了這個BufferHandler對象bufferReceived方法。
如此看來,如果想要Netty異步通信完畢后,處理一些回調,則只需實現BufferHandler,并在適當的位置設置到Netty的Channel的pipeline里。
BufferHandlerClientSessionFactoryImpl在其保護方法createConnector里,創建了NettyConnector對象,并傳入了DelegatingBufferHandler。
DelegatingBufferHandler實現了BufferHandler,可用來處理Netty回調。
DelegatingBufferHandler定義如下,它是定義在ClientSessionFactoryImpl類里的:
private class DelegatingBufferHandler implements BufferHandler { @Override public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffer) { RemotingConnection theConn = connection; if (theConn != null && connectionID.equals(theConn.getID())) { theConn.bufferReceived(connectionID, buffer); } else { logger.debug("TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet"); } } }
也就是說,在Netty執行回調時,會調用ClientSessionFactory中的成員對象connection(類型:RemotingConnection)的bufferReceived方法來處理數據。
RemotingConnection實際上RemotingConnection也是一種BufferHandler
RemotingConnection(Impl)實現了bufferReceived(connectionID, buffer)方法,該方法會根據傳入的buffer來decode出一個package。
bufferReceived
=> doBufferReceived (以下ChannelImpl對應的實例,是根據decode出來的package對應的channelID,到RemotingConnectionImpl包含的channel集合里取得的)
=> ChannelImpl::doBufferReceived
=> ChannelImpl::handlePacket
=> ChannelImpl::clearUpTo
=> commandConfirmationHandler.commandConfirmed(packet)
以ArtemisMQMessageProducer為例:
他的send方法中,最后是調用core api的ClientProducer的send方法的,傳入一個core api的handler —— CompletionListenerWrapper(繼承于SendAcknowledgementHandler類型),它包裝了JMS的CompletionListener。
再轉到ClientProducer的send方法, 它又調用了doSend方法,
然后它又調用了sendRegularMessage方法,它又調用了sessionContext.sendFullMessage方法。
在sessionContext.sendFullMessage方法里,可以看到,handler被包裝到packet里了,并且傳給了sessionChannel.sendBatched(packet)方法去異步發送了。
在服務器返回的packet里,也會帶有這個handler,然后BufferHandler的實現者RemotingConnection(Impl)的bufferReceived方法會被回調,它會解析服務器回傳的packet里的handler進行執行。
CompletionListenerWrapper類定義:packet是SessionSendMessage類型的消息的別名
sessionContext.sendFullMessage方法里負責將SendAcknowledgementHandler包裝到SessionSendMessage類型的packet里,然后才發送至服務器
服務器返回的packet,也會首先被轉換成SessionSendMessage類型,然后獲取里面包含的SendAcknowledgementHandler類型的回調handler執行回調。
private static final class CompletionListenerWrapper implements SendAcknowledgementHandler { private final CompletionListener completionListener; private final Message jmsMessage; private final ActiveMQMessageProducer producer; /** * @param jmsMessage * @param producer */ private CompletionListenerWrapper(CompletionListener listener, Message jmsMessage, ActiveMQMessageProducer producer) { this.completionListener = listener; this.jmsMessage = jmsMessage; this.producer = producer; } @Override public void sendAcknowledged(org.apache.activemq.artemis.api.core.Message clientMessage) { if (jmsMessage instanceof StreamMessage) { try { ((StreamMessage) jmsMessage).reset(); } catch (JMSException e) { // HORNETQ-1209 XXX ignore? } } if (jmsMessage instanceof BytesMessage) { try { ((BytesMessage) jmsMessage).reset(); } catch (JMSException e) { // HORNETQ-1209 XXX ignore? } } try { producer.connection.getThreadAwareContext().setCurrentThread(true); completionListener.onCompletion(jmsMessage); } finally { producer.connection.getThreadAwareContext().clearCurrentThread(true); } } @Override public String toString() { return CompletionListenerWrapper.class.getSimpleName() + "( completionListener=" + completionListener + ")"; } }
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/66894.html
摘要:還自動配置發送和接收消息所需的基礎設施。支持是一個輕量級的可靠的可伸縮的可移植的消息代理,基于協議,使用通過協議進行通信。 32. 消息傳遞 Spring框架為與消息傳遞系統集成提供了廣泛的支持,從使用JmsTemplate簡化的JMS API到使用完整的基礎設施異步接收消息,Spring AMQP為高級消息隊列協議提供了類似的特性集。Spring Boot還為RabbitTempla...
摘要:通過以上修改保證了客戶端連接能夠快速的斷開,在應用重啟時不會持續往這邊發送消息,我使用進行壓測,重啟消費者過程中,消息都正常。 showImg(https://segmentfault.com/img/bVbjWjt?w=470&h=200);2018年6月份,我們開發了兩個使用Artemis做消息隊列實現的積分模塊和PUSH推送模塊,在幾輪測試以后,大家信心滿滿的正式上線了,而且經過...
摘要:本文旨在指出中集成的一些性能陷阱,在另一篇文章各組件詳解里有組件介紹及如何正確使用的內容。因此的做法會大大降低性能,并且將大部分的時間都花在反復重建這些對象上。提供的可以讓使用避免頻繁創建的問題。至于使用的性能測試則留給同學自己做了。 Github 本文旨在指出Spring/Spring Boot中集成JMS的一些性能陷阱,在另一篇文章Spring JMS各組件詳解里有Spring J...
閱讀 1565·2021-10-25 09:44
閱讀 2926·2021-09-04 16:48
閱讀 1543·2019-08-30 15:44
閱讀 2475·2019-08-30 15:44
閱讀 1731·2019-08-30 15:44
閱讀 2816·2019-08-30 14:14
閱讀 2964·2019-08-30 13:00
閱讀 2143·2019-08-30 11:09