国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

Dubbo 源碼分析 - 服務調用過程

Travis / 2086人閱讀

摘要:服務調用過程比較復雜,包含眾多步驟。源碼分析在進行源碼分析之前,我們先來通過一張圖了解服務調用過程。服務調用方式支持同步和異步兩種調用方式,其中異步調用還可細分為有返回值的異步調用和無返回值的異步調用。

注: 本系列文章已捐贈給 Dubbo 社區,你也可以在 Dubbo 官方文檔中閱讀本系列文章。

1. 簡介

在前面的文章中,我們分析了 Dubbo SPI、服務導出與引入、以及集群容錯方面的代碼。經過前文的鋪墊,本篇文章我們終于可以分析服務調用過程了。Dubbo 服務調用過程比較復雜,包含眾多步驟。比如發送請求、編解碼、服務降級、過濾器鏈處理、序列化、線程派發以及響應請求等步驟。限于篇幅原因,本篇文章無法對所有的步驟一一進行分析。本篇文章將會重點分析請求的發送與接收、編解碼、線程派發以及響應的發送與接收等過程,至于服務降級、過濾器鏈和序列化大家自行進行分析,也可以將其當成一個黑盒,暫時忽略也沒關系。介紹完本篇文章要分析的內容,接下來我們進入正題吧。

2. 源碼分析

在進行源碼分析之前,我們先來通過一張圖了解 Dubbo 服務調用過程。

首先服務消費者通過代理對象 Proxy 發起遠程調用,接著通過網絡客戶端 Client 將編碼后的請求發送給服務提供方的網絡層上,也就是 Server。Server 在收到請求后,首先要做的事情是對數據包進行解碼。然后將解碼后的請求發送至分發器 Dispatcher,再由分發器將請求派發到指定的線程池上,最后由線程池調用具體的服務。這就是一個遠程調用請求的發送與接收過程。至于響應的發送與接收過程,這張圖中沒有表現出來。對于這兩個過程,我們也會進行詳細分析。

2.1 服務調用方式

Dubbo 支持同步和異步兩種調用方式,其中異步調用還可細分為“有返回值”的異步調用和“無返回值”的異步調用。所謂“無返回值”異步調用是指服務消費方只管調用,但不關心調用結果,此時 Dubbo 會直接返回一個空的 RpcResult。若要使用異步特性,需要服務消費方手動進行配置。默認情況下,Dubbo 使用同步調用方式。

本節以及其他章節將會使用 Dubbo 官方提供的 Demo 分析整個調用過程,下面我們從 DemoService 接口的代理類開始進行分析。Dubbo 默認使用 Javassist 框架為服務接口生成動態代理類,因此我們需要先將代理類進行反編譯才能看到源碼。這里使用阿里開源 Java 應用診斷工具 Arthas 反編譯代理類,結果如下:

/**
 * Arthas 反編譯步驟:
 * 1. 啟動 Arthas
 *    java -jar arthas-boot.jar
 *
 * 2. 輸入編號選擇進程
 *    Arthas 啟動后,會打印 Java 應用進程列表,比如:
 *    [1]: 11232 org.jetbrains.jps.cmdline.Launcher
 *    [2]: 22370 org.jetbrains.jps.cmdline.Launcher
 *    [3]: 22371 com.alibaba.dubbo.demo.consumer.Consumer
 *    [4]: 22362 com.alibaba.dubbo.demo.provider.Provider
 *    [5]: 2074 org.apache.zookeeper.server.quorum.QuorumPeerMain
 * 這里輸入編號 3,讓 Arthas 關聯到啟動類為 com.....Consumer 的 Java 進程上
 *
 * 3. 由于 Demo 項目中只有一個服務接口,因此此接口的代理類類名為 proxy0,此時使用 sc 命令搜索這個類名。
 *    $ sc *.proxy0
 *    com.alibaba.dubbo.common.bytecode.proxy0
 *
 * 4. 使用 jad 命令反編譯 com.alibaba.dubbo.common.bytecode.proxy0
 *    $ jad com.alibaba.dubbo.common.bytecode.proxy0
 *
 * 更多使用方法請參考 Arthas 官方文檔:
 *   https://alibaba.github.io/arthas/quick-start.html
 */
public class proxy0 implements ClassGenerator.DC, EchoService, DemoService {
    // 方法數組
    public static Method[] methods;
    private InvocationHandler handler;

    public proxy0(InvocationHandler invocationHandler) {
        this.handler = invocationHandler;
    }

    public proxy0() {
    }

    public String sayHello(String string) {
        // 將參數存儲到 Object 數組中
        Object[] arrobject = new Object[]{string};
        // 調用 InvocationHandler 實現類的 invoke 方法得到調用結果
        Object object = this.handler.invoke(this, methods[0], arrobject);
        // 返回調用結果
        return (String)object;
    }

    /** 回聲測試方法 */
    public Object $echo(Object object) {
        Object[] arrobject = new Object[]{object};
        Object object2 = this.handler.invoke(this, methods[1], arrobject);
        return object2;
    }
}

如上,代理類的邏輯比較簡單。首先將運行時參數存儲到數組中,然后調用 InvocationHandler 接口實現類的 invoke 方法,得到調用結果,最后將結果轉型并返回給調用方。關于代理類的邏輯就說這么多,繼續向下分析。

public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker invoker;

    public InvokerInvocationHandler(Invoker handler) {
        this.invoker = handler;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class[] parameterTypes = method.getParameterTypes();
        
        // 攔截定義在 Object 類中的方法(未被子類重寫),比如 wait/notify
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        
        // 如果 toString、hashCode 和 equals 等方法被子類重寫了,這里也直接調用
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        
        // 將 method 和 args 封裝到 RpcInvocation 中,并執行后續的調用
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
}

InvokerInvocationHandler 中的 invoker 成員變量類型為 MockClusterInvoker,MockClusterInvoker 內部封裝了服務降級邏輯。下面簡單看一下:

public class MockClusterInvoker implements Invoker {
    
    private final Invoker invoker;
    
    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;

        // 獲取 mock 配置值
        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
        if (value.length() == 0 || value.equalsIgnoreCase("false")) {
            // 無 mock 邏輯,直接調用其他 Invoker 對象的 invoke 方法,
            // 比如 FailoverClusterInvoker
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            // force:xxx 直接執行 mock 邏輯,不發起遠程調用
            result = doMockInvoke(invocation, null);
        } else {
            // fail:xxx 表示消費方對調用服務失敗后,再執行 mock 邏輯,不拋出異常
            try {
                // 調用其他 Invoker 對象的 invoke 方法
                result = this.invoker.invoke(invocation);
            } catch (RpcException e) {
                if (e.isBiz()) {
                    throw e;
                } else {
                    // 調用失敗,執行 mock 邏輯
                    result = doMockInvoke(invocation, e);
                }
            }
        }
        return result;
    }
    
    // 省略其他方法
}

服務降級不是本文重點,因此這里就不分析 doMockInvoke 方法了。考慮到前文已經詳細分析過 FailoverClusterInvoker,因此本節略過 FailoverClusterInvoker,直接分析 DubboInvoker。

public abstract class AbstractInvoker implements Invoker {
    
    public Result invoke(Invocation inv) throws RpcException {
        if (destroyed.get()) {
            throw new RpcException("Rpc invoker for service ...");
        }
        RpcInvocation invocation = (RpcInvocation) inv;
        // 設置 Invoker
        invocation.setInvoker(this);
        if (attachment != null && attachment.size() > 0) {
            // 設置 attachment
            invocation.addAttachmentsIfAbsent(attachment);
        }
        Map contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            // 添加 contextAttachments 到 RpcInvocation#attachment 變量中
            invocation.addAttachments(contextAttachments);
        }
        if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
            // 設置異步信息到 RpcInvocation#attachment 中
            invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

        try {
            // 抽象方法,由子類實現
            return doInvoke(invocation);
        } catch (InvocationTargetException e) {
            // ...
        } catch (RpcException e) {
            // ...
        } catch (Throwable e) {
            return new RpcResult(e);
        }
    }

    protected abstract Result doInvoke(Invocation invocation) throws Throwable;
    
    // 省略其他方法
}

上面的代碼來自 AbstractInvoker 類,其中大部分代碼用于添加信息到 RpcInvocation#attachment 變量中,添加完畢后,調用 doInvoke 執行后續的調用。doInvoke 是一個抽象方法,需要由子類實現,下面到 DubboInvoker 中看一下。

public class DubboInvoker extends AbstractInvoker {
    
    private final ExchangeClient[] clients;
    
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        // 設置 path 和 version 到 attachment 中
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            // 從 clients 數組中獲取 ExchangeClient
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            // 獲取異步配置
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            // isOneway 為 true,表示“單向”通信
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);

            // 異步無返回值
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                // 發送請求
                currentClient.send(inv, isSent);
                // 設置上下文中的 future 為 null
                RpcContext.getContext().setFuture(null);
                // 返回一個空的 RpcResult
                return new RpcResult();
            } 

            // 異步有返回值
            else if (isAsync) {
                // 發送請求,獲得 ResponseFuture 實例
                ResponseFuture future = currentClient.request(inv, timeout);
                // 設置 future 到上下文中
                RpcContext.getContext().setFuture(new FutureAdapter(future));
                // 暫時返回一個空結果
                return new RpcResult();
            } 

            // 同步調用
            else {
                RpcContext.getContext().setFuture(null);
                // 發送請求,得到一個 ResponseFuture 實例,并調用該實例的 get 方法進行等待
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(..., "Invoke remote method timeout....");
        } catch (RemotingException e) {
            throw new RpcException(..., "Failed to invoke remote method: ...");
        }
    }
    
    // 省略其他方法
}

上面的代碼包含了 Dubbo 對同步和異步調用的處理邏輯,搞懂了上面的代碼,會對 Dubbo 的同步和異步調用方式有更深入的了解。Dubbo 實現同步和異步調用比較關鍵的一點就在于由誰調用 ResponseFuture 的 get 方法。同步調用模式下,由框架自身調用 ResponseFuture 的 get 方法。異步調用模式下,則由用戶調用該方法。ResponseFuture 是一個接口,下面我們來看一下它的默認實現類 DefaultFuture 的源碼。

public class DefaultFuture implements ResponseFuture {
    
    private static final Map CHANNELS = 
        new ConcurrentHashMap();

    private static final Map FUTURES = 
        new ConcurrentHashMap();
    
    private final long id;
    private final Channel channel;
    private final Request request;
    private final int timeout;
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private volatile Response response;
    
    public DefaultFuture(Channel channel, Request request, int timeout) {
        this.channel = channel;
        this.request = request;
        
        // 獲取請求 id,這個 id 很重要,后面還會見到
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // 存儲  映射關系到 FUTURES 中
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    }
    
    @Override
    public Object get() throws RemotingException {
        return get(timeout);
    }

    @Override
    public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                // 循環檢測服務提供方是否成功返回了調用結果
                while (!isDone()) {
                    // 如果調用結果尚未返回,這里等待一段時間
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    // 如果調用結果成功返回,或等待超時,此時跳出 while 循環,執行后續的邏輯
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            
            // 如果調用結果仍未返回,則拋出超時異常
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        
        // 返回調用結果
        return returnFromResponse();
    }
    
    @Override
    public boolean isDone() {
        // 通過檢測 response 字段為空與否,判斷是否收到了調用結果
        return response != null;
    }
    
    private Object returnFromResponse() throws RemotingException {
        Response res = response;
        if (res == null) {
            throw new IllegalStateException("response cannot be null");
        }
        
        // 如果調用結果的狀態為 Response.OK,則表示調用過程正常,服務提供方成功返回了調用結果
        if (res.getStatus() == Response.OK) {
            return res.getResult();
        }
        
        // 拋出異常
        if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
            throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
        }
        throw new RemotingException(channel, res.getErrorMessage());
    }
    
    // 省略其他方法
}

如上,當服務消費者還未接收到調用結果時,用戶線程調用 get 方法會被阻塞住。同步調用模式下,框架獲得 DefaultFuture 對象后,會立即調用 get 方法進行等待。而異步模式下則是將該對象封裝到 FutureAdapter 實例中,并將 FutureAdapter 實例設置到 RpcContext 中,供用戶使用。FutureAdapter 是一個適配器,用于將 Dubbo 中的 ResponseFuture 與 JDK 中的 Future 進行適配。這樣當用戶線程調用 Future 的 get 方法時,經過 FutureAdapter 適配,最終會調用 ResponseFuture 實現類對象的 get 方法,也就是 DefaultFuture 的 get 方法。

到這里關于 Dubbo 幾種調用方式的代碼邏輯就分析完了,下面來分析請求數據的發送與接收,以及響應數據的發送與接收過程。

2.2 服務消費方發送請求 2.2.1 發送請求

本節我們來看一下同步調用模式下,服務消費方是如何發送調用請求的。在深入分析源碼前,我們先來看一張圖。

這張圖展示了服務消費方發送請求過程的部分調用棧,略為復雜。從上圖可以看出,經過多次調用后,才將請求數據送至 Netty NioClientSocketChannel。這樣做的原因是通過 Exchange 層為框架引入 Request 和 Response 語義,這一點會在接下來的源碼分析過程中會看到。其他的就不多說了,下面開始進行分析。首先分析 ReferenceCountExchangeClient 的源碼。

final class ReferenceCountExchangeClient implements ExchangeClient {

    private final URL url;
    private final AtomicInteger referenceCount = new AtomicInteger(0);

    public ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap ghostClientMap) {
        this.client = client;
        // 引用計數自增
        referenceCount.incrementAndGet();
        this.url = client.getUrl();
        
        // ...
    }

    @Override
    public ResponseFuture request(Object request) throws RemotingException {
        // 直接調用被裝飾對象的同簽名方法
        return client.request(request);
    }

    @Override
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        // 直接調用被裝飾對象的同簽名方法
        return client.request(request, timeout);
    }

    /** 引用計數自增,該方法由外部調用 */
    public void incrementAndGetCount() {
        // referenceCount 自增
        referenceCount.incrementAndGet();
    }
    
        @Override
    public void close(int timeout) {
        // referenceCount 自減
        if (referenceCount.decrementAndGet() <= 0) {
            if (timeout == 0) {
                client.close();
            } else {
                client.close(timeout);
            }
            client = replaceWithLazyClient();
        }
    }
    
    // 省略部分方法
}

ReferenceCountExchangeClient 內部定義了一個引用計數變量 referenceCount,每當該對象被引用一次 referenceCount 都會進行自增。每當 close 方法被調用時,referenceCount 進行自減。ReferenceCountExchangeClient 內部僅實現了一個引用計數的功能,其他方法并無復雜邏輯,均是直接調用被裝飾對象的相關方法。所以這里就不多說了,繼續向下分析,這次是 HeaderExchangeClient。

public class HeaderExchangeClient implements ExchangeClient {

    private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));
    private final Client client;
    private final ExchangeChannel channel;
    private ScheduledFuture heartbeatTimer;
    private int heartbeat;
    private int heartbeatTimeout;

    public HeaderExchangeClient(Client client, boolean needHeartbeat) {
        if (client == null) {
            throw new IllegalArgumentException("client == null");
        }
        this.client = client;
        
        // 創建 HeaderExchangeChannel 對象
        this.channel = new HeaderExchangeChannel(client);
        
        // 以下代碼均與心跳檢測邏輯有關
        String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
        this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
        this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
        if (heartbeatTimeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        if (needHeartbeat) {
            // 開啟心跳檢測定時器
            startHeartbeatTimer();
        }
    }

    @Override
    public ResponseFuture request(Object request) throws RemotingException {
        // 直接 HeaderExchangeChannel 對象的同簽名方法
        return channel.request(request);
    }

    @Override
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        // 直接 HeaderExchangeChannel 對象的同簽名方法
        return channel.request(request, timeout);
    }

    @Override
    public void close() {
        doClose();
        channel.close();
    }
    
    private void doClose() {
        // 停止心跳檢測定時器
        stopHeartbeatTimer();
    }

    private void startHeartbeatTimer() {
        stopHeartbeatTimer();
        if (heartbeat > 0) {
            heartbeatTimer = scheduled.scheduleWithFixedDelay(
                    new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
                        @Override
                        public Collection getChannels() {
                            return Collections.singletonList(HeaderExchangeClient.this);
                        }
                    }, heartbeat, heartbeatTimeout),
                    heartbeat, heartbeat, TimeUnit.MILLISECONDS);
        }
    }

    private void stopHeartbeatTimer() {
        if (heartbeatTimer != null && !heartbeatTimer.isCancelled()) {
            try {
                heartbeatTimer.cancel(true);
                scheduled.purge();
            } catch (Throwable e) {
                if (logger.isWarnEnabled()) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
        heartbeatTimer = null;
    }
    
    // 省略部分方法
}

HeaderExchangeClient 中很多方法只有一行代碼,即調用 HeaderExchangeChannel 對象的同簽名方法。那 HeaderExchangeClient 有什么用處呢?答案是封裝了一些關于心跳檢測的邏輯。心跳檢測并非本文所關注的點,因此就不多說了,繼續向下看。

final class HeaderExchangeChannel implements ExchangeChannel {
    
    private final Channel channel;
    
    HeaderExchangeChannel(Channel channel) {
        if (channel == null) {
            throw new IllegalArgumentException("channel == null");
        }
        
        // 這里的 channel 指向的是 NettyClient
        this.channel = channel;
    }
    
    @Override
    public ResponseFuture request(Object request) throws RemotingException {
        return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
    }

    @Override
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(..., "Failed to send request ...");
        }
        // 創建 Request 對象
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        // 設置雙向通信標志為 true
        req.setTwoWay(true);
        // 這里的 request 變量類型為 RpcInvocation
        req.setData(request);
                                        
        // 創建 DefaultFuture 對象
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try {
            // 調用 NettyClient 的 send 方法發送請求
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        // 返回 DefaultFuture 對象
        return future;
    }
}

到這里大家終于看到了 Request 語義了,上面的方法首先定義了一個 Request 對象,然后再將該對象傳給 NettyClient 的 send 方法,進行后續的調用。需要說明的是,NettyClient 中并未實現 send 方法,該方法繼承自父類 AbstractPeer,下面直接分析 AbstractPeer 的代碼。

public abstract class AbstractPeer implements Endpoint, ChannelHandler {
    
    @Override
    public void send(Object message) throws RemotingException {
        // 該方法由 AbstractClient 類實現
        send(message, url.getParameter(Constants.SENT_KEY, false));
    }
    
    // 省略其他方法
}

public abstract class AbstractClient extends AbstractEndpoint implements Client {
    
    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        if (send_reconnect && !isConnected()) {
            connect();
        }
        
        // 獲取 Channel,getChannel 是一個抽象方法,具體由子類實現
        Channel channel = getChannel();
        if (channel == null || !channel.isConnected()) {
            throw new RemotingException(this, "message can not send ...");
        }
        
        // 繼續向下調用
        channel.send(message, sent);
    }
    
    protected abstract Channel getChannel();
    
    // 省略其他方法
}

默認情況下,Dubbo 使用 Netty 作為底層的通信框架,因此下面我們到 NettyClient 類中看一下 getChannel 方法的實現邏輯。

public class NettyClient extends AbstractClient {
    
    // 這里的 Channel 全限定名稱為 org.jboss.netty.channel.Channel
    private volatile Channel channel;

    @Override
    protected com.alibaba.dubbo.remoting.Channel getChannel() {
        Channel c = channel;
        if (c == null || !c.isConnected())
            return null;
        // 獲取一個 NettyChannel 類型對象
        return NettyChannel.getOrAddChannel(c, getUrl(), this);
    }
}

final class NettyChannel extends AbstractChannel {

    private static final ConcurrentMap channelMap = 
        new ConcurrentHashMap();

    private final org.jboss.netty.channel.Channel channel;
    
    /** 私有構造方法 */
    private NettyChannel(org.jboss.netty.channel.Channel channel, URL url, ChannelHandler handler) {
        super(url, handler);
        if (channel == null) {
            throw new IllegalArgumentException("netty channel == null;");
        }
        this.channel = channel;
    }

    static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) {
        if (ch == null) {
            return null;
        }
        
        // 嘗試從集合中獲取 NettyChannel 實例
        NettyChannel ret = channelMap.get(ch);
        if (ret == null) {
            // 如果 ret = null,則創建一個新的 NettyChannel 實例
            NettyChannel nc = new NettyChannel(ch, url, handler);
            if (ch.isConnected()) {
                // 將  鍵值對存入 channelMap 集合中
                ret = channelMap.putIfAbsent(ch, nc);
            }
            if (ret == null) {
                ret = nc;
            }
        }
        return ret;
    }
}

獲取到 NettyChannel 實例后,即可進行后續的調用。下面看一下 NettyChannel 的 send 方法。

public void send(Object message, boolean sent) throws RemotingException {
    super.send(message, sent);

    boolean success = true;
    int timeout = 0;
    try {
        // 發送消息(包含請求和響應消息)
        ChannelFuture future = channel.write(message);
        
        // sent 的值源于  中 sent 的配置值,有兩種配置值:
        //   1. true: 等待消息發出,消息發送失敗將拋出異常
        //   2. false: 不等待消息發出,將消息放入 IO 隊列,即刻返回
        // 默認情況下 sent = false;
        if (sent) {
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            // 等待消息發出,若在規定時間沒能發出,success 會被置為 false
            success = future.await(timeout);
        }
        Throwable cause = future.getCause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message ...");
    }

    // 若 success 為 false,這里拋出異常
    if (!success) {
        throw new RemotingException(this, "Failed to send message ...");
    }
}

經歷多次調用,到這里請求數據的發送過程就結束了,過程漫長。為了便于大家閱讀代碼,這里以 DemoService 為例,將 sayHello 方法的整個調用路徑貼出來。

proxy0#sayHello(String)
  —> InvokerInvocationHandler#invoke(Object, Method, Object[])
    —> MockClusterInvoker#invoke(Invocation)
      —> AbstractClusterInvoker#invoke(Invocation)
        —> FailoverClusterInvoker#doInvoke(Invocation, List>, LoadBalance)
          —> Filter#invoke(Invoker, Invocation)  // 包含多個 Filter 調用
            —> ListenerInvokerWrapper#invoke(Invocation) 
              —> AbstractInvoker#invoke(Invocation) 
                —> DubboInvoker#doInvoke(Invocation)
                  —> ReferenceCountExchangeClient#request(Object, int)
                    —> HeaderExchangeClient#request(Object, int)
                      —> HeaderExchangeChannel#request(Object, int)
                        —> AbstractPeer#send(Object)
                          —> AbstractClient#send(Object, boolean)
                            —> NettyChannel#send(Object, boolean)
                              —> NioClientSocketChannel#write(Object)

在 Netty 中,出站數據在發出之前還需要進行編碼操作,接下來我們來分析一下請求數據的編碼邏輯。

2.2.2 請求編碼

在分析請求編碼邏輯之前,我們先來看一下 Dubbo 數據包結構。

Dubbo 數據包分為消息頭和消息體,消息頭用于存儲一些元信息,比如魔數(Magic),數據包類型(Request/Response),消息體長度(Data Length)等。消息體中用于存儲具體的調用消息,比如方法名稱,參數列表等。下面簡單列舉一下消息頭的內容。

偏移量(Bit) 字段 取值
0 ~ 7 魔數高位 0xda00
8 ~ 15 魔數低位 0xbb
16 數據包類型 0 - Response, 1 - Request
17 調用方式 僅在第16位被設為1的情況下有效,0 - 單向調用,1 - 雙向調用
18 事件標識 0 - 當前數據包是請求或響應包,1 - 當前數據包是心跳包
19 ~ 23 序列化器編號 2 - Hessian2Serialization
3 - JavaSerialization
4 - CompactedJavaSerialization
6 - FastJsonSerialization
7 - NativeJavaSerialization
8 - KryoSerialization
9 - FstSerialization
24 ~ 31 狀態 20 - OK
30 - CLIENT_TIMEOUT
31 - SERVER_TIMEOUT
40 - BAD_REQUEST
50 - BAD_RESPONSE
......
32 ~ 95 請求編號 共8字節,運行時生成
96 ~ 127 消息體長度 運行時計算

了解了 Dubbo 數據包格式,接下來我們就可以探索編碼過程了。這次我們開門見山,直接分析編碼邏輯所在類。如下:

public class ExchangeCodec extends TelnetCodec {

    // 消息頭長度
    protected static final int HEADER_LENGTH = 16;
    // 魔數內容
    protected static final short MAGIC = (short) 0xdabb;
    protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
    protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
    protected static final byte FLAG_REQUEST = (byte) 0x80;
    protected static final byte FLAG_TWOWAY = (byte) 0x40;
    protected static final byte FLAG_EVENT = (byte) 0x20;
    protected static final int SERIALIZATION_MASK = 0x1f;
    private static final Logger logger = LoggerFactory.getLogger(ExchangeCodec.class);

    public Short getMagicCode() {
        return MAGIC;
    }

    @Override
    public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
        if (msg instanceof Request) {
            // 對 Request 對象進行編碼
            encodeRequest(channel, buffer, (Request) msg);
        } else if (msg instanceof Response) {
            // 對 Response 對象進行編碼,后面分析
            encodeResponse(channel, buffer, (Response) msg);
        } else {
            super.encode(channel, buffer, msg);
        }
    }

    protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
        Serialization serialization = getSerialization(channel);

        // 創建消息頭字節數組,長度為 16
        byte[] header = new byte[HEADER_LENGTH];

        // 設置魔數
        Bytes.short2bytes(MAGIC, header);

        // 設置數據包類型(Request/Response)和序列化器編號
        header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

        // 設置通信方式(單向/雙向)
        if (req.isTwoWay()) {
            header[2] |= FLAG_TWOWAY;
        }
        
        // 設置事件標識
        if (req.isEvent()) {
            header[2] |= FLAG_EVENT;
        }

        // 設置請求編號,8個字節,從第4個字節開始設置
        Bytes.long2bytes(req.getId(), header, 4);

        // 獲取 buffer 當前的寫位置
        int savedWriteIndex = buffer.writerIndex();
        // 更新 writerIndex,為消息頭預留 16 個字節的空間
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
        // 創建序列化器,比如 Hessian2ObjectOutput
        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
        if (req.isEvent()) {
            // 對事件數據進行序列化操作
            encodeEventData(channel, out, req.getData());
        } else {
            // 對請求數據進行序列化操作
            encodeRequestData(channel, out, req.getData(), req.getVersion());
        }
        out.flushBuffer();
        if (out instanceof Cleanable) {
            ((Cleanable) out).cleanup();
        }
        bos.flush();
        bos.close();
        
        // 獲取寫入的字節數,也就是消息體長度
        int len = bos.writtenBytes();
        checkPayload(channel, len);

        // 將消息體長度寫入到消息頭中
        Bytes.int2bytes(len, header, 12);

        // 將 buffer 指針移動到 savedWriteIndex,為寫消息頭做準備
        buffer.writerIndex(savedWriteIndex);
        // 從 savedWriteIndex 下標處寫入消息頭
        buffer.writeBytes(header);
        // 設置新的 writerIndex,writerIndex = 原寫下標 + 消息頭長度 + 消息體長度
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
    }
    
    // 省略其他方法
}

以上就是請求對象的編碼過程,該過程首先會通過位運算將消息頭寫入到 header 數組中。然后對 Request 對象的 data 字段執行序列化操作,序列化后的數據最終會存儲到 ChannelBuffer 中。序列化操作執行完后,可得到數據序列化后的長度 len,緊接著將 len 寫入到 header 指定位置處。最后再將消息頭字節數組 header 寫入到 ChannelBuffer 中,整個編碼過程就結束了。本節的最后,我們再來看一下 Request 對象的 data 字段序列化過程,也就是 encodeRequestData 方法的邏輯,如下:

public class DubboCodec extends ExchangeCodec implements Codec2 {
    
    protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
        RpcInvocation inv = (RpcInvocation) data;

        // 依次序列化 dubbo version、path、version
        out.writeUTF(version);
        out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
        out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));

        // 序列化調用方法名
        out.writeUTF(inv.getMethodName());
        // 將參數類型轉換為字符串,并進行序列化
        out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
        Object[] args = inv.getArguments();
        if (args != null)
            for (int i = 0; i < args.length; i++) {
                // 對運行時參數進行序列化
                out.writeObject(encodeInvocationArgument(channel, inv, i));
            }
        
        // 序列化 attachments
        out.writeObject(inv.getAttachments());
    }
}

至此,關于服務消費方發送請求的過程就分析完了,接下來我們來看一下服務提供方是如何接收請求的。

2.3 服務提供方接收請求

前面說過,默認情況下 Dubbo 使用 Netty 作為底層的通信框架。Netty 檢測到有數據入站后,首先會通過解碼器對數據進行解碼,并將解碼后的數據傳遞給下一個入站處理器的指定方法。所以在進行后續的分析之前,我們先來看一下數據解碼過程。

2.3.1 請求解碼

這里直接分析請求數據的解碼邏輯,忽略中間過程,如下:

public class ExchangeCodec extends TelnetCodec {
    
    @Override
    public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
        int readable = buffer.readableBytes();
        // 創建消息頭字節數組
        byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
        // 讀取消息頭數據
        buffer.readBytes(header);
        // 調用重載方法進行后續解碼工作
        return decode(channel, buffer, readable, header);
    }

    @Override
    protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
        // 檢查魔數是否相等
        if (readable > 0 && header[0] != MAGIC_HIGH
                || readable > 1 && header[1] != MAGIC_LOW) {
            int length = header.length;
            if (header.length < readable) {
                header = Bytes.copyOf(header, readable);
                buffer.readBytes(header, length, readable - length);
            }
            for (int i = 1; i < header.length - 1; i++) {
                if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                    buffer.readerIndex(buffer.readerIndex() - header.length + i);
                    header = Bytes.copyOf(header, i);
                    break;
                }
            }
            // 通過 telnet 命令行發送的數據包不包含消息頭,所以這里
            // 調用 TelnetCodec 的 decode 方法對數據包進行解碼
            return super.decode(channel, buffer, readable, header);
        }
        
        // 檢測可讀數據量是否少于消息頭長度,若小于則立即返回 DecodeResult.NEED_MORE_INPUT
        if (readable < HEADER_LENGTH) {
            return DecodeResult.NEED_MORE_INPUT;
        }

        // 從消息頭中獲取消息體長度
        int len = Bytes.bytes2int(header, 12);
        // 檢測消息體長度是否超出限制,超出則拋出異常
        checkPayload(channel, len);

        int tt = len + HEADER_LENGTH;
        // 檢測可讀的字節數是否小于實際的字節數
        if (readable < tt) {
            return DecodeResult.NEED_MORE_INPUT;
        }
        
        ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

        try {
            // 繼續進行解碼工作
            return decodeBody(channel, is, header);
        } finally {
            if (is.available() > 0) {
                try {
                    StreamUtils.skipUnusedStream(is);
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
    }
}

上面方法通過檢測消息頭中的魔數是否與規定的魔數相等,提前攔截掉非常規數據包,比如通過 telnet 命令行發出的數據包。接著再對消息體長度,以及可讀字節數進行檢測。最后調用 decodeBody 方法進行后續的解碼工作,ExchangeCodec 中實現了 decodeBody 方法,但因其子類 DubboCodec 覆寫了該方法,所以在運行時 DubboCodec 中的 decodeBody 方法會被調用。下面我們來看一下該方法的代碼。

public class DubboCodec extends ExchangeCodec implements Codec2 {

    @Override
    protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
        // 獲取消息頭中的第三個字節,并通過邏輯與運算得到序列化器編號
        byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
        Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
        // 獲取調用編號
        long id = Bytes.bytes2long(header, 4);
        // 通過邏輯與運算得到調用類型,0 - Response,1 - Request
        if ((flag & FLAG_REQUEST) == 0) {
            // 對響應結果進行解碼,得到 Response 對象。這個非本節內容,后面再分析
            // ...
        } else {
            // 創建 Request 對象
            Request req = new Request(id);
            req.setVersion(Version.getProtocolVersion());
            // 通過邏輯與運算得到通信方式,并設置到 Request 對象中
            req.setTwoWay((flag & FLAG_TWOWAY) != 0);
            
            // 通過位運算檢測數據包是否為事件類型
            if ((flag & FLAG_EVENT) != 0) {
                // 設置心跳事件到 Request 對象中
                req.setEvent(Request.HEARTBEAT_EVENT);
            }
            try {
                Object data;
                if (req.isHeartbeat()) {
                    // 對心跳包進行解碼,該方法已被標注為廢棄
                    data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
                } else if (req.isEvent()) {
                    // 對事件數據進行解碼
                    data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
                } else {
                    DecodeableRpcInvocation inv;
                    // 根據 url 參數判斷是否在 IO 線程上對消息體進行解碼
                    if (channel.getUrl().getParameter(
                            Constants.DECODE_IN_IO_THREAD_KEY,
                            Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                        inv = new DecodeableRpcInvocation(channel, req, is, proto);
                        // 在當前線程,也就是 IO 線程上進行后續的解碼工作。此工作完成后,可將
                        // 調用方法名、attachment、以及調用參數解析出來
                        inv.decode();
                    } else {
                        // 僅創建 DecodeableRpcInvocation 對象,但不在當前線程上執行解碼邏輯
                        inv = new DecodeableRpcInvocation(channel, req,
                                new UnsafeByteArrayInputStream(readMessageData(is)), proto);
                    }
                    data = inv;
                }
                
                // 設置 data 到 Request 對象中
                req.setData(data);
            } catch (Throwable t) {
                // 若解碼過程中出現異常,則將 broken 字段設為 true,
                // 并將異常對象設置到 Reqeust 對象中
                req.setBroken(true);
                req.setData(t);
            }
            return req;
        }
    }
}

如上,decodeBody 對部分字段進行了解碼,并將解碼得到的字段封裝到 Request 中。隨后會調用 DecodeableRpcInvocation 的 decode 方法進行后續的解碼工作。此工作完成后,可將調用方法名、attachment、以及運行時調用參數解析出來。下面我們來看一下 DecodeableRpcInvocation 的 decode 方法邏輯。

public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Decodeable {
    
    @Override
    public Object decode(Channel channel, InputStream input) throws IOException {
        ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
                .deserialize(channel.getUrl(), input);

        // 通過反序列化得到 dubbo version,并保存到 attachments 變量中
        String dubboVersion = in.readUTF();
        request.setVersion(dubboVersion);
        setAttachment(Constants.DUBBO_VERSION_KEY, dubboVersion);

        // 通過反序列化得到 path,version,并保存到 attachments 變量中
        setAttachment(Constants.PATH_KEY, in.readUTF());
        setAttachment(Constants.VERSION_KEY, in.readUTF());

        // 通過反序列化得到調用方法名
        setMethodName(in.readUTF());
        try {
            Object[] args;
            Class[] pts;
            // 通過反序列化得到參數類型字符串,比如 Ljava/lang/String;
            String desc = in.readUTF();
            if (desc.length() == 0) {
                pts = DubboCodec.EMPTY_CLASS_ARRAY;
                args = DubboCodec.EMPTY_OBJECT_ARRAY;
            } else {
                // 將 desc 解析為參數類型數組
                pts = ReflectUtils.desc2classArray(desc);
                args = new Object[pts.length];
                for (int i = 0; i < args.length; i++) {
                    try {
                        // 解析運行時參數
                        args[i] = in.readObject(pts[i]);
                    } catch (Exception e) {
                        if (log.isWarnEnabled()) {
                            log.warn("Decode argument failed: " + e.getMessage(), e);
                        }
                    }
                }
            }
            
            // 設置參數類型數組
            setParameterTypes(pts);

            // 通過反序列化得到原 attachments 的內容
            Map map = (Map) in.readObject(Map.class);
            if (map != null && map.size() > 0) {
                Map attachment = getAttachments();
                if (attachment == null) {
                    attachment = new HashMap();
                }
                // 將 map 與當前對象中的 attachment 集合進行融合
                attachment.putAll(map);
                setAttachments(attachment);
            }
            
            // 對 callback 類型的參數進行處理
            for (int i = 0; i < args.length; i++) {
                args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
            }

            // 設置參數列表
            setArguments(args);

        } catch (ClassNotFoundException e) {
            throw new IOException(StringUtils.toString("Read invocation data failed.", e));
        } finally {
            if (in instanceof Cleanable) {
                ((Cleanable) in).cleanup();
            }
        }
        return this;
    }
}

上面的方法通過反序列化將諸如 path、version、調用方法名、參數列表等信息依次解析出來,并設置到相應的字段中,最終得到一個具有完整調用信息的 DecodeableRpcInvocation 對象。

到這里,請求數據解碼的過程就分析完了。此時我們得到了一個 Request 對象,這個對象會被傳送到下一個入站處理器中,我們繼續往下看。

2.3.2 調用服務

解碼器將數據包解析成 Request 對象后,NettyHandler 的 messageReceived 方法緊接著會收到這個對象,并將這個對象繼續向下傳遞。這期間該對象會被依次傳遞給 NettyServer、MultiMessageHandler、HeartbeatHandler 以及 AllChannelHandler。最后由 AllChannelHandler 將該對象封裝到 Runnable 實現類對象中,并將 Runnable 放入線程池中執行后續的調用邏輯。整個調用棧如下:

NettyHandler#messageReceived(ChannelHandlerContext, MessageEvent)
  —> AbstractPeer#received(Channel, Object)
    —> MultiMessageHandler#received(Channel, Object)
      —> HeartbeatHandler#received(Channel, Object)
        —> AllChannelHandler#received(Channel, Object)
          —> ExecutorService#execute(Runnable)    // 由線程池執行后續的調用邏輯

考慮到篇幅,以及很多中間調用的邏輯并非十分重要,所以這里就不對調用棧中的每個方法都進行分析了。這里我們直接分析調用棧中的分析第一個和最后一個調用方法邏輯。如下:

@Sharable
public class NettyHandler extends SimpleChannelHandler {
    
    private final Map channels = new ConcurrentHashMap();

    private final URL url;

    private final ChannelHandler handler;
    
    public NettyHandler(URL url, ChannelHandler handler) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        this.url = url;
        
        // 這里的 handler 類型為 NettyServer
        this.handler = handler;
    }
    
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        // 獲取 NettyChannel
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
        try {
            // 繼續向下調用
            handler.received(channel, e.getMessage());
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
        }
    }
}

如上,NettyHandler 中的 messageReceived 邏輯比較簡單。首先根據一些信息獲取 NettyChannel 實例,然后將 NettyChannel 實例以及 Request 對象向下傳遞。下面再來看看 AllChannelHandler 的邏輯,在詳細分析代碼之前,我們先來了解一下 Dubbo 中的線程派發模型。

2.3.2.1 線程派發模型

Dubbo 將底層通信框架中接收請求的線程稱為 IO 線程。如果一些事件處理邏輯可以很快執行完,比如只在內存打一個標記,此時直接在 IO 線程上執行該段邏輯即可。但如果事件的處理邏輯比較耗時,比如該段邏輯會發起數據庫查詢或者 HTTP 請求。此時我們就不應該讓事件處理邏輯在 IO 線程上執行,而是應該派發到線程池中去執行。原因也很簡單,IO 線程主要用于接收請求,如果 IO 線程被占滿,將導致它不能接收新的請求。

以上就是線程派發的背景,下面我們再來通過 Dubbo 調用圖,看一下線程派發器所處的位置。

如上圖,紅框中的 Dispatcher 就是線程派發器。需要說明的是,Dispatcher 真實的職責創建具有線程派發能力的 ChannelHandler,比如 AllChannelHandler、MessageOnlyChannelHandler 和 ExecutionChannelHandler 等,其本身并不具備線程派發能力。Dubbo 支持 5 種不同的線程派發策略,下面通過一個表格列舉一下。

策略 用途
all 所有消息都派發到線程池,包括請求,響應,連接事件,斷開事件等
direct 所有消息都不派發到線程池,全部在 IO 線程上直接執行
message 只有請求響應消息派發到線程池,其它消息均在 IO 線程上執行
execution 只有請求消息派發到線程池,不含響應。其它消息均在 IO 線程上執行
connection 在 IO 線程上,將連接斷開事件放入隊列,有序逐個執行,其它消息派發到線程池

默認配置下,Dubbo 使用 all 派發策略,即將所有的消息都派發到線程池中。下面我們來分析一下 AllChannelHandler 的代碼。

public class AllChannelHandler extends WrappedChannelHandler {

    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    /** 處理連接事件 */
    @Override
    public void connected(Channel channel) throws RemotingException {
        // 獲取線程池
        ExecutorService cexecutor = getExecutorService();
        try {
            // 將連接事件派發到線程池中處理
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException(..., " error when process connected event .", t);
        }
    }

    /** 處理斷開事件 */
    @Override
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException(..., "error when process disconnected event .", t);
        }
    }

    /** 處理請求和響應消息,這里的 message 變量類型可能是 Request,也可能是 Response */
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            // 將請求和響應消息派發到線程池中處理
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if(message instanceof Request && t instanceof RejectedExecutionException){
                Request request = (Request)message;
                // 如果通信方式為雙向通信,此時將 Server side ... threadpool is exhausted 
                // 錯誤信息封裝到 Response 中,并返回給服務消費方。
                if(request.isTwoWay()){
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() 
                        + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    // 返回包含錯誤信息的 Response 對象
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(..., " error when process received event .", t);
        }
    }

    /** 處理異常信息 */
    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException(..., "error when process caught event ...");
        }
    }
}

如上,請求對象會被封裝 ChannelEventRunnable 中,ChannelEventRunnable 將會是服務調用過程的新起點。所以接下來我們以 ChannelEventRunnable 為起點向下探索。

2.3.2.2 調用服務

本小節,我們從 ChannelEventRunnable 開始分析,該類的主要代碼如下:

public class ChannelEventRunnable implements Runnable {
    
    private final ChannelHandler handler;
    private final Channel channel;
    private final ChannelState state;
    private final Throwable exception;
    private final Object message;
    
    @Override
    public void run() {
        // 檢測通道狀態,對于請求或響應消息,此時 state = RECEIVED
        if (state == ChannelState.RECEIVED) {
            try {
                // 將 channel 和 message 傳給 ChannelHandler 對象,進行后續的調用
                handler.received(channel, message);
            } catch (Exception e) {
                logger.warn("... operation error, channel is ... message is ...");
            }
        } 
        
        // 其他消息類型通過 switch 進行處理
        else {
            switch (state) {
            case CONNECTED:
                try {
                    handler.connected(channel);
                } catch (Exception e) {
                    logger.warn("... operation error, channel is ...");
                }
                break;
            case DISCONNECTED:
                // ...
            case SENT:
                // ...
            case CAUGHT:
                // ...
            default:
                logger.warn("unknown state: " + state + ", message is " + message);
            }
        }

    }
}

如上,請求和響應消息出現頻率明顯比其他類型消息高,所以這里對該類型的消息進行了針對性判斷。ChannelEventRunnable 僅是一個中轉站,它的 run 方法中并不包含具體的調用邏輯,僅用于將參數傳給其他 ChannelHandler 對象進行處理,該對象類型為 DecodeHandler。

public class DecodeHandler extends AbstractChannelHandlerDelegate {

    public DecodeHandler(ChannelHandler handler) {
        super(handler);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Decodeable) {
            // 對 Decodeable 接口實現類對象進行解碼
            decode(message);
        }

        if (message instanceof Request) {
            // 對 Request 的 data 字段進行解碼
            decode(((Request) message).getData());
        }

        if (message instanceof Response) {
            // 對 Request 的 result 字段進行解碼
            decode(((Response) message).getResult());
        }

        // 執行后續邏輯
        handler.received(channel, message);
    }

    private void decode(Object message) {
        // Decodeable 接口目前有兩個實現類,
        // 分別為 DecodeableRpcInvocation 和 DecodeableRpcResult
        if (message != null && message instanceof Decodeable) {
            try {
                // 執行解碼邏輯
                ((Decodeable) message).decode();
            } catch (Throwable e) {
                if (log.isWarnEnabled()) {
                    log.warn("Call Decodeable.decode failed: " + e.getMessage(), e);
                }
            }
        }
    }
}

DecodeHandler 主要是包含了一些解碼邏輯。2.2.1 節分析請求解碼時說過,請求解碼可在 IO 線程上執行,也可在線程池中執行,這個取決于運行時配置。DecodeHandler 存在的意義就是保證請求或響應對象可在線程池中被解碼。解碼完畢后,完全解碼后的 Request 對象會繼續向后傳遞,下一站是 HeaderExchangeHandler。

public class HeaderExchangeHandler implements ChannelHandlerDelegate {

    private final ExchangeHandler handler;

    public HeaderExchangeHandler(ExchangeHandler handler) {
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        this.handler = handler;
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            // 處理請求對象
            if (message instanceof Request) {
                Request request = (Request) message;
                if (request.isEvent()) {
                    // 處理事件
                    handlerEvent(channel, request);
                } 
                // 處理普通的請求
                else {
                    // 雙向通信
                    if (request.isTwoWay()) {
                        // 向后調用服務,并得到調用結果
                        Response response = handleRequest(exchangeChannel, request);
                        // 將調用結果返回給服務消費端
                        channel.send(response);
                    } 
                    // 如果是單向通信,僅向后調用指定服務即可,無需返回調用結果
                    else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            }      
            // 處理響應對象,服務消費方會執行此處邏輯,后面分析
            else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                // telnet 相關,忽略
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

    Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        // 檢測請求是否合法,不合法則返回狀態碼為 BAD_REQUEST 的響應
        if (req.isBroken()) {
            Object data = req.getData();

            String msg;
            if (data == null)
                msg = null;
            else if
                (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
            else
                msg = data.toString();
            res.setErrorMessage("Fail to decode request due to: " + msg);
            // 設置 BAD_REQUEST 狀態
            res.setStatus(Response.BAD_REQUEST);

            return res;
        }
        
        // 獲取 data 字段值,也就是 RpcInvocation 對象
        Object msg = req.getData();
        try {
            // 繼續向下調用
            Object result = handler.reply(channel, msg);
            // 設置 OK 狀態碼
            res.setStatus(Response.OK);
            // 設置調用結果
            res.setResult(result);
        } catch (Throwable e) {
            // 若調用過程出現異常,則設置 SERVICE_ERROR,表示服務端異常
            res.s           
               
                                           
                       
                 
            
                     
             
               

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/72896.html

相關文章

  • Dubbo 源碼分析 - 集群容錯之 Cluster

    摘要:集群用途是將多個服務提供者合并為一個,并將這個暴露給服務消費者。比如發請求,接受服務提供者返回的數據等。如果包含,表明對應的服務提供者可能因網絡原因未能成功提供服務。如果不包含,此時還需要進行可用性檢測,比如檢測服務提供者網絡連通性等。 1.簡介 為了避免單點故障,現在的應用至少會部署在兩臺服務器上。對于一些負載比較高的服務,會部署更多臺服務器。這樣,同一環境下的服務提供者數量會大于1...

    denson 評論0 收藏0
  • Dubbo 源碼分析 - 服務導出

    摘要:支持兩種服務導出方式,分別延遲導出和立即導出。本文打算分析服務延遲導出過程,因此不會分析方法。服務導出之前,要進行對一系列的配置進行檢查,以及生成。返回時,表示需要延遲導出。賽程預告,下一站是服務導出的前置工作。 1.服務導出過程 本篇文章,我們來研究一下 Dubbo 導出服務的過程。Dubbo 服務導出過程始于 Spring 容器發布刷新事件,Dubbo 在接收到事件后,會立即執行服...

    劉玉平 評論0 收藏0
  • dubbo源碼解析(四十五)服務引用過程

    摘要:服務引用過程目標從源碼的角度分析服務引用過程。并保留服務提供者的部分配置,比如版本,,時間戳等最后將合并后的配置設置為查詢字符串中。的可以參考源碼解析二十三遠程調用的一的源碼分析。 dubbo服務引用過程 目標:從源碼的角度分析服務引用過程。 前言 前面服務暴露過程的文章講解到,服務引用有兩種方式,一種就是直連,也就是直接指定服務的地址來進行引用,這種方式更多的時候被用來做服務測試,不...

    xiaowugui666 評論0 收藏0
  • dubbo源碼解析——概要篇

    摘要:服務提供者代碼上面這個類會被封裝成為一個實例,并新生成一個實例。這樣當網絡通訊層收到一個請求后,會找到對應的實例,并調用它所對應的實例,從而真正調用了服務提供者的代碼。 這次源碼解析借鑒《肥朝》前輩的dubbo源碼解析,進行源碼學習。總結起來就是先總體,后局部.也就是先把需要注意的概念先拋出來,把整體架構圖先畫出來.讓讀者拿著地圖跟著我的腳步,并且每一步我都提醒,現在我們在哪,我們下一...

    Meathill 評論0 收藏0
  • dubbo源碼解析(四十四)服務暴露過程

    摘要:服務暴露過程目標從源碼的角度分析服務暴露過程。導出服務,包含暴露服務到本地,和暴露服務到遠程兩個過程。其中服務暴露的第八步已經沒有了。將泛化調用版本號或者等信息加入獲得服務暴露地址和端口號,利用內數據組裝成。 dubbo服務暴露過程 目標:從源碼的角度分析服務暴露過程。 前言 本來這一篇一個寫異步化改造的內容,但是最近我一直在想,某一部分的優化改造該怎么去撰寫才能更加的讓讀者理解。我覺...

    light 評論0 收藏0

發表評論

0條評論

Travis

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<