摘要:背景起因是一個朋友問我的一個關于啟動的問題相關他的問題我復述一下的綁定流程如下在中可能會調用即并且在中也會有的調用即那么有沒有可能造成了兩次的調用我的回答是不會為什么呢對于直接想知道答案的朋友可以直接閱讀到最后面的回答與總
背景
起因是一個朋友問我的一個關于 ServerBootstrap 啟動的問題.
相關 issue
他的問題我復述一下:
ServerBootstrap 的綁定流程如下:
ServerBootstrap.bind -> AbstractBootstrap.bind -> AbstractBootstrap.doBind -> AbstractBootstrap.initAndRegister -> AbstractChannel#AbstractUnsafe.register -> eventLoop.execute( () -> AbstractUnsafe.register0) doBind0() -> channel.eventLoop().execute( () -> channel.bind) -> AbstractUnsafe.bind
在 AbstractUnsafe.register0 中可能會調用 pipeline.fireChannelActive(), 即:
private void register0(ChannelPromise promise) { try { ... boolean firstRegistration = neverRegistered; doRegister(); ... if (firstRegistration && isActive()) { pipeline.fireChannelActive(); } } catch (Throwable t) { ... } }
并且在 AbstractUnsafe.bind 中也會有 pipeline.fireChannelActive() 的調用, 即:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { ... boolean wasActive = isActive(); try { doBind(localAddress); } catch (Throwable t) { ... } if (!wasActive && isActive()) { invokeLater(new OneTimeTask() { @Override public void run() { pipeline.fireChannelActive(); } }); } ... }
那么有沒有可能造成了兩次的 pipeline.fireChannelActive() 調用?
我的回答是不會. 為什么呢? 對于直接想知道答案的朋友可以直接閱讀到最后面的 回答 與 總結 兩節..
下面我們就來根據代碼詳細分析一下.
分析首先, 根據我們上面所列出的調用流程, 會有 AbstractBootstrap.doBind 的調用, 它的代碼如下:
private ChannelFuture doBind(final SocketAddress localAddress) { // 步驟1 final ChannelFuture regFuture = initAndRegister(); ... // 步驟2 if (regFuture.isDone()) { ... doBind0(regFuture, channel, localAddress, promise); ... } else { regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { ... doBind0(regFuture, channel, localAddress, promise); } }); } }
首先在 doBind 中, 執行步驟1, 即調用 initAndRegister 方法, 這個方法會最終調用到AbstractChannel#AbstractUnsafe.register. 而在 AbstractChannel#AbstractUnsafe.register 中, 會通過 eventLoop.execute 的形式將 AbstractUnsafe.register0 的調用提交到任務隊列中(即提交到 eventLoop 線程中, 而當前代碼所在的線程是 main 線程), 即:
@Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { // 當前線程是主線程, 因此這個判斷是 false if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new OneTimeTask() { @Override public void run() { // register0 在 eventLoop 線程中執行. register0(promise); } }); } catch (Throwable t) { ... } } }
接著 AbstractBootstrap.initAndRegister 返回, 回到 AbstractBootstrap.doBind 中, 于是執行到步驟2. 注意, 因為 AbstractUnsafe.register0 是在 eventLoop 中執行的, 因此有可能主線程執行到步驟2 時, AbstractUnsafe.register0 已經執行完畢了, 此時必然有 regFuture.isDone() == true; 但也有可能 AbstractUnsafe.register0 沒有來得及執行, 因此此時 regFuture.isDone() == false. 所以上面的步驟2 考慮到了這兩種情況, 因此分別針對這兩種情況做了區分, 即:
// 步驟2 if (regFuture.isDone()) { ... doBind0(regFuture, channel, localAddress, promise); ... } else { regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { ... doBind0(regFuture, channel, localAddress, promise); } }); }
一般情況下, regFuture.isDone() 為 false, 因為綁定操作是比較費時的, 因此很大幾率會執行到 else 分支, 并且 if 分支和 else 分支從結果上說沒有不同, 而且 if 分支邏輯還更簡單一些, 因此我們以 else 分支來分析吧. 在 else 分支中, 會為 regFuture 設置一個回調監聽器. regFuture 是一個 ChannelFuture, 而 ChannelFuture 代表了一個 Channel 的異步 IO 的操作結果, 因此這里 regFuture 代表了 Channel 注冊(register) 的這個異步 IO 的操作結果.
Netty 這里之所以要為 regFuture 設置一個回調監聽器, 是為了保證 register 和 bind 的時序上的正確性: Channel 的注冊必須要發生在 Channel 的綁定之前.
(關于時序的正確性的問題, 我們在后面有證明)
接下來我們來看一下 AbstractUnsafe.register0 方法:
private void register0(ChannelPromise promise) { try { .... // neverRegistered 一開始是 true, 因此 firstRegistration == true boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. // firstRegistration == true, 而 isActive() == false, // 因此不會執行到 pipeline.fireChannelActive() if (firstRegistration && isActive()) { pipeline.fireChannelActive(); } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
注意, 我需要再強調一下, 這里 AbstractUnsafe.register0 是在 eventLoop 中執行的.
AbstractUnsafe.register0 中會調用 doRegister() 注冊 NioServerSocketChannel, 然后調用 safeSetSuccess() 設置 promise 的狀態為成功. 而這個 promise 變量是什么呢? 我將 AbstractBootstrap.doBind 的調用鏈寫詳細一些:
AbstractBootstrap.doBind -> AbstractBootstrap.initAndRegister -> MultithreadEventLoopGroup.register -> SingleThreadEventLoop.register -> AbstractChannel#AbstractUnsafe.register -> eventLoop.execute( () -> AbstractUnsafe.register0)
在 SingleThreadEventLoop.register 中會實例化一個 DefaultChannelPromise, 即:
@Override public ChannelFuture register(Channel channel) { return register(channel, new DefaultChannelPromise(channel, this)); }
接著調用重載的 SingleThreadEventLoop.register 方法:
@Override public ChannelFuture register(final Channel channel, final ChannelPromise promise) { if (channel == null) { throw new NullPointerException("channel"); } if (promise == null) { throw new NullPointerException("promise"); } channel.unsafe().register(this, promise); return promise; }
我們看到, 實例化的 DefaultChannelPromise 最終會以方法返回值的方式返回到調用方, 即返回到 AbstractBootstrap.doBind 中:
final ChannelFuture regFuture = initAndRegister();
因此我們這里有一個共識: regFuture 是一個在 SingleThreadEventLoop.register 中實例化的 DefaultChannelPromise 對象.
再回到 SingleThreadEventLoop.register 中, 在這里會調用 channel.unsafe().register(this, promise), 將 promise 對象傳遞到 AbstractChannel#AbstractUnsafe.register 中, 因此在 AbstractUnsafe.register0 中的 promise 就是 AbstractBootstrap.doBind 中的 regFuture.
promise == regFuture 很關鍵.
既然我們已經確定了 promise 的身份, 那么調用的 safeSetSuccess(promise); 我們也知道是干嘛的了. safeSetSuccess 方法設置一個 Promise 的狀態為成功態, 而 Promise 的 成功態 是最終狀態, 即此時 promise.isDone() == true. 那么 設置 promise 為成功態后, 會發生什么呢?
還記得不 promise == regFuture, 而我們在 AbstractBootstrap.doBind 的 else 分支中設置了一個回調監聽器:
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.executor = channel.eventLoop(); } doBind0(regFuture, channel, localAddress, promise); } });
因此當 safeSetSuccess(promise); 調用時, 根據 Netty 的 Promise/Future 機制, 會觸發上面的 operationComplete 回調, 在回調中調用 doBind0 方法:
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
注意到, 有一個關鍵的地方, 代碼中將 **channel.bind** 的調用放到了 eventLoop 中執行. doBind0 返回后, 代碼繼續執行 AbstractUnsafe.register0 方法的剩余部分代碼, 即:
private void register0(ChannelPromise promise) { try { .... safeSetSuccess(promise); // safeSetSuccess 返回后, 繼續執行如下代碼 pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. // firstRegistration == true, 而 isActive() == false, // 因此不會執行到 pipeline.fireChannelActive() if (firstRegistration && isActive()) { pipeline.fireChannelActive(); } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
當 AbstractUnsafe.register0 方法執行完畢后, 才執行到 channel.bind 方法.
而 channel.bind 方法最終會調用到 AbstractChannel#AbstractUnsafe.bind 方法, 源碼如下:
@Override public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { boolean wasActive = isActive(); logger.info("---wasActive: {}---", wasActive); try { // 調用 NioServerSocketChannel.bind 方法, // 將底層的 Java NIO SocketChannel 綁定到指定的端口. // 當 SocketChannel 綁定到端口后, isActive() 才為真. doBind(localAddress); } catch (Throwable t) { ... } boolean activeNow = isActive(); logger.info("---activeNow: {}---", activeNow); // 這里 wasActive == false // isActive() == true if (!wasActive && isActive()) { invokeLater(new OneTimeTask() { @Override public void run() { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); }
上面的代碼中, 調用了 doBind(localAddress) 將底層的 Java NIO SocketChannel 綁定到指定的端口. 并且當 SocketChannel 綁定到端口后, isActive() 才為真.
因此我們知道, 如果 SocketChannel 第一次綁定時, 在調用 doBind 前, wasActive == false == isActive(), 而當調用了 doBind 后, isActive() == true, 因此第一次綁定端口時, if 判斷成立, 會調用 pipeline.fireChannelActive().
我們在前的分析中, 直接認定了 Channel 注冊 在 Channel 的綁定 之前完成, 那么依據是什么呢?
其實所有的關鍵在于 EventLoop 的任務隊列機制.
不要閑我啰嗦哦. 我們需要繼續回到 AbstractUnsafe.register0 的調用中(再次強調一下, 在 eventLoop 線程中執行AbstractUnsafe.register0), 這個方法我們已經分析了, 它會調用 safeSetSuccess(promise), 并由 Netty 的 Promise/Future 機制, 導致了AbstractBootstrap.doBind 中的 regFuture 所設置的回調監聽器的 operationComplete 方法調用, 而 operationComplete 中調用了 AbstractBootstrap.doBind0:
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
在 doBind0 中, 根據 EventLoop 的任務隊列機制, 會使用 eventLoop().execute 將 channel.bind 封裝為一個 Task, 放到 eventLoop 的 taskQueue 中.
如下用一幅圖表示上面的過程:
原圖在此
而當 channel.bind 被調度時, AbstractUnsafe.register0 早就已經調用結束了.
因此由于 EventLoop 的任務隊列機制, 我們知道, 在執行 AbstractUnsafe.register0 時, 是在 EventLoop 線程中的, 而 channel.bind 的調用是以 task 的形式添加到 taskQueue 隊列的末尾, 因此必然是有 EventLoop 線程先執行完 AbstractUnsafe.register0 方法后, 才有機會從 taskQueue 中取出一個 task 來執行, 因此這個機制從根本上保證了 Channel 注冊發生在綁定 之前.
回答你的疑惑是, AbstractChannel#AbstractUnsafe.register0 中, 可能會調用 pipeline.fireChannelActive(), 即:
private void register0(ChannelPromise promise) { try { ... boolean firstRegistration = neverRegistered; doRegister(); ... if (firstRegistration && isActive()) { pipeline.fireChannelActive(); } } catch (Throwable t) { ... } }
并且在 AbstractChannel#AbstractUnsafe.bind 中也可能會調用到pipeline.fireChannelActive(), 即:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { ... boolean wasActive = isActive(); try { doBind(localAddress); } catch (Throwable t) { ... } if (!wasActive && isActive()) { invokeLater(new OneTimeTask() { @Override public void run() { pipeline.fireChannelActive(); } }); } ... }
我覺得是 不會. 因為根據上面我們分析的結果可知, Netty 的 Promise/Future 與 EventLoop 的任務隊列機制保證了 NioServerSocketChannel 的注冊和 NioServerSocketChannel 的綁定的時序: Channel 的注冊必須要發生在 Channel 的綁定之前, 而當一個 NioServerSocketChannel 沒有綁定到具體的端口前, 它是不活躍的(Inactive), 因此在 register0 中, if (firstRegistration && isActive()) 就不成立, 進而就不會執行到 pipeline.fireChannelActive() 了.
而執行完注冊操作后, 在 AbstractChannel#AbstractUnsafe.bind 才會調用pipeline.fireChannelActive(), 因此最終只有一次 fireChannelActive 調用.
有兩點需要注意的:
isActive() == true 成立的關鍵是此 NioServerSocketChannel 已經綁定到端口上了.
由 Promise/Future 與 EventLoop 機制, 導致了 Channel 的注冊 發生在 Channel 的綁定 之前, 因此在 AbstractChannel#AbstractUnsafe.register0 中的 isActive() == false, if 判斷不成立, 最終就是 register0 中的 pipeline.fireChannelActive() 不會被調用.
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/65271.html
摘要:盡可能地將數據寫入,例如創建設置的都會將數據立即的寫入再來看看文檔怎么描述的看看這可愛的默認值我們終于知道了當我們不做任何設置時,默認采用的是方式顯而易見,使用方式能最大限度的減少與的交互,而在大多數場景下都是沒有問題的。 0.問題背景 此次問題源于一次挺嚴重的生產事故:客戶的訂單被重復生成了,而出問題的代碼其實很簡單: // .... redisLockUtil.lock(membe...
摘要:前言上個月月底開源組開源了使用適配人人企業版專業版的前端工程具體詳情見人人企業版適配發布。當然,也督促自己產出一篇相關的文章,來記錄這次有趣的學習之旅。 Created by huqi at 2019-5-5 13:01:14 Updated by huqi at 2019-5-20 15:57:37 前言 上個月月底@D2開源組 開源了使用 D2Admin 適配 人人企業版(專業版) 的...
閱讀 1686·2019-08-30 15:54
閱讀 3340·2019-08-26 17:15
閱讀 3529·2019-08-26 13:49
閱讀 2587·2019-08-26 13:38
閱讀 2298·2019-08-26 12:08
閱讀 3054·2019-08-26 10:41
閱讀 1374·2019-08-26 10:24
閱讀 3382·2019-08-23 18:35