国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

dubbo源碼解析(四十七)服務端處理請求過程

yzzz / 3419人閱讀

摘要:而存在的意義就是保證請求或響應對象可在線程池中被解碼,解碼完成后,就會分發到的。

2.7大揭秘——服務端處理請求過程
目標:從源碼的角度分析服務端接收到請求后的一系列操作,最終把客戶端需要的值返回。
前言

上一篇講到了消費端發送請求的過程,該篇就要將服務端處理請求的過程。也就是當服務端收到請求數據包后的一系列處理以及如何返回最終結果。我們也知道消費端在發送請求的時候已經做了編碼,所以我們也需要在服務端接收到數據包后,對協議頭和協議體進行解碼。不過本篇不講解如何解碼。有興趣的可以翻翻我以前的文章,有講到關于解碼的邏輯。接下來就開始講解服務端收到請求后的邏輯。

處理過程

假設遠程通信的實現還是用netty4,解碼器將數據包解析成 Request 對象后,NettyHandler 的 messageReceived 方法緊接著會收到這個對象,所以第一步就是NettyServerHandler的channelRead。

(一)NettyServerHandler的channelRead

可以參考《dubbo源碼解析(十七)遠程通信——Netty4》的(三)NettyServerHandler

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 看是否在緩存中命中,如果沒有命中,則創建NettyChannel并且緩存。
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    try {
        // 接受消息
        handler.received(channel, msg);
    } finally {
        // 如果通道不活躍或者斷掉,則從緩存中清除
        NettyChannel.removeChannelIfDisconnected(ctx.channel());
    }
}

NettyServerHandler是基于netty4實現的服務端通道處理實現類,而該方法就是用來接收請求,接下來就是執行AbstractPeer的received。

(二)AbstractPeer的received

可以參考《dubbo源碼解析(九)遠程通信——Transport層》的(一)AbstractPeer

public void received(Channel ch, Object msg) throws RemotingException {
    // 如果通道已經關閉,則直接返回
    if (closed) {
        return;
    }
    handler.received(ch, msg);
}

該方法比較簡單,之前也講過AbstractPeer類就做了裝飾模式中裝飾角色,只是維護了通道的正在關閉和關閉完成兩個狀態。然后到了MultiMessageHandler的received

(三)MultiMessageHandler的received

可以參考《dubbo源碼解析(九)遠程通信——Transport層》的(八)MultiMessageHandler

public void received(Channel channel, Object message) throws RemotingException {
    // 如果消息是MultiMessage類型的,也就是多消息類型
    if (message instanceof MultiMessage) {
        // 強制轉化為MultiMessage
        MultiMessage list = (MultiMessage) message;
        // 把各個消息進行發送
        for (Object obj : list) {
            handler.received(channel, obj);
        }
    } else {
        // 直接發送
        handler.received(channel, message);
    }
}

該方法也比較簡單,就是對于多消息的處理。

(四)HeartbeatHandler的received

可以參考《dubbo源碼解析(十)遠程通信——Exchange層》的(二十)HeartbeatHandler。其中就是對心跳事件做了處理。如果不是心跳請求,那么接下去走到AllChannelHandler的received。

(五)AllChannelHandler的received

可以參考《dubbo源碼解析(九)遠程通信——Transport層》的(十一)AllChannelHandler。該類處理的是連接、斷開連接、捕獲異常以及接收到的所有消息都分發到線程池。所以這里的received方法就是把請求分發到線程池,讓線程池去執行該請求。

還記得我在之前文章里面講到到Dispatcher接口嗎,它是一個線程派發器。分別有五個實現:

Dispatcher實現類 對應的handler 用途
AllDispatcher AllChannelHandler 所有消息都派發到線程池,包括請求,響應,連接事件,斷開事件等
ConnectionOrderedDispatcher ConnectionOrderedChannelHandler 在 IO 線程上,將連接和斷開事件放入隊列,有序逐個執行,其它消息派發到線程池
DirectDispatcher 所有消息都不派發到線程池,全部在 IO 線程上直接執行
ExecutionDispatcher ExecutionChannelHandler 只有請求消息派發到線程池,不含響應。其它消息均在 IO 線程上執行
MessageOnlyDispatcher MessageOnlyChannelHandler 只有請求和響應消息派發到線程池,其它消息均在 IO 線程上執行

這些Dispatcher的實現類以及對應的Handler都可以在《dubbo源碼解析(九)遠程通信——Transport層》中查看相關實現。dubbo默認all為派發策略。所以我在這里講了AllChannelHandler的received。把消息送到線程池后,可以看到首先會創建一個ChannelEventRunnable實體。那么接下來就是線程接收并且執行任務了。

(六)ChannelEventRunnable的run

ChannelEventRunnable實現了Runnable接口,主要是用來接收消息事件,并且根據事件的種類來分別執行不同的操作。來看看它的run方法:

public void run() {
    // 如果是接收的消息
    if (state == ChannelState.RECEIVED) {
        try {
            // 直接調用下一個received
            handler.received(channel, message);
        } catch (Exception e) {
            logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                    + ", message is " + message, e);
        }
    } else {
        switch (state) {
            //如果是連接事件請求
        case CONNECTED:
            try {
                // 執行連接
                handler.connected(channel);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
            }
            break;
            // 如果是斷開連接事件請求
        case DISCONNECTED:
            try {
                // 執行斷開連接
                handler.disconnected(channel);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
            }
            break;
            // 如果是發送消息
        case SENT:
            try {
                // 執行發送消息
                handler.sent(channel, message);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                        + ", message is " + message, e);
            }
            break;
            // 如果是異常
        case CAUGHT:
            try {
                // 執行異常捕獲
                handler.caught(channel, exception);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                        + ", message is: " + message + ", exception is " + exception, e);
            }
            break;
        default:
            logger.warn("unknown state: " + state + ", message is " + message);
        }
    }

}

可以看到把消息分為了幾種類別,因為請求和響應消息出現頻率明顯比其他類型消息高,也就是RECEIVED,所以多帶帶先做處理,根據不同的類型的消息,會被執行不同的邏輯,我們這里主要看state為RECEIVED的,那么如果是RECEIVED,則會執行下一個received方法。

(七)DecodeHandler的received

可以參考《dubbo源碼解析(九)遠程通信——Transport層》的(七)DecodeHandler。可以看到received方法中根據消息的類型進行不同的解碼。而DecodeHandler 存在的意義就是保證請求或響應對象可在線程池中被解碼,解碼完成后,就會分發到HeaderExchangeHandler的received。

(八)HeaderExchangeHandler的received
public void received(Channel channel, Object message) throws RemotingException {
    // 設置接收到消息的時間戳
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    // 獲得通道
    final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        // 如果消息是Request類型
        if (message instanceof Request) {
            // handle request.
            // 強制轉化為Request
            Request request = (Request) message;
            // 如果該請求是事件心跳事件或者只讀事件
            if (request.isEvent()) {
                // 執行事件
                handlerEvent(channel, request);
            } else {
                // 如果是正常的調用請求,且需要響應
                if (request.isTwoWay()) {
                    // 處理請求
                    handleRequest(exchangeChannel, request);
                } else {
                    // 如果不需要響應,則繼續下一步
                    handler.received(exchangeChannel, request.getData());
                }
            }
        } else if (message instanceof Response) {
            // 處理響應
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            // 如果是telnet相關的請求
            if (isClientSide(channel)) {
                // 如果是客戶端側,則直接拋出異常,因為客戶端側不支持telnet
                Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                logger.error(e.getMessage(), e);
            } else {
                // 如果是服務端側,則執行telnet命令
                String echo = handler.telnet(channel, (String) message);
                if (echo != null && echo.length() > 0) {
                    channel.send(echo);
                }
            }
        } else {
            // 如果都不是,則繼續下一步
            handler.received(exchangeChannel, message);
        }
    } finally {
        // 移除關閉或者不活躍的通道
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

該方法中就對消息進行了細分,事件請求、正常的調用請求、響應、telnet命令請求等,并且針對不同的消息類型做了不同的邏輯調用。我們這里主要看正常的調用請求。見下一步。

(九)HeaderExchangeHandler的handleRequest
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
    // 創建一個Response實例
    Response res = new Response(req.getId(), req.getVersion());
    // 如果請求被破壞了
    if (req.isBroken()) {
        // 獲得請求的數據包
        Object data = req.getData();

        String msg;
        // 如果數據為空
        if (data == null) {
            //消息設置為空
            msg = null;
            // 如果在這之前已經出現異常,也就是數據為Throwable類型
        } else if (data instanceof Throwable) {
            // 響應消息把異常信息返回
            msg = StringUtils.toString((Throwable) data);
        } else {
            // 返回請求數據
            msg = data.toString();
        }
        res.setErrorMessage("Fail to decode request due to: " + msg);
        // 設置錯誤請求的狀態碼
        res.setStatus(Response.BAD_REQUEST);

        // 發送該消息
        channel.send(res);
        return;
    }
    // find handler by message class.
    // 獲得請求數據 也就是 RpcInvocation 對象
    Object msg = req.getData();
    try {
        // 繼續向下調用 返回一個future
        CompletionStage future = handler.reply(channel, msg);
        future.whenComplete((appResult, t) -> {
            try {
                if (t == null) {
                    //設置調用結果狀態為成功
                    res.setStatus(Response.OK);
                    // 把結果放入響應
                    res.setResult(appResult);
                } else {
                    // 如果服務調用有異常,則設置結果狀態碼為服務錯誤
                    res.setStatus(Response.SERVICE_ERROR);
                    // 把報錯信息放到響應中
                    res.setErrorMessage(StringUtils.toString(t));
                }
                // 發送該響應
                channel.send(res);
            } catch (RemotingException e) {
                logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
            } finally {
                // HeaderExchangeChannel.removeChannelIfDisconnected(channel);
            }
        });
    } catch (Throwable e) {
        // 如果在執行中拋出異常,則也算服務異常
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
        channel.send(res);
    }
}

該方法是處理正常的調用請求,主要做了正常、異常調用的情況處理,并且加入了狀態碼,然后發送執行后的結果給客戶端。接下來看下一步。

(十)DubboProtocol的requestHandler實例的reply

這里我默認是使用dubbo協議,所以執行的是DubboProtocol的requestHandler的reply方法。可以參考《dubbo源碼解析(二十四)遠程調用——dubbo協議》的(三)DubboProtocol

public CompletableFuture reply(ExchangeChannel channel, Object message) throws RemotingException {

    // 如果請求消息不屬于會話域,則拋出異常
    if (!(message instanceof Invocation)) {
        throw new RemotingException(channel, "Unsupported request: "
                + (message == null ? null : (message.getClass().getName() + ": " + message))
                + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
    }

    //強制類型轉化
    Invocation inv = (Invocation) message;
    // 獲得暴露的服務invoker
    Invoker invoker = getInvoker(channel, inv);
    // need to consider backward-compatibility if it"s a callback
    // 如果是回調服務
    if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
        // 獲得 方法定義
        String methodsStr = invoker.getUrl().getParameters().get("methods");
        boolean hasMethod = false;
        // 如果只有一個方法定義
        if (methodsStr == null || !methodsStr.contains(",")) {
            // 設置會話域中是否有一致的方法定義標志
            hasMethod = inv.getMethodName().equals(methodsStr);
        } else {
            // 分割不同的方法
            String[] methods = methodsStr.split(",");
            // 如果方法不止一個,則分割后遍歷查詢,找到了則設置為true
            for (String method : methods) {
                if (inv.getMethodName().equals(method)) {
                    hasMethod = true;
                    break;
                }
            }
        }
        // 如果沒有該方法,則打印告警日志
        if (!hasMethod) {
            logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                    + " not found in callback service interface ,invoke will be ignored."
                    + " please update the api interface. url is:"
                    + invoker.getUrl()) + " ,invocation is :" + inv);
            return null;
        }
    }
    // 設置遠程地址
    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
    // 調用下一個調用鏈
    Result result = invoker.invoke(inv);
    //  返回CompletableFuture
    return result.completionFuture().thenApply(Function.identity());
}

上述代碼有些變化,是最新的代碼,加入了CompletableFuture,這個我會在后續的異步化改造中講到。這里主要關注的是又開始跟客戶端發請求一樣執行invoke調用鏈了。

(十一)ProtocolFilterWrapper的CallbackRegistrationInvoker的invoke

可以直接參考《dubbo源碼解析(四十六)消費端發送請求過程》的(六)ProtocolFilterWrapper的內部類CallbackRegistrationInvoker的invoke。

(十二)ProtocolFilterWrapper的buildInvokerChain方法中的invoker實例的invoke方法。

可以直接參考《dubbo源碼解析(四十六)消費端發送請求過程》的(七)ProtocolFilterWrapper的buildInvokerChain方法中的invoker實例的invoke方法。

(十三)EchoFilter的invoke

可以參考《dubbo源碼解析(二十)遠程調用——Filter》的(八)EchoFilter

public Result invoke(Invoker invoker, Invocation inv) throws RpcException {
    // 如果調用的方法是回聲測試的方法 則直接返回結果,否則 調用下一個調用鏈
    if (inv.getMethodName().equals($ECHO) && inv.getArguments() != null && inv.getArguments().length == 1) {
        // 創建一個默認的AsyncRpcResult返回
        return AsyncRpcResult.newDefaultAsyncResult(inv.getArguments()[0], inv);
    }
    return invoker.invoke(inv);
}

該過濾器就是對回聲測試的調用進行攔截,上述源碼跟連接中源碼唯一區別就是改為了AsyncRpcResult。

(十四)ClassLoaderFilter的invoke

可以參考《dubbo源碼解析(二十)遠程調用——Filter》的(三)ClassLoaderFilter,用來做類加載器的切換。

(十五)GenericFilter的invoke

可以參考《dubbo源碼解析(二十)遠程調用——Filter》的(十一)GenericFilter。

public Result invoke(Invoker invoker, Invocation inv) throws RpcException {
    // 如果是泛化調用
    if ((inv.getMethodName().equals($INVOKE) || inv.getMethodName().equals($INVOKE_ASYNC))
            && inv.getArguments() != null
            && inv.getArguments().length == 3
            && !GenericService.class.isAssignableFrom(invoker.getInterface())) {
        // 獲得請求名字
        String name = ((String) inv.getArguments()[0]).trim();
        // 獲得請求參數類型
        String[] types = (String[]) inv.getArguments()[1];
        // 獲得請求參數
        Object[] args = (Object[]) inv.getArguments()[2];
        try {
            // 獲得方法
            Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types);
            // 獲得該方法的參數類型
            Class[] params = method.getParameterTypes();
            if (args == null) {
                args = new Object[params.length];
            }
            // 獲得附加值
            String generic = inv.getAttachment(GENERIC_KEY);

            if (StringUtils.isBlank(generic)) {
                generic = RpcContext.getContext().getAttachment(GENERIC_KEY);
            }

            // 如果附加值為空,在用上下文攜帶的附加值
            if (StringUtils.isEmpty(generic)
                    || ProtocolUtils.isDefaultGenericSerialization(generic)) {
                // 直接進行類型轉化
                args = PojoUtils.realize(args, params, method.getGenericParameterTypes());
            } else if (ProtocolUtils.isJavaGenericSerialization(generic)) {
                for (int i = 0; i < args.length; i++) {
                    if (byte[].class == args[i].getClass()) {
                        try (UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream((byte[]) args[i])) {
                            // 使用nativejava方式反序列化
                            args[i] = ExtensionLoader.getExtensionLoader(Serialization.class)
                                    .getExtension(GENERIC_SERIALIZATION_NATIVE_JAVA)
                                    .deserialize(null, is).readObject();
                        } catch (Exception e) {
                            throw new RpcException("Deserialize argument [" + (i + 1) + "] failed.", e);
                        }
                    } else {
                        throw new RpcException(
                                "Generic serialization [" +
                                        GENERIC_SERIALIZATION_NATIVE_JAVA +
                                        "] only support message type " +
                                        byte[].class +
                                        " and your message type is " +
                                        args[i].getClass());
                    }
                }
            } else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
                for (int i = 0; i < args.length; i++) {
                    if (args[i] instanceof JavaBeanDescriptor) {
                        // 用JavaBean方式反序列化
                        args[i] = JavaBeanSerializeUtil.deserialize((JavaBeanDescriptor) args[i]);
                    } else {
                        throw new RpcException(
                                "Generic serialization [" +
                                        GENERIC_SERIALIZATION_BEAN +
                                        "] only support message type " +
                                        JavaBeanDescriptor.class.getName() +
                                        " and your message type is " +
                                        args[i].getClass().getName());
                    }
                }
            } else if (ProtocolUtils.isProtobufGenericSerialization(generic)) {
                // as proto3 only accept one protobuf parameter
                if (args.length == 1 && args[0] instanceof String) {
                    try (UnsafeByteArrayInputStream is =
                                 new UnsafeByteArrayInputStream(((String) args[0]).getBytes())) {
                        // 用protobuf-json進行反序列化
                        args[0] = ExtensionLoader.getExtensionLoader(Serialization.class)
                                .getExtension("" + GENERIC_SERIALIZATION_PROTOBUF)
                                .deserialize(null, is).readObject(method.getParameterTypes()[0]);
                    } catch (Exception e) {
                        throw new RpcException("Deserialize argument failed.", e);
                    }
                } else {
                    throw new RpcException(
                            "Generic serialization [" +
                                    GENERIC_SERIALIZATION_PROTOBUF +
                                    "] only support one" + String.class.getName() +
                                    " argument and your message size is " +
                                    args.length + " and type is" +
                                    args[0].getClass().getName());
                }
            }
            return invoker.invoke(new RpcInvocation(method, args, inv.getAttachments()));
        } catch (NoSuchMethodException e) {
            throw new RpcException(e.getMessage(), e);
        } catch (ClassNotFoundException e) {
            throw new RpcException(e.getMessage(), e);
        }
    }
    return invoker.invoke(inv);
}

static class GenericListener implements Listener {

    @Override
    public void onResponse(Result appResponse, Invoker invoker, Invocation inv) {
        // 如果是泛化調用
        if ((inv.getMethodName().equals($INVOKE) || inv.getMethodName().equals($INVOKE_ASYNC))
                && inv.getArguments() != null
                && inv.getArguments().length == 3
                && !GenericService.class.isAssignableFrom(invoker.getInterface())) {

            // 獲得序列化方式
            String generic = inv.getAttachment(GENERIC_KEY);
            // 如果為空,默認獲取會話域中的配置
            if (StringUtils.isBlank(generic)) {
                generic = RpcContext.getContext().getAttachment(GENERIC_KEY);
            }

            // 如果回調有異常,直接設置異常
            if (appResponse.hasException() && !(appResponse.getException() instanceof GenericException)) {
                appResponse.setException(new GenericException(appResponse.getException()));
            }
            // 如果是native java形式序列化
            if (ProtocolUtils.isJavaGenericSerialization(generic)) {
                try {
                    UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(512);
                    // 使用native java形式序列化
                    ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(GENERIC_SERIALIZATION_NATIVE_JAVA).serialize(null, os).writeObject(appResponse.getValue());
                    // 加入結果
                    appResponse.setValue(os.toByteArray());
                } catch (IOException e) {
                    throw new RpcException(
                            "Generic serialization [" +
                                    GENERIC_SERIALIZATION_NATIVE_JAVA +
                                    "] serialize result failed.", e);
                }
            } else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
                // 用JavaBean方式序列化
                appResponse.setValue(JavaBeanSerializeUtil.serialize(appResponse.getValue(), JavaBeanAccessor.METHOD));
            } else if (ProtocolUtils.isProtobufGenericSerialization(generic)) {
                try {
                    UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(512);
                    // 用protobuf-json進行序列化
                    ExtensionLoader.getExtensionLoader(Serialization.class)
                            .getExtension(GENERIC_SERIALIZATION_PROTOBUF)
                            .serialize(null, os).writeObject(appResponse.getValue());
                    appResponse.setValue(os.toString());
                } catch (IOException e) {
                    throw new RpcException("Generic serialization [" +
                            GENERIC_SERIALIZATION_PROTOBUF +
                            "] serialize result failed.", e);
                }
            } else {
                // 直接進行類型轉化并且設置值
                appResponse.setValue(PojoUtils.generalize(appResponse.getValue()));
            }
        }
    }

    @Override
    public void onError(Throwable t, Invoker invoker, Invocation invocation) {

    }
}

跟連接內的代碼對比,第一個就是對過濾器設計發生了變化,這個我在異步化改造里面會講到,第二個是新增了protobuf-json的泛化序列化方式。

(十六)ContextFilter的invoke

可以參考《dubbo源碼解析(二十)遠程調用——Filter》的(六)ContextFilter,最新代碼幾乎差不多,除了因為對Filter的設計做了修改以外,還有新增了tag路由的相關邏輯,tag相關部分我會在后續文章中講解,該類主要是做了初始化rpc上下文。

(十七)TraceFilter的invoke

可以參考《dubbo源碼解析(二十四)遠程調用——dubbo協議》的(十三)TraceFilter,該過濾器是增強的功能是通道的跟蹤,會在通道內把最大的調用次數和現在的調用數量放進去。方便使用telnet來跟蹤服務的調用次數等。

(十八)TimeoutFilter的invoke

可以參考《dubbo源碼解析(二十)遠程調用——Filter》的(十三)TimeoutFilter,該過濾器是當服務調用超時的時候,記錄告警日志。

(十九)MonitorFilter的invoke
public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {
    // 如果開啟監控
    if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
        // 設置開始監控時間
        invocation.setAttachment(MONITOR_FILTER_START_TIME, String.valueOf(System.currentTimeMillis()));
        // 對同時在線數量加1
        getConcurrent(invoker, invocation).incrementAndGet(); // count up
    }
    return invoker.invoke(invocation); // proceed invocation chain
}
class MonitorListener implements Listener {

    @Override
    public void onResponse(Result result, Invoker invoker, Invocation invocation) {
        // 如果開啟監控
        if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
            // 收集監控對數據,并且更新監控數據
            collect(invoker, invocation, result, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), false);
            // 同時在線監控數減1
            getConcurrent(invoker, invocation).decrementAndGet(); // count down
        }
    }

    @Override
    public void onError(Throwable t, Invoker invoker, Invocation invocation) {
        if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
            // 收集監控對數據,并且更新監控數據
            collect(invoker, invocation, null, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), true);
            // 同時在線監控數減1
            getConcurrent(invoker, invocation).decrementAndGet(); // count down
        }
    }

    private void collect(Invoker invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
        try {
            // 獲得監控的url
            URL monitorUrl = invoker.getUrl().getUrlParameter(MONITOR_KEY);
            // 通過該url獲得Monitor實例
            Monitor monitor = monitorFactory.getMonitor(monitorUrl);
            if (monitor == null) {
                return;
            }
            // 創建一個統計的url
            URL statisticsURL = createStatisticsUrl(invoker, invocation, result, remoteHost, start, error);
            // 把收集的信息更新并且發送信息
            monitor.collect(statisticsURL);
        } catch (Throwable t) {
            logger.warn("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t);
        }
    }

    private URL createStatisticsUrl(Invoker invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
        // ---- service statistics ----
        // 調用服務消耗的時間
        long elapsed = System.currentTimeMillis() - start; // invocation cost
        // 獲得同時監控的數量
        int concurrent = getConcurrent(invoker, invocation).get(); // current concurrent count
        String application = invoker.getUrl().getParameter(APPLICATION_KEY);
        // 獲得服務名
        String service = invoker.getInterface().getName(); // service name
        // 獲得調用的方法名
        String method = RpcUtils.getMethodName(invocation); // method name
        // 獲得組
        String group = invoker.getUrl().getParameter(GROUP_KEY);
        // 獲得版本號
        String version = invoker.getUrl().getParameter(VERSION_KEY);

        int localPort;
        String remoteKey, remoteValue;
        // 如果是消費者端的監控
        if (CONSUMER_SIDE.equals(invoker.getUrl().getParameter(SIDE_KEY))) {
            // ---- for service consumer ----
            // 本地端口為0
            localPort = 0;
            // key為provider
            remoteKey = MonitorService.PROVIDER;
            // value為服務ip
            remoteValue = invoker.getUrl().getAddress();
        } else {
            // ---- for service provider ----
            // 端口為服務端口
            localPort = invoker.getUrl().getPort();
            // key為consumer
            remoteKey = MonitorService.CONSUMER;
            // value為遠程地址
            remoteValue = remoteHost;
        }
        String input = "", output = "";
        if (invocation.getAttachment(INPUT_KEY) != null) {
            input = invocation.getAttachment(INPUT_KEY);
        }
        if (result != null && result.getAttachment(OUTPUT_KEY) != null) {
            output = result.getAttachment(OUTPUT_KEY);
        }

        // 返回一個url
        return new URL(COUNT_PROTOCOL, NetUtils.getLocalHost(), localPort, service + PATH_SEPARATOR + method, MonitorService.APPLICATION, application, MonitorService.INTERFACE, service, MonitorService.METHOD, method, remoteKey, remoteValue, error ? MonitorService.FAILURE : MonitorService.SUCCESS, "1", MonitorService.ELAPSED, String.valueOf(elapsed), MonitorService.CONCURRENT, String.valueOf(concurrent), INPUT_KEY, input, OUTPUT_KEY, output, GROUP_KEY, group, VERSION_KEY, version);
    }

}

在invoke里面只是做了記錄開始監控的時間以及對同時監控的數量加1操作,當結果回調時,會對結果數據做搜集計算,最后通過監控服務記錄和發送最新信息。

(二十)ExceptionFilter的invoke

可以參考《dubbo源碼解析(二十)遠程調用——Filter》的(九)ExceptionFilter,該過濾器主要是對異常的處理。

(二十一)InvokerWrapper的invoke

可以參考《dubbo源碼解析(二十二)遠程調用——Protocol》的(五)InvokerWrapper。該類用了裝飾模式,不過并沒有實現實際的功能增強。

(二十二)DelegateProviderMetaDataInvoker的invoke
public Result invoke(Invocation invocation) throws RpcException {
    return invoker.invoke(invocation);
}

該類也是用了裝飾模式,不過該類是invoker和配置中心的適配類,其中也沒有進行實際的功能增強。

(二十三)AbstractProxyInvoker的invoke

可以參考《dubbo源碼解析(二十三)遠程調用——Proxy》的(二)AbstractProxyInvoker。不過代碼已經有更新了,下面貼出最新的代碼。

public Result invoke(Invocation invocation) throws RpcException {
    try {
        // 執行下一步
        Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
        // 把返回結果用CompletableFuture包裹
        CompletableFuture future = wrapWithFuture(value, invocation);
        // 創建AsyncRpcResult實例
        AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
        future.whenComplete((obj, t) -> {
            AppResponse result = new AppResponse();
            // 如果拋出異常
            if (t != null) {
                // 屬于CompletionException異常
                if (t instanceof CompletionException) {
                    // 設置異常信息
                    result.setException(t.getCause());
                } else {
                    // 直接設置異常
                    result.setException(t);
                }
            } else {
                // 如果沒有異常,則把結果放入異步結果內
                result.setValue(obj);
            }
            // 完成
            asyncRpcResult.complete(result);
        });
        return asyncRpcResult;
    } catch (InvocationTargetException e) {
        if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
            logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
        }
        return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
    } catch (Throwable e) {
        throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

這里主要是因為異步化改造而出現的代碼變化,我會在異步化改造中講到這部分。現在主要來看下一步。

(二十四)JavassistProxyFactory的getInvoker方法中匿名類的doInvoke

這里默認代理實現方式是Javassist。可以參考《dubbo源碼解析(二十三)遠程調用——Proxy》的(六)JavassistProxyFactory。其中Wrapper 是一個抽象類,其中 invokeMethod 是一個抽象方法。dubbo 會在運行時通過 Javassist 框架為 Wrapper 生成實現類,并實現 invokeMethod 方法,該方法最終會根據調用信息調用具體的服務。以 DemoServiceImpl 為例,Javassist 為其生成的代理類如下。

/** Wrapper0 是在運行時生成的,大家可使用 Arthas 進行反編譯 */
public class Wrapper0 extends Wrapper implements ClassGenerator.DC {
    public static String[] pns;
    public static Map pts;
    public static String[] mns;
    public static String[] dmns;
    public static Class[] mts0;

    // 省略其他方法

    public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
        DemoService demoService;
        try {
            // 類型轉換
            demoService = (DemoService)object;
        }
        catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        try {
            // 根據方法名調用指定的方法
            if ("sayHello".equals(string) && arrclass.length == 1) {
                return demoService.sayHello((String)arrobject[0]);
            }
        }
        catch (Throwable throwable) {
            throw new InvocationTargetException(throwable);
        }
        throw new NoSuchMethodException(new StringBuffer().append("Not found method "").append(string).append("" in class com.alibaba.dubbo.demo.DemoService.").toString());
    }
}

然后就是直接調用的是對應的方法了。到此,方法執行完成了。

結果返回

可以看到我上述講到的(八)HeaderExchangeHandler的received和(九)HeaderExchangeHandler的handleRequest有好幾處channel.send方法的調用,也就是當結果返回的返回的時候,會主動發送執行結果給客戶端。當然發送的時候還是會對結果Response 對象進行編碼,編碼邏輯我就先不在這里闡述。

當客戶端接收到這個返回的消息時候,進行解碼后,識別為Response 對象,將該對象派發到線程池中,該過程跟服務端接收到調用請求到邏輯是一樣的,可以參考上述的解析,區別在于到(八)HeaderExchangeHandler的received方法的時候,執行的是handleResponse方法。

(九)HeaderExchangeHandler的handleResponse
static void handleResponse(Channel channel, Response response) throws RemotingException {
    // 如果響應不為空,并且不是心跳事件的響應,則調用received
    if (response != null && !response.isHeartbeat()) {
        DefaultFuture.received(channel, response);
    }
}
(十)DefaultFuture的received

可以參考《dubbo源碼解析(十)遠程通信——Exchange層》的(七)DefaultFuture,不過該類的繼承的是CompletableFuture,因為對異步化的改造,該類已經做了一些變化。

public static void received(Channel channel, Response response) {
    received(channel, response, false);
}

public static void received(Channel channel, Response response, boolean timeout) {
    try {
        // future集合中移除該請求的future,(響應id和請求id一一對應的)
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            //獲得超時
            Timeout t = future.timeoutCheckTask;
            // 如果沒有超時,則取消timeoutCheckTask
            if (!timeout) {
                // decrease Time
                t.cancel();
            }
            // 接收響應結果
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at "
                    + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                    + ", response " + response
                    + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                    + " -> " + channel.getRemoteAddress()));
        }
    } finally {
        // 通道集合移除該請求對應的通道,代表著這一次請求結束
        CHANNELS.remove(response.getId());
    }
}

該方法中主要是對超時的處理,還有對應的請求和響應的匹配,也就是返回相應id的future。

(十一)DefaultFuture的doReceived

可以參考《dubbo源碼解析(十)遠程通信——Exchange層》的(七)DefaultFuture,不過因為運用了CompletableFuture,所以該方法完全重寫了。這部分的改造我也會在異步化改造中講述到。

private void doReceived(Response res) {
    // 如果結果為空,則拋出異常
    if (res == null) {
        throw new IllegalStateException("response cannot be null");
    }
    // 如果結果的狀態碼為ok
    if (res.getStatus() == Response.OK) {
        // 則future調用完成
        this.complete(res.getResult());
    } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
        // 如果超時,則返回一個超時異常
        this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
    } else {
        // 否則返回一個RemotingException
        this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
    }
}

隨后用戶線程即可從 DefaultFuture 實例中獲取到相應結果。

后記

該文章講解了dubbo調服務端接收到請求后一系列處理的所有步驟以及服務端怎么把結果發送給客戶端,是目前最新代碼的解析。因為里面涉及到很多流程,所以可以自己debug一步一步看一看整個過程。可以看到本文涉及到異步化改造已經很多了,這也是我在講解異步化改造前想先講解這些過程的原因。只有弄清楚這些過程,才能更加理解對于異步化改造的意義。下一篇文將講解異步化改造。

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/74869.html

相關文章

  • dubbo源碼解析四十八)異步化改造

    摘要:大揭秘異步化改造目標從源碼的角度分析的新特性中對于異步化的改造原理。看源碼解析四十六消費端發送請求過程講到的十四的,在以前的邏輯會直接在方法中根據配置區分同步異步單向調用。改為關于可以參考源碼解析十遠程通信層的六。 2.7大揭秘——異步化改造 目標:從源碼的角度分析2.7的新特性中對于異步化的改造原理。 前言 dubbo中提供了很多類型的協議,關于協議的系列可以查看下面的文章: du...

    lijinke666 評論0 收藏0
  • dubbo源碼解析四十六)消費發送請求過程

    摘要:可以參考源碼解析二十四遠程調用協議的八。十六的該類也是用了適配器模式,該類主要的作用就是增加了心跳功能,可以參考源碼解析十遠程通信層的四。二十的可以參考源碼解析十七遠程通信的一。 2.7大揭秘——消費端發送請求過程 目標:從源碼的角度分析一個服務方法調用經歷怎么樣的磨難以后到達服務端。 前言 前一篇文章講到的是引用服務的過程,引用服務無非就是創建出一個代理。供消費者調用服務的相關方法。...

    fish 評論0 收藏0
  • dubbo源碼解析四十四)服務暴露過程

    摘要:服務暴露過程目標從源碼的角度分析服務暴露過程。導出服務,包含暴露服務到本地,和暴露服務到遠程兩個過程。其中服務暴露的第八步已經沒有了。將泛化調用版本號或者等信息加入獲得服務暴露地址和端口號,利用內數據組裝成。 dubbo服務暴露過程 目標:從源碼的角度分析服務暴露過程。 前言 本來這一篇一個寫異步化改造的內容,但是最近我一直在想,某一部分的優化改造該怎么去撰寫才能更加的讓讀者理解。我覺...

    light 評論0 收藏0
  • dubbo源碼解析四十三)2.7新特性

    摘要:大揭秘目標了解的新特性,以及版本升級的引導。四元數據改造我們知道以前的版本只有注冊中心,注冊中心的有數十個的鍵值對,包含了一個服務所有的元數據。 DUBBO——2.7大揭秘 目標:了解2.7的新特性,以及版本升級的引導。 前言 我們知道Dubbo在2011年開源,停止更新了一段時間。在2017 年 9 月 7 日,Dubbo 悄悄的在 GitHub 發布了 2.5.4 版本。隨后,版本...

    qqlcbb 評論0 收藏0
  • dubbo源碼解析四十五)服務引用過程

    摘要:服務引用過程目標從源碼的角度分析服務引用過程。并保留服務提供者的部分配置,比如版本,,時間戳等最后將合并后的配置設置為查詢字符串中。的可以參考源碼解析二十三遠程調用的一的源碼分析。 dubbo服務引用過程 目標:從源碼的角度分析服務引用過程。 前言 前面服務暴露過程的文章講解到,服務引用有兩種方式,一種就是直連,也就是直接指定服務的地址來進行引用,這種方式更多的時候被用來做服務測試,不...

    xiaowugui666 評論0 收藏0

發表評論

0條評論

yzzz

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<