

記一次有趣的 Netty 源碼問題

起因是一個朋友問我的一個關于 ServerBootstrap 啟動的問題.
相關 issue
ServerBootstrap 的綁定流程如下:

ServerBootstrap.bind ->
    AbstractBootstrap.bind ->
        AbstractBootstrap.doBind ->
            AbstractBootstrap.initAndRegister ->
                AbstractChannel#AbstractUnsafe.register ->
                    eventLoop.execute( () -> AbstractUnsafe.register0)
            doBind0() ->
                channel.eventLoop().execute( () -> channel.bind) ->

AbstractUnsafe.register0 中可能會調用 pipeline.fireChannelActive(), 即:

private void register0(ChannelPromise promise) {
    try {
        boolean firstRegistration = neverRegistered;
        if (firstRegistration && isActive()) {
    } catch (Throwable t) {

并且在 AbstractUnsafe.bind 中也會有 pipeline.fireChannelActive() 的調用, 即:

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    boolean wasActive = isActive();
    try {
    } catch (Throwable t) {

    if (!wasActive && isActive()) {
        invokeLater(new OneTimeTask() {
            public void run() {

那么有沒有可能造成了兩次的 pipeline.fireChannelActive() 調用?

我的回答是不會. 為什么呢? 對于直接想知道答案的朋友可以直接閱讀到最后面的 回答總結 兩節..



首先, 根據我們上面所列出的調用流程, 會有 AbstractBootstrap.doBind 的調用, 它的代碼如下:

private ChannelFuture doBind(final SocketAddress localAddress) {
        // 步驟1
        final ChannelFuture regFuture = initAndRegister();
        // 步驟2
        if (regFuture.isDone()) {
            doBind0(regFuture, channel, localAddress, promise);
        } else {
            regFuture.addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture future) throws Exception {
                    doBind0(regFuture, channel, localAddress, promise);

首先在 doBind 中, 執行步驟1, 即調用 initAndRegister 方法, 這個方法會最終調用到AbstractChannel#AbstractUnsafe.register. 而在 AbstractChannel#AbstractUnsafe.register 中, 會通過 eventLoop.execute 的形式將 AbstractUnsafe.register0 的調用提交到任務隊列中(即提交到 eventLoop 線程中, 而當前代碼所在的線程是 main 線程), 即:

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // 當前線程是主線程, 因此這個判斷是 false
    if (eventLoop.inEventLoop()) {
    } else {
        try {
            eventLoop.execute(new OneTimeTask() {
                public void run() {
                    // register0 在 eventLoop 線程中執行.
        } catch (Throwable t) {

接著 AbstractBootstrap.initAndRegister 返回, 回到 AbstractBootstrap.doBind 中, 于是執行到步驟2. 注意, 因為 AbstractUnsafe.register0 是在 eventLoop 中執行的, 因此有可能主線程執行到步驟2 時, AbstractUnsafe.register0 已經執行完畢了, 此時必然有 regFuture.isDone() == true; 但也有可能 AbstractUnsafe.register0 沒有來得及執行, 因此此時 regFuture.isDone() == false. 所以上面的步驟2 考慮到了這兩種情況, 因此分別針對這兩種情況做了區分, 即:

// 步驟2
if (regFuture.isDone()) {
    doBind0(regFuture, channel, localAddress, promise);
} else {
    regFuture.addListener(new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) throws Exception {
            doBind0(regFuture, channel, localAddress, promise);

一般情況下, regFuture.isDone() 為 false, 因為綁定操作是比較費時的, 因此很大幾率會執行到 else 分支, 并且 if 分支和 else 分支從結果上說沒有不同, 而且 if 分支邏輯還更簡單一些, 因此我們以 else 分支來分析吧. 在 else 分支中, 會為 regFuture 設置一個回調監聽器. regFuture 是一個 ChannelFuture, 而 ChannelFuture 代表了一個 Channel 的異步 IO 的操作結果, 因此這里 regFuture 代表了 Channel 注冊(register) 的這個異步 IO 的操作結果.
Netty 這里之所以要為 regFuture 設置一個回調監聽器, 是為了保證 register 和 bind 的時序上的正確性: Channel 的注冊必須要發生在 Channel 的綁定之前.
(關于時序的正確性的問題, 我們在后面有證明)

接下來我們來看一下 AbstractUnsafe.register0 方法:

private void register0(ChannelPromise promise) {
    try {
        // neverRegistered 一開始是 true, 因此 firstRegistration == true
        boolean firstRegistration = neverRegistered;
        neverRegistered = false;
        registered = true;
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        // firstRegistration == true, 而 isActive() == false, 
        // 因此不會執行到 pipeline.fireChannelActive()
        if (firstRegistration && isActive()) {
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        safeSetFailure(promise, t);

注意, 我需要再強調一下, 這里 AbstractUnsafe.register0 是在 eventLoop 中執行的.
AbstractUnsafe.register0 中會調用 doRegister() 注冊 NioServerSocketChannel, 然后調用 safeSetSuccess() 設置 promise 的狀態為成功. 而這個 promise 變量是什么呢? 我將 AbstractBootstrap.doBind 的調用鏈寫詳細一些:

AbstractBootstrap.doBind ->
    AbstractBootstrap.initAndRegister ->
        MultithreadEventLoopGroup.register ->
            SingleThreadEventLoop.register -> 
                AbstractChannel#AbstractUnsafe.register ->
                    eventLoop.execute( () -> AbstractUnsafe.register0)

在 SingleThreadEventLoop.register 中會實例化一個 DefaultChannelPromise, 即:

public ChannelFuture register(Channel channel) {
    return register(channel, new DefaultChannelPromise(channel, this));

接著調用重載的 SingleThreadEventLoop.register 方法:

public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
    if (channel == null) {
        throw new NullPointerException("channel");
    if (promise == null) {
        throw new NullPointerException("promise");

    channel.unsafe().register(this, promise);
    return promise;

我們看到, 實例化的 DefaultChannelPromise 最終會以方法返回值的方式返回到調用方, 即返回到 AbstractBootstrap.doBind 中:

final ChannelFuture regFuture = initAndRegister();

因此我們這里有一個共識: regFuture 是一個在 SingleThreadEventLoop.register 中實例化的 DefaultChannelPromise 對象.

再回到 SingleThreadEventLoop.register 中, 在這里會調用 channel.unsafe().register(this, promise), 將 promise 對象傳遞到 AbstractChannel#AbstractUnsafe.register 中, 因此在 AbstractUnsafe.register0 中的 promise 就是 AbstractBootstrap.doBind 中的 regFuture.
promise == regFuture 很關鍵.

既然我們已經確定了 promise 的身份, 那么調用的 safeSetSuccess(promise); 我們也知道是干嘛的了. safeSetSuccess 方法設置一個 Promise 的狀態為成功態, 而 Promise 的 成功態 是最終狀態, 即此時 promise.isDone() == true. 那么 設置 promise 為成功態后, 會發生什么呢?
還記得不 promise == regFuture, 而我們在 AbstractBootstrap.doBind 的 else 分支中設置了一個回調監聽器:

final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
    public void operationComplete(ChannelFuture future) throws Exception {
        Throwable cause = future.cause();
        if (cause != null) {
            // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
            // IllegalStateException once we try to access the EventLoop of the Channel.
        } else {
            // Registration was successful, so set the correct executor to use.
            // See https://github.com/netty/netty/issues/2586
            promise.executor = channel.eventLoop();
        doBind0(regFuture, channel, localAddress, promise);

因此當 safeSetSuccess(promise); 調用時, 根據 Netty 的 Promise/Future 機制, 會觸發上面的 operationComplete 回調, 在回調中調用 doBind0 方法:

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {
    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {

注意到, 有一個關鍵的地方, 代碼中將 **channel.bind** 的調用放到了 eventLoop 中執行. doBind0 返回后, 代碼繼續執行 AbstractUnsafe.register0 方法的剩余部分代碼, 即:

private void register0(ChannelPromise promise) {
    try {
        // safeSetSuccess 返回后, 繼續執行如下代碼
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        // firstRegistration == true, 而 isActive() == false, 
        // 因此不會執行到 pipeline.fireChannelActive()
        if (firstRegistration && isActive()) {
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        safeSetFailure(promise, t);

AbstractUnsafe.register0 方法執行完畢后, 才執行到 channel.bind 方法.

channel.bind 方法最終會調用到 AbstractChannel#AbstractUnsafe.bind 方法, 源碼如下:

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    boolean wasActive = isActive();
    logger.info("---wasActive: {}---", wasActive);

    try {
        // 調用 NioServerSocketChannel.bind 方法, 
        // 將底層的 Java NIO SocketChannel 綁定到指定的端口.
        // 當 SocketChannel 綁定到端口后, isActive() 才為真.
    } catch (Throwable t) {

    boolean activeNow = isActive();
    logger.info("---activeNow: {}---", activeNow);

    // 這里 wasActive == false
    // isActive() == true
    if (!wasActive && isActive()) {
        invokeLater(new OneTimeTask() {
            public void run() {


上面的代碼中, 調用了 doBind(localAddress) 將底層的 Java NIO SocketChannel 綁定到指定的端口. 并且當 SocketChannel 綁定到端口后, isActive() 才為真.
因此我們知道, 如果 SocketChannel 第一次綁定時, 在調用 doBind 前, wasActive == false == isActive(), 而當調用了 doBind 后, isActive() == true, 因此第一次綁定端口時, if 判斷成立, 會調用 pipeline.fireChannelActive().

關于 Channel 注冊與綁定的時序問題

我們在前的分析中, 直接認定了 Channel 注冊Channel 的綁定 之前完成, 那么依據是什么呢?
其實所有的關鍵在于 EventLoop 的任務隊列機制.
不要閑我啰嗦哦. 我們需要繼續回到 AbstractUnsafe.register0 的調用中(再次強調一下, 在 eventLoop 線程中執行AbstractUnsafe.register0), 這個方法我們已經分析了, 它會調用 safeSetSuccess(promise), 并由 Netty 的 Promise/Future 機制, 導致了AbstractBootstrap.doBind 中的 regFuture 所設置的回調監聽器的 operationComplete 方法調用, 而 operationComplete 中調用了 AbstractBootstrap.doBind0:

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {
    channel.eventLoop().execute(new Runnable() {
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {

doBind0 中, 根據 EventLoop 的任務隊列機制, 會使用 eventLoop().execute 將 channel.bind 封裝為一個 Task, 放到 eventLoop 的 taskQueue 中.


而當 channel.bind 被調度時, AbstractUnsafe.register0 早就已經調用結束了.

因此由于 EventLoop 的任務隊列機制, 我們知道, 在執行 AbstractUnsafe.register0 時, 是在 EventLoop 線程中的, 而 channel.bind 的調用是以 task 的形式添加到 taskQueue 隊列的末尾, 因此必然是有 EventLoop 線程先執行完 AbstractUnsafe.register0 方法后, 才有機會從 taskQueue 中取出一個 task 來執行, 因此這個機制從根本上保證了 Channel 注冊發生在綁定 之前.


你的疑惑是, AbstractChannel#AbstractUnsafe.register0 中, 可能會調用 pipeline.fireChannelActive(), 即:

private void register0(ChannelPromise promise) {
    try {
        boolean firstRegistration = neverRegistered;
        if (firstRegistration && isActive()) {
    } catch (Throwable t) {

并且在 AbstractChannel#AbstractUnsafe.bind 中也可能會調用到pipeline.fireChannelActive(), 即:

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    boolean wasActive = isActive();
    try {
    } catch (Throwable t) {

    if (!wasActive && isActive()) {
        invokeLater(new OneTimeTask() {
            public void run() {

我覺得是 不會. 因為根據上面我們分析的結果可知, Netty 的 Promise/Future 與 EventLoop 的任務隊列機制保證了 NioServerSocketChannel 的注冊和 NioServerSocketChannel 的綁定的時序: Channel 的注冊必須要發生在 Channel 的綁定之前, 而當一個 NioServerSocketChannel 沒有綁定到具體的端口前, 它是不活躍的(Inactive), 因此在 register0 中, if (firstRegistration && isActive()) 就不成立, 進而就不會執行到 pipeline.fireChannelActive() 了.
而執行完注冊操作后, 在 AbstractChannel#AbstractUnsafe.bind 才會調用pipeline.fireChannelActive(), 因此最終只有一次 fireChannelActive 調用.



isActive() == true 成立的關鍵是此 NioServerSocketChannel 已經綁定到端口上了.

由 Promise/Future 與 EventLoop 機制, 導致了 Channel 的注冊 發生在 Channel 的綁定 之前, 因此在 AbstractChannel#AbstractUnsafe.register0 中的 isActive() == false, if 判斷不成立, 最終就是 register0 中的 pipeline.fireChannelActive() 不會被調用.




