国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

Cobar源碼解析(一)

jiekechoo / 1960人閱讀

摘要:的使用方法就不多介紹了,本文的主要內容是剖析的源代碼。而又有一個私有的靜態變量,以及獲取這個私有靜態變量的靜態方法,顯然,這是一個單例設計模式,使程序運行的時候全局只有一個對象。

簡介

當業務的數據量和訪問量急劇增加的情況下,我們需要對數據進行水平拆分,從而降低單庫的壓力,并且數據的水平拆分需要對業務透明,屏蔽掉水平拆分的細節。并且,前端業務的高并發會導致后端的數據庫連接過多,從而DB的性能低下。

Cobar就是解決這些問題的一款分庫分表中間件,Cobar以proxy的形式位于前端應用和后端數據庫之間,Cobar對前端暴露的接口是MySQL通信協議,其將前端傳輸過來的SQL語句按照sharding規則路由到后端的數據庫實例上,再合并多個實例返回的結果,從而模擬單庫下的數據庫行為。

Cobar的使用方法就不多介紹了,本文的主要內容是剖析Cobar的源代碼。

Cobar的前端連接模型

結構圖如下:

我們先來看CobarServer的代碼:

private CobarServer() {
        this.config = new CobarConfig();
        SystemConfig system = config.getSystem();
        MySQLLexer.setCStyleCommentVersion(system.getParserCommentVersion());
        this.timer = new Timer(NAME + "Timer", true);
        this.initExecutor = ExecutorUtil.create("InitExecutor", system.getInitExecutor());
        this.timerExecutor = ExecutorUtil.create("TimerExecutor", system.getTimerExecutor());
        this.managerExecutor = ExecutorUtil.create("ManagerExecutor", system.getManagerExecutor());
        this.sqlRecorder = new SQLRecorder(system.getSqlRecordCount());
        this.isOnline = new AtomicBoolean(true);
        this.startupTime = TimeUtil.currentTimeMillis();
    }
    

上面是CobarServer的構造函數,它的限定是private的。

private static final CobarServer INSTANCE = new CobarServer();

public static final CobarServer getInstance() {
        return INSTANCE;
    }

而CobarServer又有一個私有的靜態變量INSTANCE,以及獲取這個私有靜態變量的靜態方法,顯然,這是一個單例設計模式,使程序運行的時候全局只有一個CobarServer對象。

我們再來看CobarServer的startup()方法,此方法中構造了一個NIOAcceptor(綁定服務器端口,接受客戶端的連接),

server = new NIOAcceptor(NAME + "Server", system.getServerPort(), sf);

構造了一個接收前端連接的非阻塞Acceptor,讓我們在來看NIOAcceptor類的代碼。

public final class NIOAcceptor extends Thread {
    private static final Logger LOGGER = Logger.getLogger(NIOAcceptor.class);
    private static final AcceptIdGenerator ID_GENERATOR = new AcceptIdGenerator();

    private final int port;
    private final Selector selector;
    private final ServerSocketChannel serverChannel;
    private final FrontendConnectionFactory factory;
    private NIOProcessor[] processors;
    private int nextProcessor;
    private long acceptCount;
     
     public NIOAcceptor(String name, int port, FrontendConnectionFactory factory) throws IOException {
        super.setName(name);
        this.port = port;
        this.selector = Selector.open();    # 生成選擇器
        this.serverChannel = ServerSocketChannel.open();
        this.serverChannel.socket().bind(new InetSocketAddress(port));    # 綁定服務器端口
        this.serverChannel.configureBlocking(false);    # 設置非阻塞模式
        this.serverChannel.register(selector, SelectionKey.OP_ACCEPT);    # 監聽ACCEPT事件,
        this.factory = factory;    # 設置前端連接的工廠
    }
}

以上的代碼都是NIO編程中很常見的操作。下面我們看run()方法,

@Override
    public void run() {
        final Selector selector = this.selector;
        for (;;) {
            ++acceptCount;
            try {
                selector.select(1000L);    # select操作是阻塞的,若沒有監聽到相應的事件,則一直阻塞,直到超過1000毫秒,則返回
                Set keys = selector.selectedKeys();
                try {
                    for (SelectionKey key : keys) {
                        if (key.isValid() && key.isAcceptable()) {
                            accept();        # 接受連接,這個方法很關鍵
                        } else {
                            key.cancel();
                        }
                    }
                } finally {
                    keys.clear();
                }
            } catch (Throwable e) {
                LOGGER.warn(getName(), e);
            }
        }
    }

以上的run方法也是常見的NIO中監聽事件的套路,其中accept()方法是定義的私有函數,accept方法是為了將channel與selector綁定,代碼如下,

private void accept() {
        SocketChannel channel = null;
        try {
            channel = serverChannel.accept();    # 為新的連接分配socket
            channel.configureBlocking(false);    # 設置為非阻塞模式
            # factory將channel進行封裝,進行相應的設置,返回一個FrontendConnection,connection本質上就是一個封裝好的channel
            FrontendConnection c = factory.make(channel);
            c.setAccepted(true);
            c.setId(ID_GENERATOR.getId());    # 為連接設置ID
            NIOProcessor processor = nextProcessor();    # 為連接分配processor,NIOAcceptor中包含了一個NIOProcessor數組,分配的策略即根據下標不斷后移,到達數組末尾后又從數組的起始位置開始分配
            c.setProcessor(processor);
            # 回調NIOProcessor的postRegister方法,而processor的postRegister調用的是NIOReactor類的postRegister方法
            processor.postRegister(c);    
        } catch (Throwable e) {
            closeChannel(channel);
            LOGGER.warn(getName(), e);
        }
    }

讓我來看NIOProcessor的postRegister方法,

public void postRegister(NIOConnection c) {
        reactor.postRegister(c);
}

NIOProcessor類中定義了一個NIOReactor類的成員變量reactor,而postRegister調用的是NIOReactor的postRegister方法。下面讓我們來看NIOReactor的postRegister代碼,

final void postRegister(NIOConnection c) {
        # 只是先將前端連接插入R線程的阻塞隊列中,并沒有立刻將channel與selector進行綁定
        reactorR.registerQueue.offer(c);
        # 喚醒R線程的selector,若之前的select操作沒有返回的話則立即返回
        reactorR.selector.wakeup();
}

既然channel與selector沒有立刻進行綁定,那它們是什么時候綁定的呢?我們來看NIOReactor中內部類R的run()方法,

@Override
        public void run() {
            final Selector selector = this.selector;
            for (;;) {
                ++reactCount;
                try {
                    selector.select(1000L);
                    # 將connection與selector進行綁定
                    register(selector);   
                    Set keys = selector.selectedKeys();
                    try {
                        for (SelectionKey key : keys) {
                            Object att = key.attachment();
                            if (att != null && key.isValid()) {
                                int readyOps = key.readyOps();
                                if ((readyOps & SelectionKey.OP_READ) != 0) {
                                    read((NIOConnection) att);
                                } else if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                                    write((NIOConnection) att);
                                } else {
                                    key.cancel();
                                }
                            } else {
                                key.cancel();
                            }
                        }
                    } finally {
                        keys.clear();
                    }
                } catch (Throwable e) {
                    LOGGER.warn(name, e);
                }
            }
        }

在run方法中,當select方法返回的時候,就會進行channel和selector的綁定,因為當connection插入到阻塞隊列中的時候,會對selector進行wakeup(),即select(1000L)方法會立即返回,所以不必擔心channel會卡一秒鐘才會和selector進行綁定。

我們再來看R線程的register方法,

private void register(Selector selector) {
            NIOConnection c = null;
            # 將R線程阻塞隊列中的所有連接都輪詢取出,與selector進行綁定
            while ((c = registerQueue.poll()) != null) {
                try {
                    c.register(selector);
                } catch (Throwable e) {
                    c.error(ErrorCode.ERR_REGISTER, e);
                }
            }
        }
總結

關于NIOAcceptor為何先將connection放入Reactor的阻塞隊列,而不是直接綁定。筆者的觀點是,如果由NIOAcceptor負責綁定則會造成鎖競爭,selector的register方法會爭用鎖,會導致NIOAcceptor線程和R、W線程競爭selector的鎖,若acceptor中處理綁定connection的邏輯,則NIOAcceptor就不能快速地處理大量的連接,整個系統的吞吐就會降低。所以Cobar中的設計是將connection的綁定放到R線程的阻塞隊列中去,讓R線程來完成connection的綁定工作。

圖就隨意看看吧-.-,有點丑。

以上。

原文鏈接

https://segmentfault.com/a/11...

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/70404.html

相關文章

  • Cobar源碼解析(二)

    摘要:如果數據庫檢測到是連續的,則表明沒有串包,如果不連續,則表示串包,數據庫會直接丟棄該連接。源碼分析上一節我們分析到,當一個前端連接過來,并不是直接和綁定,而是先插入到線程的注冊隊列中這樣能釋放的壓力處理更多前端連接。 報文格式 這一節我們來講Cobar Handshake的過程。 MySQL服務端和客戶端交互的所有的包格式都是統一的,報文格式如下圖: showImg(https://s...

    pkwenda 評論0 收藏0
  • 【深度】| 值得收藏的阿里開源技術

    摘要:淘寶定制基于,是國內第一個優化定制且開源的服務器版虛擬機。數據庫開源數據庫是基于官方版本的一個分支,由阿里云數據庫團隊維護,目前也應用于阿里巴巴集團業務以及阿里云數據庫服務。淘寶服務器是由淘寶網發起的服務器項目。 Java JAVA 研發框架 SOFAStack SOFAStack(Scalable Open Financial Architecture Stack)是用于快速構建金融...

    econi 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<