摘要:源碼分析一創(chuàng)建該類(lèi)實(shí)現(xiàn)了接口,是分組集合的集群實(shí)現(xiàn)。三工廠類(lèi),獲得指定類(lèi)型的對(duì)象。后記該部分相關(guān)的源碼解析地址該文章講解了集群中關(guān)于分組聚合實(shí)現(xiàn)的部分。接下來(lái)我將開(kāi)始對(duì)集群模塊關(guān)于路由部分進(jìn)行講解。
集群——merger
目標(biāo):介紹dubbo中集群的分組聚合,介紹dubbo-cluster下merger包的源碼。前言
按組合并返回結(jié)果 ,比如菜單服務(wù),接口一樣,但有多種實(shí)現(xiàn),用group區(qū)分,現(xiàn)在消費(fèi)方需從每種group中調(diào)用一次返回結(jié)果,合并結(jié)果返回,這樣就可以實(shí)現(xiàn)聚合菜單項(xiàng)。這個(gè)時(shí)候就要用到分組聚合。
源碼分析 (一)MergeableClusterpublic class MergeableCluster implements Cluster { public static final String NAME = "mergeable"; @Override publicInvoker join(Directory directory) throws RpcException { // 創(chuàng)建MergeableClusterInvoker return new MergeableClusterInvoker (directory); } }
該類(lèi)實(shí)現(xiàn)了Cluster接口,是分組集合的集群實(shí)現(xiàn)。
(二)MergeableClusterInvoker該類(lèi)是分組聚合的實(shí)現(xiàn)類(lèi),其中最關(guān)機(jī)的就是invoke方法。
@Override @SuppressWarnings("rawtypes") public Result invoke(final Invocation invocation) throws RpcException { // 獲得invoker集合 List> invokers = directory.list(invocation); /** * 獲得是否merger */ String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY); // 如果沒(méi)有設(shè)置需要聚合,則只調(diào)用一個(gè)invoker的 if (ConfigUtils.isEmpty(merger)) { // If a method doesn"t have a merger, only invoke one Group // 只要有一個(gè)可用就返回 for (final Invoker invoker : invokers) { if (invoker.isAvailable()) { return invoker.invoke(invocation); } } return invokers.iterator().next().invoke(invocation); } // 返回類(lèi)型 Class> returnType; try { // 獲得返回類(lèi)型 returnType = getInterface().getMethod( invocation.getMethodName(), invocation.getParameterTypes()).getReturnType(); } catch (NoSuchMethodException e) { returnType = null; } // 結(jié)果集合 Map > results = new HashMap >(); // 循環(huán)invokers for (final Invoker invoker : invokers) { // 獲得每次調(diào)用的future Future future = executor.submit(new Callable () { @Override public Result call() throws Exception { // 回調(diào),把返回結(jié)果放入future return invoker.invoke(new RpcInvocation(invocation, invoker)); } }); // 加入集合 results.put(invoker.getUrl().getServiceKey(), future); } Object result = null; List resultList = new ArrayList (results.size()); // 獲得超時(shí)時(shí)間 int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // 遍歷每一個(gè)結(jié)果 for (Map.Entry > entry : results.entrySet()) { Future future = entry.getValue(); try { // 獲得調(diào)用返回的結(jié)果 Result r = future.get(timeout, TimeUnit.MILLISECONDS); if (r.hasException()) { log.error("Invoke " + getGroupDescFromServiceKey(entry.getKey()) + " failed: " + r.getException().getMessage(), r.getException()); } else { // 加入集合 resultList.add(r); } } catch (Exception e) { throw new RpcException("Failed to invoke service " + entry.getKey() + ": " + e.getMessage(), e); } } // 如果為空,則返回空的結(jié)果 if (resultList.isEmpty()) { return new RpcResult((Object) null); } else if (resultList.size() == 1) { // 如果只有一個(gè)結(jié)果,則返回該結(jié)果 return resultList.iterator().next(); } // 如果返回類(lèi)型是void,也就是沒(méi)有返回值,那么返回空結(jié)果 if (returnType == void.class) { return new RpcResult((Object) null); } // 根據(jù)方法來(lái)合并,將調(diào)用返回結(jié)果的指定方法進(jìn)行合并 if (merger.startsWith(".")) { merger = merger.substring(1); Method method; try { // 獲得方法 method = returnType.getMethod(merger, returnType); } catch (NoSuchMethodException e) { throw new RpcException("Can not merge result because missing method [ " + merger + " ] in class [ " + returnType.getClass().getName() + " ]"); } // 有 Method ,進(jìn)行合并 if (!Modifier.isPublic(method.getModifiers())) { method.setAccessible(true); } // 從集合中移除 result = resultList.remove(0).getValue(); try { // 方法返回類(lèi)型匹配,合并時(shí),修改 result if (method.getReturnType() != void.class && method.getReturnType().isAssignableFrom(result.getClass())) { for (Result r : resultList) { result = method.invoke(result, r.getValue()); } } else { // 方法返回類(lèi)型不匹配,合并時(shí),不修改 result for (Result r : resultList) { method.invoke(result, r.getValue()); } } } catch (Exception e) { throw new RpcException("Can not merge result: " + e.getMessage(), e); } } else { // 基于 Merger Merger resultMerger; // 如果是默認(rèn)的方式 if (ConfigUtils.isDefault(merger)) { // 獲得該類(lèi)型的合并方式 resultMerger = MergerFactory.getMerger(returnType); } else { // 如果不是默認(rèn)的,則配置中指定獲得Merger的實(shí)現(xiàn)類(lèi) resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger); } if (resultMerger != null) { List
前面部分在講獲得調(diào)用的結(jié)果,后面部分是對(duì)結(jié)果的合并,合并有兩種方式,根據(jù)配置不同可用分為基于方法的合并和基于merger的合并。
(三)MergerFactoryMerger 工廠類(lèi),獲得指定類(lèi)型的Merger 對(duì)象。
public class MergerFactory { /** * Merger 對(duì)象緩存 */ private static final ConcurrentMap, Merger>> mergerCache = new ConcurrentHashMap , Merger>>(); /** * 獲得指定類(lèi)型的Merger對(duì)象 * @param returnType * @param * @return */ public static Merger getMerger(Class returnType) { Merger result; // 如果類(lèi)型是集合 if (returnType.isArray()) { // 獲得類(lèi)型 Class type = returnType.getComponentType(); // 從緩存中獲得該類(lèi)型的Merger對(duì)象 result = mergerCache.get(type); // 如果為空,則 if (result == null) { // 初始化所有的 Merger 擴(kuò)展對(duì)象,到 mergerCache 緩存中。 loadMergers(); // 從集合中取出對(duì)應(yīng)的Merger對(duì)象 result = mergerCache.get(type); } // 如果結(jié)果為空,則直接返回ArrayMerger的單例 if (result == null && !type.isPrimitive()) { result = ArrayMerger.INSTANCE; } } else { // 否則直接從mergerCache中取出 result = mergerCache.get(returnType); // 如果為空 if (result == null) { // 初始化所有的 Merger 擴(kuò)展對(duì)象,到 mergerCache 緩存中。 loadMergers(); // 從集合中取出 result = mergerCache.get(returnType); } } return result; } /** * 初始化所有的 Merger 擴(kuò)展對(duì)象,到 mergerCache 緩存中。 */ static void loadMergers() { // 獲得Merger所有的擴(kuò)展對(duì)象名 Set names = ExtensionLoader.getExtensionLoader(Merger.class) .getSupportedExtensions(); // 遍歷 for (String name : names) { // 加載每一個(gè)擴(kuò)展實(shí)現(xiàn),然后放入緩存。 Merger m = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(name); mergerCache.putIfAbsent(ReflectUtils.getGenericClass(m.getClass()), m); } } }
邏輯比較簡(jiǎn)單。
(四)ArrayMerger因?yàn)椴煌念?lèi)型有不同的Merger實(shí)現(xiàn),我們可以來(lái)看看這個(gè)圖片:
可以看到有好多好多,我就講解其中的一種,偷懶一下,其他的麻煩有興趣的去看看源碼了。
public class ArrayMerger implements Merger{ /** * 單例 */ public static final ArrayMerger INSTANCE = new ArrayMerger(); @Override public Object[] merge(Object[]... others) { // 如果長(zhǎng)度為0 則直接返回 if (others.length == 0) { return null; } // 總長(zhǎng) int totalLen = 0; // 遍歷所有需要合并的對(duì)象 for (int i = 0; i < others.length; i++) { Object item = others[i]; // 如果為數(shù)組 if (item != null && item.getClass().isArray()) { // 累加數(shù)組長(zhǎng)度 totalLen += Array.getLength(item); } else { throw new IllegalArgumentException((i + 1) + "th argument is not an array"); } } if (totalLen == 0) { return null; } // 獲得數(shù)組類(lèi)型 Class> type = others[0].getClass().getComponentType(); // 創(chuàng)建長(zhǎng)度 Object result = Array.newInstance(type, totalLen); int index = 0; // 遍歷需要合并的對(duì)象 for (Object array : others) { // 遍歷每個(gè)數(shù)組中的數(shù)據(jù) for (int i = 0; i < Array.getLength(array); i++) { // 加入到最終結(jié)果中 Array.set(result, index++, Array.get(array, i)); } } return (Object[]) result; } }
是不是很簡(jiǎn)單,就是循環(huán)合并就可以了。
后記該部分相關(guān)的源碼解析地址:https://github.com/CrazyHZM/i...
該文章講解了集群中關(guān)于分組聚合實(shí)現(xiàn)的部分。接下來(lái)我將開(kāi)始對(duì)集群模塊關(guān)于路由部分進(jìn)行講解。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/77431.html
摘要:大揭秘異步化改造目標(biāo)從源碼的角度分析的新特性中對(duì)于異步化的改造原理。看源碼解析四十六消費(fèi)端發(fā)送請(qǐng)求過(guò)程講到的十四的,在以前的邏輯會(huì)直接在方法中根據(jù)配置區(qū)分同步異步單向調(diào)用。改為關(guān)于可以參考源碼解析十遠(yuǎn)程通信層的六。 2.7大揭秘——異步化改造 目標(biāo):從源碼的角度分析2.7的新特性中對(duì)于異步化的改造原理。 前言 dubbo中提供了很多類(lèi)型的協(xié)議,關(guān)于協(xié)議的系列可以查看下面的文章: du...
摘要:簡(jiǎn)單來(lái)說(shuō)就是應(yīng)對(duì)出錯(cuò)情況采取的策略。由于重試,重試次數(shù)過(guò)多時(shí),帶來(lái)時(shí)延。通常用于實(shí)時(shí)性要求較高的讀操作,但需要浪費(fèi)更多服務(wù)資源。通常用于通知所有提供者更新緩存或日志等本地資源信息。 我們?cè)賮?lái)回顧一下官網(wǎng)的對(duì)于集群容錯(cuò)的架構(gòu)設(shè)計(jì)圖showImg(https://segmentfault.com/img/remote/1460000016972729?w=1000&h=502); Clus...
摘要:可以參考源碼解析二十四遠(yuǎn)程調(diào)用協(xié)議的八。十六的該類(lèi)也是用了適配器模式,該類(lèi)主要的作用就是增加了心跳功能,可以參考源碼解析十遠(yuǎn)程通信層的四。二十的可以參考源碼解析十七遠(yuǎn)程通信的一。 2.7大揭秘——消費(fèi)端發(fā)送請(qǐng)求過(guò)程 目標(biāo):從源碼的角度分析一個(gè)服務(wù)方法調(diào)用經(jīng)歷怎么樣的磨難以后到達(dá)服務(wù)端。 前言 前一篇文章講到的是引用服務(wù)的過(guò)程,引用服務(wù)無(wú)非就是創(chuàng)建出一個(gè)代理。供消費(fèi)者調(diào)用服務(wù)的相關(guān)方法。...
摘要:集群目標(biāo)介紹中集群的負(fù)載均衡,介紹下包的源碼。源碼分析一該類(lèi)實(shí)現(xiàn)了接口,是負(fù)載均衡的抽象類(lèi),提供了權(quán)重計(jì)算的功能。四該類(lèi)是負(fù)載均衡基于一致性的邏輯實(shí)現(xiàn)。 集群——LoadBalance 目標(biāo):介紹dubbo中集群的負(fù)載均衡,介紹dubbo-cluster下loadBalance包的源碼。 前言 負(fù)載均衡,說(shuō)的通俗點(diǎn)就是要一碗水端平。在這個(gè)時(shí)代,公平是很重要的,在網(wǎng)絡(luò)請(qǐng)求的時(shí)候同樣是這個(gè)...
摘要:遠(yuǎn)程調(diào)用開(kāi)篇目標(biāo)介紹之后解讀遠(yuǎn)程調(diào)用模塊的內(nèi)容如何編排介紹中的包結(jié)構(gòu)設(shè)計(jì)以及最外層的的源碼解析。十該類(lèi)就是遠(yuǎn)程調(diào)用的上下文,貫穿著整個(gè)調(diào)用,例如調(diào)用,然后調(diào)用。十五該類(lèi)是系統(tǒng)上下文,僅供內(nèi)部使用。 遠(yuǎn)程調(diào)用——開(kāi)篇 目標(biāo):介紹之后解讀遠(yuǎn)程調(diào)用模塊的內(nèi)容如何編排、介紹dubbo-rpc-api中的包結(jié)構(gòu)設(shè)計(jì)以及最外層的的源碼解析。 前言 最近我面臨著一個(gè)選擇,因?yàn)閐ubbo 2.7.0-...
閱讀 2536·2021-10-12 10:12
閱讀 1714·2019-08-30 15:52
閱讀 2451·2019-08-30 13:04
閱讀 1739·2019-08-29 18:33
閱讀 965·2019-08-29 16:28
閱讀 452·2019-08-29 12:33
閱讀 2061·2019-08-26 13:33
閱讀 2363·2019-08-26 11:36