国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專(zhuān)欄INFORMATION COLUMN

dubbo源碼解析(三十五)集群——cluster

gself / 3420人閱讀

摘要:失敗安全,出現(xiàn)異常時(shí),直接忽略。失敗自動(dòng)恢復(fù),在調(diào)用失敗后,返回一個(gè)空結(jié)果給服務(wù)提供者。源碼分析一該類(lèi)實(shí)現(xiàn)了接口,是集群的抽象類(lèi)。

集群——cluster
目標(biāo):介紹dubbo中集群容錯(cuò)的幾種模式,介紹dubbo-cluster下support包的源碼。
前言

集群容錯(cuò)還是很好理解的,就是當(dāng)你調(diào)用失敗的時(shí)候所作出的措施。先來(lái)看看有哪些模式:

圖有點(diǎn)小,見(jiàn)諒,不過(guò)可以瞇著眼睛看稍微能看出來(lái)一點(diǎn),每一個(gè)Cluster實(shí)現(xiàn)類(lèi)都對(duì)應(yīng)著一個(gè)invoker,因?yàn)檫@個(gè)模式啟用的時(shí)間點(diǎn)就是在調(diào)用的時(shí)候,而我在之前的文章里面講過(guò),invoker貫穿來(lái)整個(gè)服務(wù)的調(diào)用。不過(guò)這里除了調(diào)用失敗的一些模式外,還有幾個(gè)特別的模式,他們應(yīng)該說(shuō)成是失敗的措施,而已調(diào)用的方式。

Failsafe Cluster:失敗安全,出現(xiàn)異常時(shí),直接忽略。失敗安全就是當(dāng)調(diào)用過(guò)程中出現(xiàn)異常時(shí),F(xiàn)ailsafeClusterInvoker 僅會(huì)打印異常,而不會(huì)拋出異常。適用于寫(xiě)入審計(jì)日志等操作

Failover Cluster:失敗自動(dòng)切換,當(dāng)調(diào)用出現(xiàn)失敗的時(shí)候,會(huì)自動(dòng)切換集群中其他服務(wù)器,來(lái)獲得invoker重試,通常用于讀操作,但重試會(huì)帶來(lái)更長(zhǎng)延遲。一般都會(huì)設(shè)置重試次數(shù)。

Failfast Cluster:只會(huì)進(jìn)行一次調(diào)用,失敗后立即拋出異常。適用于冪等操作,比如新增記錄。

Failback Cluster:失敗自動(dòng)恢復(fù),在調(diào)用失敗后,返回一個(gè)空結(jié)果給服務(wù)提供者。并通過(guò)定時(shí)任務(wù)對(duì)失敗的調(diào)用記錄并且重傳,適合執(zhí)行消息通知等操作。

Forking Cluster:會(huì)在線程池中運(yùn)行多個(gè)線程,來(lái)調(diào)用多個(gè)服務(wù)器,只要一個(gè)成功即返回。通常用于實(shí)時(shí)性要求較高的讀操作,但需要浪費(fèi)更多服務(wù)資源。一般會(huì)設(shè)置最大并行數(shù)。

Available Cluster:調(diào)用第一個(gè)可用的服務(wù)器,僅僅應(yīng)用于多注冊(cè)中心。

Broadcast Cluster:廣播調(diào)用所有提供者,逐個(gè)調(diào)用,在循環(huán)調(diào)用結(jié)束后,只要任意一臺(tái)報(bào)錯(cuò)就報(bào)錯(cuò)。通常用于通知所有提供者更新緩存或日志等本地資源信息

Mergeable Cluster:該部分在分組聚合講述。

MockClusterWrapper:該部分在本地偽裝講述。

源碼分析 (一)AbstractClusterInvoker

該類(lèi)實(shí)現(xiàn)了Invoker接口,是集群Invoker的抽象類(lèi)。

1.屬性
private static final Logger logger = LoggerFactory
        .getLogger(AbstractClusterInvoker.class);
/**
 * 目錄,包含多個(gè)invoker
 */
protected final Directory directory;

/**
 * 是否需要核對(duì)可用
 */
protected final boolean availablecheck;

/**
 * 是否銷(xiāo)毀
 */
private AtomicBoolean destroyed = new AtomicBoolean(false);

/**
 * 粘滯連接的Invoker
 */
private volatile Invoker stickyInvoker = null;
2.select
protected Invoker select(LoadBalance loadbalance, Invocation invocation, List> invokers, List> selected) throws RpcException {
    // 如果invokers為空,則返回null
    if (invokers == null || invokers.isEmpty())
        return null;
    // 獲得方法名
    String methodName = invocation == null ? "" : invocation.getMethodName();

    // 是否啟動(dòng)了粘滯連接
    boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
    {
        //ignore overloaded method
        // 如果上一次粘滯連接的調(diào)用不在可選的提供者列合內(nèi),則直接設(shè)置為空
        if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
            stickyInvoker = null;
        }
        //ignore concurrency problem
        // stickyInvoker不為null,并且沒(méi)在已選列表中,返回上次的服務(wù)提供者stickyInvoker,但之前強(qiáng)制校驗(yàn)可達(dá)性。
        // 由于stickyInvoker不能包含在selected列表中,通過(guò)代碼看,可以得知forking和failover集群策略,用不了sticky屬性
        if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
            if (availablecheck && stickyInvoker.isAvailable()) {
                return stickyInvoker;
            }
        }
    }
    // 利用負(fù)載均衡選一個(gè)提供者
    Invoker invoker = doSelect(loadbalance, invocation, invokers, selected);

    // 如果啟動(dòng)粘滯連接,則記錄這一次的調(diào)用
    if (sticky) {
        stickyInvoker = invoker;
    }
    return invoker;
}

該方法實(shí)現(xiàn)了使用負(fù)載均衡策略選擇一個(gè)調(diào)用者。首先,使用loadbalance選擇一個(gè)調(diào)用者。如果此調(diào)用者位于先前選擇的列表中,或者如果此調(diào)用者不可用,則重新選擇,否則返回第一個(gè)選定的調(diào)用者。重新選擇,重選的驗(yàn)證規(guī)則:選擇>可用。這條規(guī)則可以保證所選的調(diào)用者最少有機(jī)會(huì)成為之前選擇的列表中的一個(gè),也是保證這個(gè)調(diào)用程序可用。

3.doSelect
private Invoker doSelect(LoadBalance loadbalance, Invocation invocation, List> invokers, List> selected) throws RpcException {
    if (invokers == null || invokers.isEmpty())
        return null;
    // 如果只有一個(gè) ,就直接返回這個(gè)
    if (invokers.size() == 1)
        return invokers.get(0);
    // 如果沒(méi)有指定用哪個(gè)負(fù)載均衡策略,則默認(rèn)用隨機(jī)負(fù)載均衡策略
    if (loadbalance == null) {
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
    }
    // 調(diào)用負(fù)載均衡選擇
    Invoker invoker = loadbalance.select(invokers, getUrl(), invocation);

    //If the `invoker` is in the  `selected` or invoker is unavailable && availablecheck is true, reselect.
    // 如果選擇的提供者,已在selected中或者不可用則重新選擇
    if ((selected != null && selected.contains(invoker))
            || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
        try {
            // 重新選擇
            Invoker rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
            if (rinvoker != null) {
                invoker = rinvoker;
            } else {
                //Check the index of current selected invoker, if it"s not the last one, choose the one at index+1.
                // 如果重新選擇失敗,看下第一次選的位置,如果不是最后,選+1位置.
                int index = invokers.indexOf(invoker);
                try {
                    //Avoid collision
                    // 最后再避免選擇到同一個(gè)invoker
                    invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invokers.get(0);
                } catch (Exception e) {
                    logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
                }
            }
        } catch (Throwable t) {
            logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
        }
    }
    return invoker;
}

該方法是用負(fù)載均衡選擇一個(gè)invoker的主要邏輯。

4.reselect
private Invoker reselect(LoadBalance loadbalance, Invocation invocation,
                            List> invokers, List> selected, boolean availablecheck)
        throws RpcException {

    //Allocating one in advance, this list is certain to be used.
    //預(yù)先分配一個(gè)重選列表,這個(gè)列表是一定會(huì)用到的.
    List> reselectInvokers = new ArrayList>(invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());

    //First, try picking a invoker not in `selected`.
    //先從非select中選
    //把不包含在selected中的提供者,放入重選列表reselectInvokers,讓負(fù)載均衡器選擇
    if (availablecheck) { // invoker.isAvailable() should be checked
        for (Invoker invoker : invokers) {
            if (invoker.isAvailable()) {
                if (selected == null || !selected.contains(invoker)) {
                    reselectInvokers.add(invoker);
                }
            }
        }
        // 在重選列表中用負(fù)載均衡器選擇
        if (!reselectInvokers.isEmpty()) {
            return loadbalance.select(reselectInvokers, getUrl(), invocation);
        }
    } else { // do not check invoker.isAvailable()
        // 不核對(duì)服務(wù)是否可以,把不包含在selected中的提供者,放入重選列表reselectInvokers,讓負(fù)載均衡器選擇
        for (Invoker invoker : invokers) {
            if (selected == null || !selected.contains(invoker)) {
                reselectInvokers.add(invoker);
            }
        }
        if (!reselectInvokers.isEmpty()) {
            return loadbalance.select(reselectInvokers, getUrl(), invocation);
        }
    }
    // Just pick an available invoker using loadbalance policy
    {
        // 如果非selected的列表中沒(méi)有選擇到,則從selected中選擇
        if (selected != null) {
            for (Invoker invoker : selected) {
                if ((invoker.isAvailable()) // available first
                        && !reselectInvokers.contains(invoker)) {
                    reselectInvokers.add(invoker);
                }
            }
        }
        if (!reselectInvokers.isEmpty()) {
            return loadbalance.select(reselectInvokers, getUrl(), invocation);
        }
    }
    return null;
}

該方法是是重新選擇的邏輯實(shí)現(xiàn)。

5.invoke
@Override
public Result invoke(final Invocation invocation) throws RpcException {
    // 核對(duì)是否已經(jīng)銷(xiāo)毀
    checkWhetherDestroyed();
    LoadBalance loadbalance = null;

    // binding attachments into invocation.
    // 獲得上下文的附加值
    Map contextAttachments = RpcContext.getContext().getAttachments();
    // 把附加值放入到會(huì)話域中
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addAttachments(contextAttachments);
    }

    // 生成服務(wù)提供者集合
    List> invokers = list(invocation);
    if (invokers != null && !invokers.isEmpty()) {
        // 獲得負(fù)載均衡器
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                .getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
    }
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    return doInvoke(invocation, invokers, loadbalance);
}

該方法是invoker接口必備的方法,調(diào)用鏈的邏輯,不過(guò)主要的邏輯在doInvoke方法中,該方法是該類(lèi)的抽象方法,讓子類(lèi)只關(guān)注doInvoke方法。

6.list
protected List> list(Invocation invocation) throws RpcException {
    // 把會(huì)話域中的invoker加入集合
    List> invokers = directory.list(invocation);
    return invokers;
}

該方法是調(diào)用了directory的list方法,從會(huì)話域中獲得所有的Invoker集合。關(guān)于directory我會(huì)在后續(xù)文章講解。

(二)AvailableCluster
public class AvailableCluster implements Cluster {

    public static final String NAME = "available";

    @Override
    public  Invoker join(Directory directory) throws RpcException {

        // 創(chuàng)建一個(gè)AbstractClusterInvoker
        return new AbstractClusterInvoker(directory) {
            @Override
            public Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {
                // 遍歷所有的involer,只要有一個(gè)可用就直接調(diào)用。
                for (Invoker invoker : invokers) {
                    if (invoker.isAvailable()) {
                        return invoker.invoke(invocation);
                    }
                }
                throw new RpcException("No provider available in " + invokers);
            }
        };

    }

}

Available Cluster我在上面已經(jīng)講過(guò)了,只要找到一個(gè)可用的,則直接調(diào)用。

(三)BroadcastCluster
public class BroadcastCluster implements Cluster {

    @Override
    public  Invoker join(Directory directory) throws RpcException {
        // 創(chuàng)建一個(gè)BroadcastClusterInvoker
        return new BroadcastClusterInvoker(directory);
    }

}

關(guān)鍵實(shí)現(xiàn)在于BroadcastClusterInvoker。

(四)BroadcastClusterInvoker
public class BroadcastClusterInvoker extends AbstractClusterInvoker {

    private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class);

    public BroadcastClusterInvoker(Directory directory) {
        super(directory);
    }

    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(final Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {
        // 檢測(cè)invokers是否為空
        checkInvokers(invokers, invocation);
        // 把invokers放到上下文
        RpcContext.getContext().setInvokers((List) invokers);
        RpcException exception = null;
        Result result = null;
        // 遍歷invokers,逐個(gè)調(diào)用,在循環(huán)調(diào)用結(jié)束后,只要任意一臺(tái)報(bào)錯(cuò)就報(bào)錯(cuò)
        for (Invoker invoker : invokers) {
            try {
                result = invoker.invoke(invocation);
            } catch (RpcException e) {
                exception = e;
                logger.warn(e.getMessage(), e);
            } catch (Throwable e) {
                exception = new RpcException(e.getMessage(), e);
                logger.warn(e.getMessage(), e);
            }
        }
        if (exception != null) {
            throw exception;
        }
        return result;
    }

}
(五)ForkingCluster
public class ForkingCluster implements Cluster {

    public final static String NAME = "forking";

    @Override
    public  Invoker join(Directory directory) throws RpcException {
        // 創(chuàng)建ForkingClusterInvoker
        return new ForkingClusterInvoker(directory);
    }

}
(六)ForkingClusterInvoker
public class ForkingClusterInvoker extends AbstractClusterInvoker {

    /**
     * 線程池
     * Use {@link NamedInternalThreadFactory} to produce {@link com.alibaba.dubbo.common.threadlocal.InternalThread}
     * which with the use of {@link com.alibaba.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}.
     */
    private final ExecutorService executor = Executors.newCachedThreadPool(
            new NamedInternalThreadFactory("forking-cluster-timer", true));

    public ForkingClusterInvoker(Directory directory) {
        super(directory);
    }

    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(final Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            // 檢測(cè)invokers是否為空
            checkInvokers(invokers, invocation);
            final List> selected;
            // 獲取 forks 配置
            final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
            // 獲取超時(shí)配置
            final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            // 如果 forks 配置不合理,則直接將 invokers 賦值給 selected
            if (forks <= 0 || forks >= invokers.size()) {
                selected = invokers;
            } else {
                selected = new ArrayList>();
                // 循環(huán)選出 forks 個(gè) Invoker,并添加到 selected 中
                for (int i = 0; i < forks; i++) {
                    // TODO. Add some comment here, refer chinese version for more details.
                    // 選擇 Invoker
                    Invoker invoker = select(loadbalance, invocation, invokers, selected);
                    if (!selected.contains(invoker)) {//Avoid add the same invoker several times.
                        // 加入到selected集合
                        selected.add(invoker);
                    }
                }
            }
            // 放入上下文
            RpcContext.getContext().setInvokers((List) selected);
            final AtomicInteger count = new AtomicInteger();
            final BlockingQueue ref = new LinkedBlockingQueue();
            // 遍歷 selected 列表
            for (final Invoker invoker : selected) {
                // 為每個(gè) Invoker 創(chuàng)建一個(gè)執(zhí)行線程
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // 進(jìn)行遠(yuǎn)程調(diào)用
                            Result result = invoker.invoke(invocation);
                            // 將結(jié)果存到阻塞隊(duì)列中
                            ref.offer(result);
                        } catch (Throwable e) {
                            // 僅在 value 大于等于 selected.size() 時(shí),才將異常對(duì)象
                            // 為了防止異常現(xiàn)象覆蓋正常的結(jié)果
                            int value = count.incrementAndGet();
                            if (value >= selected.size()) {
                                // 將異常對(duì)象存入到阻塞隊(duì)列中
                                ref.offer(e);
                            }
                        }
                    }
                });
            }
            try {
                // 從阻塞隊(duì)列中取出遠(yuǎn)程調(diào)用結(jié)果
                Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
                // 如果是異常,則拋出
                if (ret instanceof Throwable) {
                    Throwable e = (Throwable) ret;
                    throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
                }
                return (Result) ret;
            } catch (InterruptedException e) {
                throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
            }
        } finally {
            // clear attachments which is binding to current thread.
            RpcContext.getContext().clearAttachments();
        }
    }
}
(七)FailbackCluster
public class FailbackCluster implements Cluster {

    public final static String NAME = "failback";

    @Override
    public  Invoker join(Directory directory) throws RpcException {
        // 創(chuàng)建一個(gè)FailbackClusterInvoker
        return new FailbackClusterInvoker(directory);
    }

}
(八)FailbackClusterInvoker 1.屬性
private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class);

// 重試間隔
private static final long RETRY_FAILED_PERIOD = 5 * 1000;

/**
 * 定時(shí)器
 * Use {@link NamedInternalThreadFactory} to produce {@link com.alibaba.dubbo.common.threadlocal.InternalThread}
 * which with the use of {@link com.alibaba.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}.
 */
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2,
        new NamedInternalThreadFactory("failback-cluster-timer", true));

/**
 * 失敗集合
 */
private final ConcurrentMap> failed = new ConcurrentHashMap>();
/**
 * future
 */
private volatile ScheduledFuture retryFuture;
2.doInvoke
@Override
protected Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {
    try {
        // 檢測(cè)invokers是否為空
        checkInvokers(invokers, invocation);
        // 選擇出invoker
        Invoker invoker = select(loadbalance, invocation, invokers, null);
        // 調(diào)用
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
                + e.getMessage() + ", ", e);
        // 如果失敗,則加入到失敗隊(duì)列,等待重試
        addFailed(invocation, this);
        return new RpcResult(); // ignore
    }
}

該方法是選擇invoker調(diào)用的邏輯,在拋出異常的時(shí)候,做了失敗重試的機(jī)制,主要實(shí)現(xiàn)在addFailed。

3.addFailed
private void addFailed(Invocation invocation, AbstractClusterInvoker router) {
    if (retryFuture == null) {
        // 鎖住
        synchronized (this) {
            if (retryFuture == null) {
                // 創(chuàng)建定時(shí)任務(wù),每隔5秒執(zhí)行一次
                retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {

                    @Override
                    public void run() {
                        // collect retry statistics
                        try {
                            // 對(duì)失敗的調(diào)用進(jìn)行重試
                            retryFailed();
                        } catch (Throwable t) { // Defensive fault tolerance
                            logger.error("Unexpected error occur at collect statistic", t);
                        }
                    }
                }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
            }
        }
    }
    // 添加 invocation 和 invoker 到 failed 中
    failed.put(invocation, router);
}

該方法做的事創(chuàng)建了定時(shí)器,然后把失敗的調(diào)用放入到集合中。

4.retryFailed
void retryFailed() {
    // 如果失敗隊(duì)列為0,返回
    if (failed.size() == 0) {
        return;
    }
    // 遍歷失敗隊(duì)列
    for (Map.Entry> entry : new HashMap>(
            failed).entrySet()) {
        // 獲得會(huì)話域
        Invocation invocation = entry.getKey();
        // 獲得invoker
        Invoker invoker = entry.getValue();
        try {
            // 重新調(diào)用
            invoker.invoke(invocation);
            // 從失敗隊(duì)列中移除
            failed.remove(invocation);
        } catch (Throwable e) {
            logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
        }
    }
}

這個(gè)方法是調(diào)用失敗的invoker重新調(diào)用的機(jī)制。

(九)FailfastCluster
public class FailfastCluster implements Cluster {

    public final static String NAME = "failfast";

    @Override
    public  Invoker join(Directory directory) throws RpcException {
        // 創(chuàng)建FailfastClusterInvoker
        return new FailfastClusterInvoker(directory);
    }

}
(十)FailfastClusterInvoker
public class FailfastClusterInvoker extends AbstractClusterInvoker {

    public FailfastClusterInvoker(Directory directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {
        // 檢測(cè)invokers是否為空
        checkInvokers(invokers, invocation);
        // 選擇一個(gè)invoker
        Invoker invoker = select(loadbalance, invocation, invokers, null);
        try {
            // 調(diào)用
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
                // 拋出異常
                throw (RpcException) e;
            }
            // 拋出異常
            throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
        }
    }
}

邏輯比較簡(jiǎn)單,調(diào)用拋出異常就直接拋出。

(十一)FailoverCluster
public class FailoverCluster implements Cluster {

    public final static String NAME = "failover";

    @Override
    public  Invoker join(Directory directory) throws RpcException {
        // 創(chuàng)建FailoverClusterInvoker
        return new FailoverClusterInvoker(directory);
    }

}
(十二)FailoverClusterInvoker
public class FailoverClusterInvoker extends AbstractClusterInvoker {

    private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);

    public FailoverClusterInvoker(Directory directory) {
        super(directory);
    }

    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(Invocation invocation, final List> invokers, LoadBalance loadbalance) throws RpcException {
        // 復(fù)制一個(gè)invoker集合
        List> copyinvokers = invokers;
        // 檢測(cè)是否為空
        checkInvokers(copyinvokers, invocation);
        // 獲取重試次數(shù)
        int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        // 記錄最后一個(gè)異常
        RpcException le = null; // last exception.
        List> invoked = new ArrayList>(copyinvokers.size()); // invoked invokers.
        Set providers = new HashSet(len);
        // 循環(huán)調(diào)用,失敗重試
        for (int i = 0; i < len; i++) {
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            // 在進(jìn)行重試前重新列舉 Invoker,這樣做的好處是,如果某個(gè)服務(wù)掛了,
            // 通過(guò)調(diào)用 list 可得到最新可用的 Invoker 列表
            if (i > 0) {
                checkWhetherDestroyed();
                copyinvokers = list(invocation);
                // check again
                // 檢測(cè)copyinvokers 是否為空
                checkInvokers(copyinvokers, invocation);
            }
            // 通過(guò)負(fù)載均衡選擇invoker
            Invoker invoker = select(loadbalance, invocation, copyinvokers, invoked);
            // 添加到 invoker 到 invoked 列表中
            invoked.add(invoker);
            // 設(shè)置 invoked 到 RPC 上下文中
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                // 調(diào)用目標(biāo) Invoker 的 invoke 方法
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + invocation.getMethodName()
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers
                            + " (" + providers.size() + "/" + copyinvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        // 若重試失敗,則拋出異常
        throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
                + invocation.getMethodName() + " in the service " + getInterface().getName()
                + ". Tried " + len + " times of the providers " + providers
                + " (" + providers.size() + "/" + copyinvokers.size()
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
    }

}

該類(lèi)實(shí)現(xiàn)了失敗重試的容錯(cuò)策略,當(dāng)調(diào)用失敗的時(shí)候,記錄下異常,然后循環(huán)調(diào)用下一個(gè)選擇出來(lái)的invoker,直到重試次數(shù)用完,拋出最后一次的異常。

(十三)FailsafeCluster
public class FailsafeCluster implements Cluster {

    public final static String NAME = "failsafe";

    @Override
    public  Invoker join(Directory directory) throws RpcException {
        // 創(chuàng)建FailsafeClusterInvoker
        return new FailsafeClusterInvoker(directory);
    }

}
(十四)FailsafeClusterInvoker
public class FailsafeClusterInvoker extends AbstractClusterInvoker {
    private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class);

    public FailsafeClusterInvoker(Directory directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            // 檢測(cè)invokers是否為空
            checkInvokers(invokers, invocation);
            // 選擇一個(gè)invoker
            Invoker invoker = select(loadbalance, invocation, invokers, null);
            // 調(diào)用
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            // 如果失敗打印異常,返回一個(gè)空結(jié)果
            logger.error("Failsafe ignore exception: " + e.getMessage(), e);
            return new RpcResult(); // ignore
        }
    }
}

邏輯比較簡(jiǎn)單,就是不拋出異常,只是打印異常。

后記
該部分相關(guān)的源碼解析地址:https://github.com/CrazyHZM/i...

該文章講解了集群中關(guān)于cluster實(shí)現(xiàn)的部分,講了幾種調(diào)用方式和容錯(cuò)策略。接下來(lái)我將開(kāi)始對(duì)集群模塊關(guān)于配置規(guī)則部分進(jìn)行講解。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/77383.html

相關(guān)文章

  • dubbo源碼解析(四十六)消費(fèi)端發(fā)送請(qǐng)求過(guò)程

    摘要:可以參考源碼解析二十四遠(yuǎn)程調(diào)用協(xié)議的八。十六的該類(lèi)也是用了適配器模式,該類(lèi)主要的作用就是增加了心跳功能,可以參考源碼解析十遠(yuǎn)程通信層的四。二十的可以參考源碼解析十七遠(yuǎn)程通信的一。 2.7大揭秘——消費(fèi)端發(fā)送請(qǐng)求過(guò)程 目標(biāo):從源碼的角度分析一個(gè)服務(wù)方法調(diào)用經(jīng)歷怎么樣的磨難以后到達(dá)服務(wù)端。 前言 前一篇文章講到的是引用服務(wù)的過(guò)程,引用服務(wù)無(wú)非就是創(chuàng)建出一個(gè)代理。供消費(fèi)者調(diào)用服務(wù)的相關(guān)方法。...

    fish 評(píng)論0 收藏0
  • dubbo源碼解析三十九)集群——merger

    摘要:源碼分析一創(chuàng)建該類(lèi)實(shí)現(xiàn)了接口,是分組集合的集群實(shí)現(xiàn)。三工廠類(lèi),獲得指定類(lèi)型的對(duì)象。后記該部分相關(guān)的源碼解析地址該文章講解了集群中關(guān)于分組聚合實(shí)現(xiàn)的部分。接下來(lái)我將開(kāi)始對(duì)集群模塊關(guān)于路由部分進(jìn)行講解。 集群——merger 目標(biāo):介紹dubbo中集群的分組聚合,介紹dubbo-cluster下merger包的源碼。 前言 按組合并返回結(jié)果 ,比如菜單服務(wù),接口一樣,但有多種實(shí)現(xiàn),用gro...

    lscho 評(píng)論0 收藏0
  • dubbo源碼解析(四十五)服務(wù)引用過(guò)程

    摘要:服務(wù)引用過(guò)程目標(biāo)從源碼的角度分析服務(wù)引用過(guò)程。并保留服務(wù)提供者的部分配置,比如版本,,時(shí)間戳等最后將合并后的配置設(shè)置為查詢(xún)字符串中。的可以參考源碼解析二十三遠(yuǎn)程調(diào)用的一的源碼分析。 dubbo服務(wù)引用過(guò)程 目標(biāo):從源碼的角度分析服務(wù)引用過(guò)程。 前言 前面服務(wù)暴露過(guò)程的文章講解到,服務(wù)引用有兩種方式,一種就是直連,也就是直接指定服務(wù)的地址來(lái)進(jìn)行引用,這種方式更多的時(shí)候被用來(lái)做服務(wù)測(cè)試,不...

    xiaowugui666 評(píng)論0 收藏0
  • dubbo源碼解析三十八)集群——LoadBalance

    摘要:集群目標(biāo)介紹中集群的負(fù)載均衡,介紹下包的源碼。源碼分析一該類(lèi)實(shí)現(xiàn)了接口,是負(fù)載均衡的抽象類(lèi),提供了權(quán)重計(jì)算的功能。四該類(lèi)是負(fù)載均衡基于一致性的邏輯實(shí)現(xiàn)。 集群——LoadBalance 目標(biāo):介紹dubbo中集群的負(fù)載均衡,介紹dubbo-cluster下loadBalance包的源碼。 前言 負(fù)載均衡,說(shuō)的通俗點(diǎn)就是要一碗水端平。在這個(gè)時(shí)代,公平是很重要的,在網(wǎng)絡(luò)請(qǐng)求的時(shí)候同樣是這個(gè)...

    不知名網(wǎng)友 評(píng)論0 收藏0
  • dubbo源碼解析三十七)集群——directory

    摘要:首先將根據(jù)路由規(guī)則服務(wù)提供者和配置規(guī)則三種類(lèi)型分開(kāi),分別放入三個(gè)集合,然后對(duì)每個(gè)集合進(jìn)行修改或者通知設(shè)置禁止訪問(wèn)置空關(guān)閉所有的關(guān)閉禁止訪問(wèn)引用老的傳入的為空,說(shuō)明是路由規(guī)則或配置規(guī)則發(fā)生改變,此時(shí)是空的,直接使用。 集群——directory 目標(biāo):介紹dubbo中集群的目錄,介紹dubbo-cluster下directory包的源碼。 前言 我在前面的文章中也提到了Directory...

    blastz 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

gself

|高級(jí)講師

TA的文章

閱讀更多
最新活動(dòng)
閱讀需要支付1元查看
<