国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專(zhuān)欄INFORMATION COLUMN

dubbo源碼解析(三十九)集群——merger

lscho / 1581人閱讀

摘要:源碼分析一創(chuàng)建該類(lèi)實(shí)現(xiàn)了接口,是分組集合的集群實(shí)現(xiàn)。三工廠類(lèi),獲得指定類(lèi)型的對(duì)象。后記該部分相關(guān)的源碼解析地址該文章講解了集群中關(guān)于分組聚合實(shí)現(xiàn)的部分。接下來(lái)我將開(kāi)始對(duì)集群模塊關(guān)于路由部分進(jìn)行講解。

集群——merger
目標(biāo):介紹dubbo中集群的分組聚合,介紹dubbo-cluster下merger包的源碼。
前言

按組合并返回結(jié)果 ,比如菜單服務(wù),接口一樣,但有多種實(shí)現(xiàn),用group區(qū)分,現(xiàn)在消費(fèi)方需從每種group中調(diào)用一次返回結(jié)果,合并結(jié)果返回,這樣就可以實(shí)現(xiàn)聚合菜單項(xiàng)。這個(gè)時(shí)候就要用到分組聚合。

源碼分析 (一)MergeableCluster
public class MergeableCluster implements Cluster {

    public static final String NAME = "mergeable";

    @Override
    public  Invoker join(Directory directory) throws RpcException {
        // 創(chuàng)建MergeableClusterInvoker
        return new MergeableClusterInvoker(directory);
    }

}

該類(lèi)實(shí)現(xiàn)了Cluster接口,是分組集合的集群實(shí)現(xiàn)。

(二)MergeableClusterInvoker

該類(lèi)是分組聚合的實(shí)現(xiàn)類(lèi),其中最關(guān)機(jī)的就是invoke方法。

@Override
@SuppressWarnings("rawtypes")
public Result invoke(final Invocation invocation) throws RpcException {
    // 獲得invoker集合
    List> invokers = directory.list(invocation);

    /**
     * 獲得是否merger
     */
    String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY);
    // 如果沒(méi)有設(shè)置需要聚合,則只調(diào)用一個(gè)invoker的
    if (ConfigUtils.isEmpty(merger)) { // If a method doesn"t have a merger, only invoke one Group
        // 只要有一個(gè)可用就返回
        for (final Invoker invoker : invokers) {
            if (invoker.isAvailable()) {
                return invoker.invoke(invocation);
            }
        }
        return invokers.iterator().next().invoke(invocation);
    }

    // 返回類(lèi)型
    Class returnType;
    try {
        // 獲得返回類(lèi)型
        returnType = getInterface().getMethod(
                invocation.getMethodName(), invocation.getParameterTypes()).getReturnType();
    } catch (NoSuchMethodException e) {
        returnType = null;
    }

    // 結(jié)果集合
    Map> results = new HashMap>();
    // 循環(huán)invokers
    for (final Invoker invoker : invokers) {
        // 獲得每次調(diào)用的future
        Future future = executor.submit(new Callable() {
            @Override
            public Result call() throws Exception {
                // 回調(diào),把返回結(jié)果放入future
                return invoker.invoke(new RpcInvocation(invocation, invoker));
            }
        });
        // 加入集合
        results.put(invoker.getUrl().getServiceKey(), future);
    }

    Object result = null;

    List resultList = new ArrayList(results.size());

    // 獲得超時(shí)時(shí)間
    int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    // 遍歷每一個(gè)結(jié)果
    for (Map.Entry> entry : results.entrySet()) {
        Future future = entry.getValue();
        try {
            // 獲得調(diào)用返回的結(jié)果
            Result r = future.get(timeout, TimeUnit.MILLISECONDS);
            if (r.hasException()) {
                log.error("Invoke " + getGroupDescFromServiceKey(entry.getKey()) + 
                                " failed: " + r.getException().getMessage(), 
                        r.getException());
            } else {
                // 加入集合
                resultList.add(r);
            }
        } catch (Exception e) {
            throw new RpcException("Failed to invoke service " + entry.getKey() + ": " + e.getMessage(), e);
        }
    }

    // 如果為空,則返回空的結(jié)果
    if (resultList.isEmpty()) {
        return new RpcResult((Object) null);
    } else if (resultList.size() == 1) {
        // 如果只有一個(gè)結(jié)果,則返回該結(jié)果
        return resultList.iterator().next();
    }

    // 如果返回類(lèi)型是void,也就是沒(méi)有返回值,那么返回空結(jié)果
    if (returnType == void.class) {
        return new RpcResult((Object) null);
    }

    // 根據(jù)方法來(lái)合并,將調(diào)用返回結(jié)果的指定方法進(jìn)行合并
    if (merger.startsWith(".")) {
        merger = merger.substring(1);
        Method method;
        try {
            // 獲得方法
            method = returnType.getMethod(merger, returnType);
        } catch (NoSuchMethodException e) {
            throw new RpcException("Can not merge result because missing method [ " + merger + " ] in class [ " + 
                    returnType.getClass().getName() + " ]");
        }

        // 有 Method ,進(jìn)行合并
        if (!Modifier.isPublic(method.getModifiers())) {
            method.setAccessible(true);
        }
        // 從集合中移除
        result = resultList.remove(0).getValue();
        try {
            // 方法返回類(lèi)型匹配,合并時(shí),修改 result
            if (method.getReturnType() != void.class
                    && method.getReturnType().isAssignableFrom(result.getClass())) {
                for (Result r : resultList) {
                    result = method.invoke(result, r.getValue());
                }
            } else {
                // 方法返回類(lèi)型不匹配,合并時(shí),不修改 result
                for (Result r : resultList) {
                    method.invoke(result, r.getValue());
                }
            }
        } catch (Exception e) {
            throw new RpcException("Can not merge result: " + e.getMessage(), e);
        }
    } else {
        // 基于 Merger
        Merger resultMerger;
        // 如果是默認(rèn)的方式
        if (ConfigUtils.isDefault(merger)) {
            // 獲得該類(lèi)型的合并方式
            resultMerger = MergerFactory.getMerger(returnType);
        } else {
            // 如果不是默認(rèn)的,則配置中指定獲得Merger的實(shí)現(xiàn)類(lèi)
            resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);
        }
        if (resultMerger != null) {
            List rets = new ArrayList(resultList.size());
            // 遍歷返回結(jié)果
            for (Result r : resultList) {
                // 加入到rets
                rets.add(r.getValue());
            }
            // 合并
            result = resultMerger.merge(
                    rets.toArray((Object[]) Array.newInstance(returnType, 0)));
        } else {
            throw new RpcException("There is no merger to merge result.");
        }
    }
    // 返回結(jié)果
    return new RpcResult(result);
}

前面部分在講獲得調(diào)用的結(jié)果,后面部分是對(duì)結(jié)果的合并,合并有兩種方式,根據(jù)配置不同可用分為基于方法的合并和基于merger的合并。

(三)MergerFactory

Merger 工廠類(lèi),獲得指定類(lèi)型的Merger 對(duì)象。

public class MergerFactory {

    /**
     * Merger 對(duì)象緩存
     */
    private static final ConcurrentMap, Merger> mergerCache =
            new ConcurrentHashMap, Merger>();

    /**
     * 獲得指定類(lèi)型的Merger對(duì)象
     * @param returnType
     * @param 
     * @return
     */
    public static  Merger getMerger(Class returnType) {
        Merger result;
        // 如果類(lèi)型是集合
        if (returnType.isArray()) {
            // 獲得類(lèi)型
            Class type = returnType.getComponentType();
            // 從緩存中獲得該類(lèi)型的Merger對(duì)象
            result = mergerCache.get(type);
            // 如果為空,則
            if (result == null) {
                // 初始化所有的 Merger 擴(kuò)展對(duì)象,到 mergerCache 緩存中。
                loadMergers();
                // 從集合中取出對(duì)應(yīng)的Merger對(duì)象
                result = mergerCache.get(type);
            }
            // 如果結(jié)果為空,則直接返回ArrayMerger的單例
            if (result == null && !type.isPrimitive()) {
                result = ArrayMerger.INSTANCE;
            }
        } else {
            // 否則直接從mergerCache中取出
            result = mergerCache.get(returnType);
            // 如果為空
            if (result == null) {
                // 初始化所有的 Merger 擴(kuò)展對(duì)象,到 mergerCache 緩存中。
                loadMergers();
                // 從集合中取出
                result = mergerCache.get(returnType);
            }
        }
        return result;
    }

    /**
     * 初始化所有的 Merger 擴(kuò)展對(duì)象,到 mergerCache 緩存中。
     */
    static void loadMergers() {
        // 獲得Merger所有的擴(kuò)展對(duì)象名
        Set names = ExtensionLoader.getExtensionLoader(Merger.class)
                .getSupportedExtensions();
        // 遍歷
        for (String name : names) {
            // 加載每一個(gè)擴(kuò)展實(shí)現(xiàn),然后放入緩存。
            Merger m = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(name);
            mergerCache.putIfAbsent(ReflectUtils.getGenericClass(m.getClass()), m);
        }
    }

}

邏輯比較簡(jiǎn)單。

(四)ArrayMerger

因?yàn)椴煌念?lèi)型有不同的Merger實(shí)現(xiàn),我們可以來(lái)看看這個(gè)圖片:

可以看到有好多好多,我就講解其中的一種,偷懶一下,其他的麻煩有興趣的去看看源碼了。

public class ArrayMerger implements Merger {

    /**
     * 單例
     */
    public static final ArrayMerger INSTANCE = new ArrayMerger();

    @Override
    public Object[] merge(Object[]... others) {
        // 如果長(zhǎng)度為0  則直接返回
        if (others.length == 0) {
            return null;
        }
        // 總長(zhǎng)
        int totalLen = 0;
        // 遍歷所有需要合并的對(duì)象
        for (int i = 0; i < others.length; i++) {
            Object item = others[i];
            // 如果為數(shù)組
            if (item != null && item.getClass().isArray()) {
                // 累加數(shù)組長(zhǎng)度
                totalLen += Array.getLength(item);
            } else {
                throw new IllegalArgumentException((i + 1) + "th argument is not an array");
            }
        }

        if (totalLen == 0) {
            return null;
        }

        // 獲得數(shù)組類(lèi)型
        Class type = others[0].getClass().getComponentType();

        // 創(chuàng)建長(zhǎng)度
        Object result = Array.newInstance(type, totalLen);
        int index = 0;
        // 遍歷需要合并的對(duì)象
        for (Object array : others) {
            // 遍歷每個(gè)數(shù)組中的數(shù)據(jù)
            for (int i = 0; i < Array.getLength(array); i++) {
                // 加入到最終結(jié)果中
                Array.set(result, index++, Array.get(array, i));
            }
        }
        return (Object[]) result;
    }

}

是不是很簡(jiǎn)單,就是循環(huán)合并就可以了。

后記
該部分相關(guān)的源碼解析地址:https://github.com/CrazyHZM/i...

該文章講解了集群中關(guān)于分組聚合實(shí)現(xiàn)的部分。接下來(lái)我將開(kāi)始對(duì)集群模塊關(guān)于路由部分進(jìn)行講解。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/77431.html

相關(guān)文章

  • dubbo源碼解析(四十八)異步化改造

    摘要:大揭秘異步化改造目標(biāo)從源碼的角度分析的新特性中對(duì)于異步化的改造原理。看源碼解析四十六消費(fèi)端發(fā)送請(qǐng)求過(guò)程講到的十四的,在以前的邏輯會(huì)直接在方法中根據(jù)配置區(qū)分同步異步單向調(diào)用。改為關(guān)于可以參考源碼解析十遠(yuǎn)程通信層的六。 2.7大揭秘——異步化改造 目標(biāo):從源碼的角度分析2.7的新特性中對(duì)于異步化的改造原理。 前言 dubbo中提供了很多類(lèi)型的協(xié)議,關(guān)于協(xié)議的系列可以查看下面的文章: du...

    lijinke666 評(píng)論0 收藏0
  • dubbo源碼解析——cluster

    摘要:簡(jiǎn)單來(lái)說(shuō)就是應(yīng)對(duì)出錯(cuò)情況采取的策略。由于重試,重試次數(shù)過(guò)多時(shí),帶來(lái)時(shí)延。通常用于實(shí)時(shí)性要求較高的讀操作,但需要浪費(fèi)更多服務(wù)資源。通常用于通知所有提供者更新緩存或日志等本地資源信息。 我們?cè)賮?lái)回顧一下官網(wǎng)的對(duì)于集群容錯(cuò)的架構(gòu)設(shè)計(jì)圖showImg(https://segmentfault.com/img/remote/1460000016972729?w=1000&h=502); Clus...

    seal_de 評(píng)論0 收藏0
  • dubbo源碼解析(四十六)消費(fèi)端發(fā)送請(qǐng)求過(guò)程

    摘要:可以參考源碼解析二十四遠(yuǎn)程調(diào)用協(xié)議的八。十六的該類(lèi)也是用了適配器模式,該類(lèi)主要的作用就是增加了心跳功能,可以參考源碼解析十遠(yuǎn)程通信層的四。二十的可以參考源碼解析十七遠(yuǎn)程通信的一。 2.7大揭秘——消費(fèi)端發(fā)送請(qǐng)求過(guò)程 目標(biāo):從源碼的角度分析一個(gè)服務(wù)方法調(diào)用經(jīng)歷怎么樣的磨難以后到達(dá)服務(wù)端。 前言 前一篇文章講到的是引用服務(wù)的過(guò)程,引用服務(wù)無(wú)非就是創(chuàng)建出一個(gè)代理。供消費(fèi)者調(diào)用服務(wù)的相關(guān)方法。...

    fish 評(píng)論0 收藏0
  • dubbo源碼解析三十八)集群——LoadBalance

    摘要:集群目標(biāo)介紹中集群的負(fù)載均衡,介紹下包的源碼。源碼分析一該類(lèi)實(shí)現(xiàn)了接口,是負(fù)載均衡的抽象類(lèi),提供了權(quán)重計(jì)算的功能。四該類(lèi)是負(fù)載均衡基于一致性的邏輯實(shí)現(xiàn)。 集群——LoadBalance 目標(biāo):介紹dubbo中集群的負(fù)載均衡,介紹dubbo-cluster下loadBalance包的源碼。 前言 負(fù)載均衡,說(shuō)的通俗點(diǎn)就是要一碗水端平。在這個(gè)時(shí)代,公平是很重要的,在網(wǎng)絡(luò)請(qǐng)求的時(shí)候同樣是這個(gè)...

    不知名網(wǎng)友 評(píng)論0 收藏0
  • dubbo源碼解析十九)遠(yuǎn)程調(diào)用——開(kāi)篇

    摘要:遠(yuǎn)程調(diào)用開(kāi)篇目標(biāo)介紹之后解讀遠(yuǎn)程調(diào)用模塊的內(nèi)容如何編排介紹中的包結(jié)構(gòu)設(shè)計(jì)以及最外層的的源碼解析。十該類(lèi)就是遠(yuǎn)程調(diào)用的上下文,貫穿著整個(gè)調(diào)用,例如調(diào)用,然后調(diào)用。十五該類(lèi)是系統(tǒng)上下文,僅供內(nèi)部使用。 遠(yuǎn)程調(diào)用——開(kāi)篇 目標(biāo):介紹之后解讀遠(yuǎn)程調(diào)用模塊的內(nèi)容如何編排、介紹dubbo-rpc-api中的包結(jié)構(gòu)設(shè)計(jì)以及最外層的的源碼解析。 前言 最近我面臨著一個(gè)選擇,因?yàn)閐ubbo 2.7.0-...

    jayce 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<