国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

ZStack源碼剖析之核心庫鑒賞——FlowChain

yintaolaowanzi / 2293人閱讀

摘要:下面將開始分析它的源碼。僅僅定義了一個(gè)最小應(yīng)有的行為。更好的選擇由于該庫是為定制而生,故此有一些防御性判斷,源碼顯得略為。

本文首發(fā)于泊浮目的專欄:https://segmentfault.com/blog...
前言

在ZStack(或者說產(chǎn)品化的IaaS軟件)中的任務(wù)通常有很長的執(zhí)行路徑,錯(cuò)誤可能發(fā)生在路徑的任意一處。為了保證系統(tǒng)的正確性,需提供一種較為完善的回滾機(jī)制——在ZStack中,通過一個(gè)工作流引擎,ZStack的每一個(gè)步驟都被包裹在獨(dú)立的工作流中,可以在出錯(cuò)的時(shí)候回滾。此外,通過在配置文件中組裝工作流的方式,關(guān)鍵的執(zhí)行路徑可以被配置,這使得架構(gòu)的耦合度進(jìn)一步降低。

系統(tǒng)解耦合的手段除了之前文章所提到的分層、分割、分布等,還有一個(gè)重要手段是異步,業(yè)務(wù)之間的消息傳遞不是同步調(diào)用,而是將一個(gè)業(yè)務(wù)操作分成多個(gè)階段,每個(gè)階段之間通過共享數(shù)據(jù)的方式異步執(zhí)行進(jìn)行協(xié)作。

這即是一種在業(yè)務(wù)設(shè)計(jì)原則中——流程可定義原則的具象化。接觸過金融行業(yè)的同學(xué)肯定知道,不同的保險(xiǎn)理賠流程是不一樣的。而承保流程和理賠流程是分離的,在需要時(shí)進(jìn)行關(guān)聯(lián),從而可以復(fù)用一些理賠流程,并提供一些個(gè)性化理賠流程。

演示代碼

就以創(chuàng)建VM為例,在ZStack中大致可以分以下幾個(gè)步驟:

    
        
            
                org.zstack.compute.vm.VmImageSelectBackupStorageFlow
                org.zstack.compute.vm.VmAllocateHostFlow
                org.zstack.compute.vm.VmAllocatePrimaryStorageFlow
                org.zstack.compute.vm.VmAllocateVolumeFlow
                org.zstack.compute.vm.VmAllocateNicFlow
                org.zstack.compute.vm.VmInstantiateResourcePreFlow
                org.zstack.compute.vm.VmCreateOnHypervisorFlow
                org.zstack.compute.vm.VmInstantiateResourcePostFlow
            
        

可以說是代碼即文檔了。在這里,ZStack顯式聲明這些Flow在Spring XML中,這些屬性將會被注入到createVmWorkFlowElements中。每一個(gè)Flow都被拆成了一個(gè)個(gè)較小的單元,好處不僅是將業(yè)務(wù)操作分成了多個(gè)階段易于回滾,還是可以有效復(fù)用這些Flow。這也是編程思想中“組合”的體現(xiàn)。

如何使用

除了這種配置型聲明,還可以在代碼中靈活的使用這些FlowChain。在這里,我們將以Case來說明這些FlowChain的用法,避免對ZStack業(yè)務(wù)邏輯不熟悉的讀者看的一頭霧水。

一共有兩種可用的FlowChain:

SimpleFlowChain

ShareFlowChain

SimpleFlowChain

我們先來看一個(gè)Case。

    @Test
    public void test() {
        FlowChain chain = FlowChainBuilder.newShareFlowChain();

        chain.then(new ShareFlow() {
            int a;

            @Override
            public void setup() {
                flow(new NoRollbackFlow() {
                    @Override
                    public void run(FlowTrigger trigger, Map data) {
                        a = 1;
                        increase();
                        trigger.next();
                    }
                });

                flow(new NoRollbackFlow() {
                    @Override
                    public void run(FlowTrigger trigger, Map data) {
                        a = 2;
                        increase();
                        trigger.next();
                    }
                });
            }
        }).done(new FlowDoneHandler(null) {
            @Override
            public void handle(Map data) {
                success = true;
            }
        }).start();

        Assert.assertTrue(success);
        expect(2);
    }

我們可以看到,這就是一個(gè)工作流。完成一個(gè)工作流的時(shí)候(回調(diào)觸發(fā)時(shí))執(zhí)行下一個(gè)工作流——由trigger.next觸發(fā)。不僅如此,還可以添加Rollback屬性。

    @Test
    public void test() throws WorkFlowException {
        final int[] count = {0};

        new SimpleFlowChain()
                .then(new Flow() {
                    @Override
                    public void run(FlowTrigger chain, Map data) {
                        count[0]++;
                        chain.next();
                    }

                    @Override
                    public void rollback(FlowRollback chain, Map data) {
                        count[0]--;
                        chain.rollback();
                    }
                })
                .then(new Flow() {
                    @Override
                    public void run(FlowTrigger chain, Map data) {
                        count[0]++;
                        chain.next();
                    }

                    @Override
                    public void rollback(FlowRollback chain, Map data) {
                        count[0]--;
                        chain.rollback();
                    }
                })
                .then(new Flow() {
                    @Override
                    public void run(FlowTrigger chain, Map data) {
                        chain.fail(null);
                    }

                    @Override
                    public void rollback(FlowRollback chain, Map data) {
                        count[0]--;
                        chain.rollback();
                    }
                })
                .start();

        Assert.assertEquals(-1, count[0]);
    }

rollback由FlowTrigger的fail觸發(fā)。這樣我們可以保證在發(fā)生一些錯(cuò)誤的時(shí)候及時(shí)回滾,防止我們的系統(tǒng)處于一個(gè)有臟數(shù)據(jù)的中間狀態(tài)。同時(shí),Map也可以用來在Flow之間傳遞上下文。

ShareFlowChain
public class TestShareFlow {
    int[] count = {0};
    boolean success;

    private void increase() {
        count[0]++;
    }

    private void decrease() {
        count[0]--;
    }

    private void expect(int ret) {
        Assert.assertEquals(count[0], ret);
    }

    @Test
    public void test() {
        FlowChain chain = FlowChainBuilder.newShareFlowChain();

        chain.then(new ShareFlow() {
            int a;

            @Override
            public void setup() {
                flow(new NoRollbackFlow() {
                    @Override
                    public void run(FlowTrigger trigger, Map data) {
                        a = 1;
                        increase();
                        trigger.next();
                    }
                });

                flow(new NoRollbackFlow() {
                    @Override
                    public void run(FlowTrigger trigger, Map data) {
                        a = 2;
                        increase();
                        trigger.next();
                    }
                });
            }
        }).done(new FlowDoneHandler(null) {
            @Override
            public void handle(Map data) {
                success = true;
            }
        }).start();

        Assert.assertTrue(success);
        expect(2);
    }


    @Before
    public void setUp() throws Exception {
        new BeanConstructor().build();
    }
}

比起SimpleFlowChain,ShareFlowChain則是一個(gè)Inner class,在相同的作用域里,傳遞數(shù)據(jù)變得更加的方便了。

它的實(shí)現(xiàn)

在ZStack中,F(xiàn)lowChain作為核心庫,其實(shí)現(xiàn)也是非常的簡單(可以直接參考SimpleFlowChainShareFlowChain),本質(zhì)就是將任務(wù)放入List中,由內(nèi)部方法進(jìn)行迭代,在此基礎(chǔ)上做了一系列操作。下面將開始分析它的源碼。

從接口說起
public interface FlowChain {
    List getFlows();

    FlowChain insert(Flow flow);

    FlowChain insert(int pos, Flow flow);

    FlowChain setFlowMarshaller(FlowMarshaller marshaller);

    FlowChain then(Flow flow);

    FlowChain done(FlowDoneHandler handler);

    FlowChain error(FlowErrorHandler handler);

    FlowChain Finally(FlowFinallyHandler handler);

    FlowChain setData(Map data);

    FlowChain putData(Map.Entry... es);

    FlowChain setName(String name);

    void setProcessors(List processors);

    Map getData();

    void start();

    FlowChain noRollback(boolean no);

    FlowChain allowEmptyFlow();
}

接口的名字非常的易懂,那么在這里就不多作解釋了。FlowChain僅僅定義了一個(gè)Flow最小應(yīng)有的行為。

//定義了Flow的回滾操作接口
public interface FlowRollback extends AsyncBackup {
    //回滾操作
    void rollback();
    //設(shè)置跳過回滾操作
    void skipRestRollbacks();
}
//定義了觸發(fā)器的行為接口
public interface FlowTrigger extends AsyncBackup {
    //觸發(fā)失敗,調(diào)用errorHandle
    void fail(ErrorCode errorCode);
    //觸發(fā)下一個(gè)flow
    void next();
    //setError后,在下次調(diào)用next的時(shí)才會調(diào)用errorHandle
    void setError(ErrorCode error);
}
源碼解析 Flow
public interface Flow {
    void run(FlowTrigger trigger, Map data);

    void rollback(FlowRollback trigger, Map data);
}

Flow的定義其實(shí)非常的簡單——一組方法。執(zhí)行和對應(yīng)的回滾,一般在ZStack中都以匿名內(nèi)部類的方式傳入。

Chain的用法

在之前的SimpleFlowChain的case中。我們可以看到一系列的鏈?zhǔn)秸{(diào)用,大致如下:

new SimpleFlowChain().then(new flow()).then(new flow()).then(new flow()).start();

then本質(zhì)是往List flows里添加一個(gè)flow。

    public SimpleFlowChain then(Flow flow) {
        flows.add(flow);
        return this;
    }

再來看看start

    @Override
    public void start() {
        // 檢測flow中是否設(shè)置了processors。一般用來打trace
        if (processors != null) {
            for (FlowChainProcessor p : processors) {
                p.processFlowChain(this);
            }
        }
        //如果flows為空但是之前在設(shè)置中允許為空,那么就直接直接done部分的邏輯。不然就報(bào)錯(cuò)
        if (flows.isEmpty() && allowEmptyFlow) {
            callDoneHandler();
            return;
        }

        if (flows.isEmpty()) {
            throw new CloudRuntimeException("you must call then() to add flow before calling start() or allowEmptyFlow() to run empty flow chain on purpose");
        }
        //每個(gè)flow必須有一個(gè)map,用來傳遞上下文
        if (data == null) {
            data = new HashMap();
        }
        //標(biāo)記為已經(jīng)開始
        isStart = true;
        //如果沒有名字的話給flow 取一個(gè)名字,因?yàn)楹苡锌赡苁悄涿褂玫膄low
        if (name == null) {
            name = "anonymous-chain";
        }

        logger.debug(String.format("[FlowChain(%s): %s] starts", id, name));
        //打印trace,方便調(diào)試
        if (logger.isTraceEnabled()) {
            List names = CollectionUtils.transformToList(flows, new Function() {
                @Override
                public String call(Flow arg) {
                    return String.format("%s[%s]", arg.getClass(), getFlowName(arg));
                }
            });
            logger.trace(String.format("execution path:
%s", StringUtils.join(names, " -->
")));
        }
        //生成一個(gè)迭代器
        it = flows.iterator();
        //從it中獲取一個(gè)不需要跳過的flow開始執(zhí)行。如果沒有獲取到,就執(zhí)行done邏輯
        Flow flow = getFirstNotSkippedFlow();
        if (flow == null) {
            // all flows are skipped
            callDoneHandler();
        } else {
            runFlow(flow);
        }
    }

再來看一下runFlow中的代碼

    private void runFlow(Flow flow) {
        try {
            //看報(bào)錯(cuò)信息就可以猜到在做什么防御措施了:如果一個(gè)transaction在一個(gè)flow中沒有被關(guān)閉而跳到下一個(gè)flow時(shí),會拋出異常。這個(gè)防御機(jī)制來自于一個(gè)實(shí)習(xí)生寫的bug,當(dāng)時(shí)被排查出來的時(shí)候花了非常大的力氣——現(xiàn)象非常的詭異。所以現(xiàn)在被寫在了這里。
            if (TransactionSynchronizationManager.isActualTransactionActive()) {
                String flowName = null;
                String flowClassName = null;
                if (currentFlow != null) {
                    flowName = getFlowName(currentFlow);
                    flowClassName = currentFlow.getClass().getName();
                }

                throw new CloudRuntimeException(String.format("flow[%s:%s] opened a transaction but forgot closing it", flowClassName, flowName));
            }
            //toRun就是一個(gè)當(dāng)前要run的flow
            Flow toRun = null;
            if (flowMarshaller != null) {
            //flowMarshaller 實(shí)際上是一個(gè)非常惡心的玩意兒。尤其在一些配置好掉的xml flow突然因?yàn)橐恍l件而改變接下來執(zhí)行的flow令人很無語...但是也提供了一些靈活性。
                toRun = flowMarshaller.marshalTheNextFlow(currentFlow == null ? null : currentFlow.getClass().getName(),
                        flow.getClass().getName(), this, data);
                if (toRun != null) {
                    logger.debug(String.format("[FlowChain(%s): %s] FlowMarshaller[%s] replaces the next flow[%s] to the flow[%s]",
                            id, name, flowMarshaller.getClass(), flow.getClass(), toRun.getClass()));
                }
            }
       
            if (toRun == null) {
                toRun = flow;
            }

            if (CoreGlobalProperty.PROFILER_WORKFLOW) {
                //對flow的監(jiān)視。比如flow的執(zhí)行時(shí)間等
                stopWatch.start(toRun);
            }

            currentFlow = toRun;

            String flowName = getFlowName(currentFlow);
            String info = String.format("[FlowChain(%s): %s] start executing flow[%s]", id, name, flowName);
            logger.debug(info);
            //在flow中還允許定義afterDone afterError afterFinal的行為。稍后將會介紹
            collectAfterRunnable(toRun);
            //終于到了run,這里就是調(diào)用者傳入的行為來決定run中的邏輯
            toRun.run(this, data);
             //fail的邏輯稍后解析
        } catch (OperationFailureException oe) {
            String errInfo = oe.getErrorCode() != null ? oe.getErrorCode().toString() : "";
            logger.warn(errInfo, oe);
            fail(oe.getErrorCode());
        } catch (FlowException fe) {
            String errInfo = fe.getErrorCode() != null ? fe.getErrorCode().toString() : "";
            logger.warn(errInfo, fe);
            fail(fe.getErrorCode());
        } catch (Throwable t) {
            logger.warn(String.format("[FlowChain(%s): %s] unhandled exception when executing flow[%s], start to rollback",
                    id, name, flow.getClass().getName()), t);
            fail(errf.throwableToInternalError(t));
        }
    }

fail

    @Override
    public void fail(ErrorCode errorCode) {
        isFailCalled = true;
        setErrorCode(errorCode);
        //放入Stack中,之后Rollback會根據(jù)Stack中的flow順序來
        rollBackFlows.push(currentFlow);
        //rollback會對this.rollBackFlows中flow按照順序調(diào)用rollback
        rollback();
    }
FlowTrigger
//定義了觸發(fā)器的行為接口
public interface FlowTrigger extends AsyncBackup {
    //觸發(fā)失敗,調(diào)用errorHandle
    void fail(ErrorCode errorCode);
    //觸發(fā)下一個(gè)flow
    void next();
    //setError后,在下次調(diào)用next的時(shí)才會調(diào)用errorHandle
    void setError(ErrorCode error);
}

之前已經(jīng)看過fail的代碼。接下來來看看nextsetError

    @Override
    public void next() {
        //如果flow沒有run起來的情況下,是不能調(diào)用next的
        if (!isStart) {
            throw new CloudRuntimeException(
                    String.format("[FlowChain(%s): %s] you must call start() first, and only call next() in Flow.run()",
                            id, name));
        }
        //當(dāng)rollback開始的時(shí)候也不允許next
        if (isRollbackStart) {
            throw new CloudRuntimeException(
                    String.format("[FlowChain(%s): %s] rollback has started, you can"t call next()", id, name));
        }
        //將當(dāng)前flow的push進(jìn)rollback用的stack
        rollBackFlows.push(currentFlow);

        logger.debug(String.format("[FlowChain(%s): %s] successfully executed flow[%s]", id, name, getFlowName(currentFlow)));
        //獲取下一個(gè)flow。在這里才是真正意義上的next
        Flow flow = getFirstNotSkippedFlow();
        if (flow == null) {
            // no flows, or all flows are skipped
            if (errorCode == null) {
                callDoneHandler();
            } else {
                callErrorHandler(false);
            }
        } else {
            runFlow(flow);
        }
    }

可以看一下getFirstNotSkippedFlow,本質(zhì)上是利用了迭代器的特性。

    private Flow getFirstNotSkippedFlow() {
        Flow flow = null;
        while (it.hasNext()) {
            flow = it.next();
            if (!isSkipFlow(flow)) {
                break;
            }
        }

        return flow;
    }

接下來是setError

    @Override
    public void setError(ErrorCode error) {
        setErrorCode(error);
    }

//往下看
    private void setErrorCode(ErrorCode errorCode) {
        this.errorCode = errorCode;
    }

根據(jù)之前的next邏輯:

        if (flow == null) {
            // no flows, or all flows are skipped
            if (errorCode == null) {
                callDoneHandler();
            } else {
                callErrorHandler(false);
            }
        } else {
            runFlow(flow);
        }

我們可以大致猜想到,如果在next的時(shí)候當(dāng)前error不為空,則調(diào)用錯(cuò)誤handle。這樣在setError后還可以做一些事情。

無論是調(diào)用errorHandle還是doneHandle,都會調(diào)用finalHandle。finalHandle也允許用戶定義這部分的邏輯,使flow更加的靈活。

更好的選擇

由于該庫是為ZStack定制而生,故此有一些防御性判斷,源碼顯得略為verbose。如果有同學(xué)對此感興趣,想將其應(yīng)用到自己的系統(tǒng)中,筆者推薦使用:jdeferred。

Java Deferred/Promise library similar to JQuery

由于JavaScript 中的代碼都是異步調(diào)用的。簡單說,它的思想是,每一個(gè)異步任務(wù)返回一個(gè)Promise對象,該對象有一個(gè)then方法,允許指定回調(diào)函數(shù)。

在這里列出幾個(gè)較為簡單的示范,或者有興趣的讀者也可以參考這里:

import org.jdeferred.DeferredManager;
import org.jdeferred.Promise;
import org.jdeferred.impl.DefaultDeferredManager;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

import java.util.concurrent.TimeUnit;


public class deferSimpleTest {

    private static int var = 0;
    final DeferredManager dm = new DefaultDeferredManager();

    @After
    public void cleanUp() {
        var = 0;
    }


    @Test
    public void test() {
        Promise p1 = dm.when(() -> {
            var += 1;
        }).then(result -> {
            var += 1;
        });

        Promise p2 = dm.when(() -> {
            var += 1;
        }).then(result -> {
            var += 1;
        });

        dm.when(p1, p2).done(Void -> var += 1);
        Assert.assertEquals(5, var);
    }

    @Test
    public void test2() {
        final DeferredManager dm = new DefaultDeferredManager();

        Promise promise = dm.when(() -> {
                var += 1;
            }).then(result -> {
                var += 1;
            });

        dm.when(promise).done(Void -> var += 1);
        Assert.assertEquals(3, var);
    }

    @Test
    public void testBadCallback() {
        Promise promise = dm.when(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        dm.when(promise).done(Void -> {
                    var += 1;
                    throw new RuntimeException("this exception is expected");
                }
        ).fail(Void -> {
            System.out.print("fail!");
            var -= 1;
        });
        Assert.assertEquals(0, var);

    }
}

如果你在使用Java8,那么也可以通過CompletableFuture來得到“類似”的支持。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/70845.html

相關(guān)文章

  • ZStack源碼剖析模塊鑒賞——LongJob

    摘要:因?yàn)檫@個(gè)狀態(tài)下,是交給一個(gè)線程在執(zhí)行的,見源碼剖析之核心庫鑒賞中的分析。并且允許等行為。上面提到過,允許運(yùn)行暫停取消等行為。維護(hù)和相應(yīng)的之間的關(guān)系。則停止執(zhí)行并觸發(fā)之前的所有。 本文首發(fā)于泊浮目的專欄:https://segmentfault.com/blog... 前言 在ZStack中,當(dāng)用戶在UI上發(fā)起操作時(shí),前端會調(diào)用后端的API對實(shí)際的資源發(fā)起操作請求。但在一個(gè)分布式系統(tǒng)中...

    cheukyin 評論0 收藏0
  • ZStack源碼剖析二次開發(fā)——可擴(kuò)展框架

    摘要:但在實(shí)際的二次開發(fā)中,這些做法未必能夠完全滿足需求。在源碼剖析之核心庫鑒賞一文中,我們了解到是的基礎(chǔ)設(shè)施之一,同時(shí)也允許通過顯示聲明的方式來聲明。同理,一些也可以使用繼承進(jìn)行擴(kuò)展。 本文首發(fā)于泊浮目的專欄:https://segmentfault.com/blog... 前言 在ZStack博文-5.通用插件系統(tǒng)中,官方提出了幾個(gè)較為經(jīng)典的擴(kuò)展方式。但在實(shí)際的二次開發(fā)中,這些做法未必...

    lolomaco 評論0 收藏0
  • ZStack源碼剖析核心鑒賞——Defer

    摘要:本文首發(fā)于泊浮目的專欄在語言中,有一個(gè)關(guān)鍵字叫做其作用是在函數(shù)前執(zhí)行。一般有兩種用法在該函數(shù)拋出異常時(shí)執(zhí)行。在該函數(shù)返回前執(zhí)行。這里的放入來自系統(tǒng)啟動(dòng)時(shí)利用反射所做的一個(gè)行為。因此并不會影響使用時(shí)的性能。 本文首發(fā)于泊浮目的專欄:https://segmentfault.com/blog... 在Go語言中,有一個(gè)關(guān)鍵字叫做defer——其作用是在函數(shù)return前執(zhí)行。在ZStac...

    DevWiki 評論0 收藏0
  • ZStack源碼剖析核心鑒賞——Defer

    摘要:本文首發(fā)于泊浮目的專欄在語言中,有一個(gè)關(guān)鍵字叫做其作用是在函數(shù)前執(zhí)行。一般有兩種用法在該函數(shù)拋出異常時(shí)執(zhí)行。在該函數(shù)返回前執(zhí)行。這里的放入來自系統(tǒng)啟動(dòng)時(shí)利用反射所做的一個(gè)行為。因此并不會影響使用時(shí)的性能。 本文首發(fā)于泊浮目的專欄:https://segmentfault.com/blog... 在Go語言中,有一個(gè)關(guān)鍵字叫做defer——其作用是在函數(shù)return前執(zhí)行。在ZStac...

    ymyang 評論0 收藏0
  • ZStack源碼剖析核心鑒賞——ThreadFacade

    摘要:每個(gè)消息都會被一個(gè)線程消費(fèi),同時(shí)最大并發(fā)量為。然后提交一個(gè)任務(wù)到線程池中,這個(gè)任務(wù)的內(nèi)容是從等待隊(duì)列中取出一個(gè),如果等待隊(duì)列為空,則刪除這個(gè)等待隊(duì)列的。小結(jié)本文分析了的久經(jīng)生產(chǎn)考驗(yàn)的核心組件線程池。 本文首發(fā)于泊浮目的專欄:https://segmentfault.com/blog... 前言 在ZStack中,最基本的執(zhí)行單位不僅僅是一個(gè)函數(shù),也可以是一個(gè)任務(wù)(Task。其本質(zhì)實(shí)現(xiàn)...

    enali 評論0 收藏0

發(fā)表評論

0條評論

最新活動(dòng)
閱讀需要支付1元查看
<