国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

Artemis的JMS客戶端中的CompletionHandler是如何在artemis core

Edison / 983人閱讀

摘要:在公開的方法中,為的設置了繼承于回調句柄。如此看來,如果想要異步通信完畢后,處理一些回調,則只需實現,并在適當的位置設置到的的里。在其保護方法里,創建了對象,并傳入了。

ActiveMQChannelHandler

NettyConnector在公開的start方法中,為Channel的pipeline設置了ActiveMQChannelHandler(繼承于io.netty.channel.ChannelDuplexHandler)回調句柄。
ActiveMQChannelHandler其構造函數定義如下:

ActiveMQChannelHandler(final ChannelGroup group,
                       final BufferHandler handler,
                       final BaseConnectionLifeCycleListener listener)

可見它接收了一個BufferHandler對象。在其channelRead這個callback方法中,調用了這個BufferHandler對象bufferReceived方法。

如此看來,如果想要Netty異步通信完畢后,處理一些回調,則只需實現BufferHandler,并在適當的位置設置到Netty的Channel的pipeline里。

BufferHandler

ClientSessionFactoryImpl在其保護方法createConnector里,創建了NettyConnector對象,并傳入了DelegatingBufferHandler。
DelegatingBufferHandler實現了BufferHandler,可用來處理Netty回調。

DelegatingBufferHandler

DelegatingBufferHandler定義如下,它是定義在ClientSessionFactoryImpl類里的:

private class DelegatingBufferHandler implements BufferHandler {

      @Override
      public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffer) {
         RemotingConnection theConn = connection;

         if (theConn != null && connectionID.equals(theConn.getID())) {
            theConn.bufferReceived(connectionID, buffer);
         } else {
            logger.debug("TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet");
         }
      }
   }

也就是說,在Netty執行回調時,會調用ClientSessionFactory中的成員對象connection(類型:RemotingConnection)的bufferReceived方法來處理數據。

實際上RemotingConnection也是一種BufferHandler

RemotingConnection

RemotingConnection(Impl)實現了bufferReceived(connectionID, buffer)方法,該方法會根據傳入的buffer來decode出一個package。
bufferReceived
=> doBufferReceived (以下ChannelImpl對應的實例,是根據decode出來的package對應的channelID,到RemotingConnectionImpl包含的channel集合里取得的)
=> ChannelImpl::doBufferReceived
=> ChannelImpl::handlePacket
=> ChannelImpl::clearUpTo
=> commandConfirmationHandler.commandConfirmed(packet)

舉例:Artemis中實現的JMS規范下的Producer在異步投遞消息后的回調函數是如何被調用的

以ArtemisMQMessageProducer為例:

他的send方法中,最后是調用core api的ClientProducer的send方法的,傳入一個core api的handler —— CompletionListenerWrapper(繼承于SendAcknowledgementHandler類型),它包裝了JMS的CompletionListener。

再轉到ClientProducer的send方法, 它又調用了doSend方法,

然后它又調用了sendRegularMessage方法,它又調用了sessionContext.sendFullMessage方法。

在sessionContext.sendFullMessage方法里,可以看到,handler被包裝到packet里了,并且傳給了sessionChannel.sendBatched(packet)方法去異步發送了。

在服務器返回的packet里,也會帶有這個handler,然后BufferHandler的實現者RemotingConnection(Impl)的bufferReceived方法會被回調,它會解析服務器回傳的packet里的handler進行執行。

packet是SessionSendMessage類型的消息的別名
sessionContext.sendFullMessage方法里負責將SendAcknowledgementHandler包裝到SessionSendMessage類型的packet里,然后才發送至服務器
服務器返回的packet,也會首先被轉換成SessionSendMessage類型,然后獲取里面包含的SendAcknowledgementHandler類型的回調handler執行回調。

CompletionListenerWrapper類定義:
private static final class CompletionListenerWrapper implements SendAcknowledgementHandler {

      private final CompletionListener completionListener;
      private final Message jmsMessage;
      private final ActiveMQMessageProducer producer;

      /**
       * @param jmsMessage
       * @param producer
       */
      private CompletionListenerWrapper(CompletionListener listener,
                                        Message jmsMessage,
                                        ActiveMQMessageProducer producer) {
         this.completionListener = listener;
         this.jmsMessage = jmsMessage;
         this.producer = producer;
      }

      @Override
      public void sendAcknowledged(org.apache.activemq.artemis.api.core.Message clientMessage) {
         if (jmsMessage instanceof StreamMessage) {
            try {
               ((StreamMessage) jmsMessage).reset();
            } catch (JMSException e) {
               // HORNETQ-1209 XXX ignore?
            }
         }
         if (jmsMessage instanceof BytesMessage) {
            try {
               ((BytesMessage) jmsMessage).reset();
            } catch (JMSException e) {
               // HORNETQ-1209 XXX ignore?
            }
         }

         try {
            producer.connection.getThreadAwareContext().setCurrentThread(true);
            completionListener.onCompletion(jmsMessage);
         } finally {
            producer.connection.getThreadAwareContext().clearCurrentThread(true);
         }
      }

      @Override
      public String toString() {
         return CompletionListenerWrapper.class.getSimpleName() + "( completionListener=" + completionListener + ")";
      }
   }

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/66894.html

相關文章

  • Spring Boot 參考指南(消息傳遞)

    摘要:還自動配置發送和接收消息所需的基礎設施。支持是一個輕量級的可靠的可伸縮的可移植的消息代理,基于協議,使用通過協議進行通信。 32. 消息傳遞 Spring框架為與消息傳遞系統集成提供了廣泛的支持,從使用JmsTemplate簡化的JMS API到使用完整的基礎設施異步接收消息,Spring AMQP為高級消息隊列協議提供了類似的特性集。Spring Boot還為RabbitTempla...

    Doyle 評論0 收藏0
  • ArtemisMQ“未消費之謎”

    摘要:通過以上修改保證了客戶端連接能夠快速的斷開,在應用重啟時不會持續往這邊發送消息,我使用進行壓測,重啟消費者過程中,消息都正常。 showImg(https://segmentfault.com/img/bVbjWjt?w=470&h=200);2018年6月份,我們開發了兩個使用Artemis做消息隊列實現的積分模塊和PUSH推送模塊,在幾輪測試以后,大家信心滿滿的正式上線了,而且經過...

    tomato 評論0 收藏0
  • 使用Spring/Spring Boot集成JMS陷阱

    摘要:本文旨在指出中集成的一些性能陷阱,在另一篇文章各組件詳解里有組件介紹及如何正確使用的內容。因此的做法會大大降低性能,并且將大部分的時間都花在反復重建這些對象上。提供的可以讓使用避免頻繁創建的問題。至于使用的性能測試則留給同學自己做了。 Github 本文旨在指出Spring/Spring Boot中集成JMS的一些性能陷阱,在另一篇文章Spring JMS各組件詳解里有Spring J...

    xcold 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<