大數(shù)據(jù)夢(mèng)工廠( 0011 - YARN核心設(shè)計(jì)解析)


1 - YARN RPC架構(gòu)設(shè)計(jì)

YARN RPC Server 處理流程大致可以分為四個(gè)階段:建立連接、接收請(qǐng)求、處理請(qǐng)求和返回結(jié)果。各階段實(shí)現(xiàn)如下圖所示:

1.1 - 建立連接

整個(gè) YARN RPC Server 只有一個(gè) Listener 線程,且包含一個(gè) Selector 對(duì)象,用于監(jiān)聽(tīng) OP_ACCEPT 事件。統(tǒng)一負(fù)責(zé)監(jiān)聽(tīng)是否有來(lái)自各個(gè)客戶端的 RPC 連接請(qǐng)求到達(dá),并采用輪詢策略選擇一個(gè) Reader 線程處理新連接。

1.2 - 接收請(qǐng)求

當(dāng) Listener 完成客戶端的連接之后,通過(guò)輪詢方式找到一個(gè) Reader 線程處理,并將新的 RPC 請(qǐng)求封裝成固定的格式(Call 類),放到一個(gè)共享隊(duì)列(callQueue)中。可同時(shí)存在多個(gè) Reader 線程,且包含一個(gè) Selector 對(duì)象,用于監(jiān)聽(tīng) OP_READ 事件。

1.3 - 處理請(qǐng)求

Handler 線程(可同時(shí)存在多個(gè))并行從共享隊(duì)列(callQueue)中讀取 Call 對(duì)象,執(zhí)行對(duì)應(yīng)的函數(shù)調(diào)用,并嘗試直接將結(jié)果返回給對(duì)應(yīng)的客戶端。但某些函數(shù)調(diào)用返回結(jié)果很大或者網(wǎng)絡(luò)速度過(guò)慢,可能難以將結(jié)果一次性發(fā)送到客戶端,此時(shí),Handler 線程就會(huì)為對(duì)應(yīng)客戶端生成一個(gè) Connection 對(duì)象,同時(shí)創(chuàng)建一個(gè) responseQueue 隊(duì)列來(lái)儲(chǔ)存結(jié)果,最后將結(jié)果寫(xiě)到 Responder 線程。

1.4 - 返回結(jié)果

Server 端只有一個(gè) Responder 線程,且包含一個(gè) Selector 對(duì)象,用于監(jiān)聽(tīng) OP_WRITE 事件。當(dāng) Handler 線程沒(méi)能將結(jié)果一次性發(fā)送到對(duì)應(yīng)客戶端時(shí),會(huì)向該 Selector 對(duì)象注冊(cè) OP_WRITE 事件,進(jìn)而由 Responder 線程采用異步方式繼續(xù)發(fā)送未發(fā)送完成的結(jié)果。

1.5 - RPC 參數(shù)調(diào)優(yōu)

Hadoop RPC 主要的配置參數(shù)如下:
1、Reader 線程數(shù)量
由參數(shù) ipc.server.read.threadpool.size 配置,默認(rèn)是 1。默認(rèn)情況下,一個(gè) RPC Server 只包含一個(gè) Reader 線程。

2、每個(gè) Handler 線程對(duì)應(yīng)的最大 Call 數(shù)量
由參數(shù) ipc.server.handler.queue.size 配置,默認(rèn)是 100。默認(rèn)情況下,每個(gè) Handler 線程對(duì)應(yīng)的 Call 對(duì)列長(zhǎng)度為 100。例如:如果 Handler 線程數(shù)是 10,則整個(gè) Call 隊(duì)列(即共享隊(duì)列 callQueue)最大長(zhǎng)度為:100 x 10 = 1000

3、Handler 線程數(shù)量
在 HDFS 的 NameNode 中對(duì)應(yīng)的 Handler 數(shù)量由參數(shù) dfs.datanode.handler.count 配置,默認(rèn)是 10。
在 YARN 的 ResourceManager 中對(duì)應(yīng)的 Handler 數(shù)量由參數(shù) yarn.resourcemanager.resource-tracker.client.thread-count 配置,默認(rèn)是 50。

4、客戶端最大重試次數(shù)
由參數(shù) ipc.client.connect.max.retries 配置,默認(rèn)是 10。也就是會(huì)連續(xù)重試 10 次。

2 - YARN通信協(xié)議

RPC 協(xié)議是連接各個(gè)組件的 “大動(dòng)脈”。在 YARN 中,任何兩個(gè)需要相互通信的組件之間只有一個(gè) RPC 協(xié)議,而對(duì)于任何一個(gè) RPC 協(xié)議,通信雙方有一端是 Client,另一端是 Server,且總是 Client 主動(dòng)連接 Server 的。因此,YARN 實(shí)際上采用的是拉模式(pull-based)通信模型。如下圖所示:

YARN 主要由以下幾個(gè) RPC 協(xié)議組成:

  • ApplicationClientProtocol:JobClient(作業(yè)提交客戶端)與 RM 之間的協(xié)議。JobClient 通過(guò)該 RPC 協(xié)議提交應(yīng)用程序、 查詢應(yīng)用程序狀態(tài)等。
  • ResourceTrackerProtocol:NM 與 RM 之間的協(xié)議。NM 通過(guò)該 RPC 協(xié)議向 RM 注冊(cè),并定時(shí)發(fā)送心跳信息,匯報(bào)當(dāng)前節(jié)點(diǎn)的資源使用情況和 Container 運(yùn)行情況。
  • ApplicationMasterProtocol:AM 與 RM 之間的協(xié)議。AM 通過(guò)該 RPC 協(xié)議向 RM 注冊(cè)和撤銷(xiāo)自己,并為各個(gè)任務(wù)申請(qǐng)資源。
  • ContainerManagementProtocol:AM 與 NM 之間的協(xié)議。 AM 通過(guò)該 RPC 要求 NM 啟動(dòng)或者停止 Container,獲取各個(gè) Container 的使用狀態(tài)等信息。
  • ResourceManagerAdministrationProtocol:Admin 與 RM 之間的通信協(xié)議。Admin 通過(guò)該 RPC 協(xié)議更新系統(tǒng)配置文件。例如:節(jié)點(diǎn)黑白名單、用戶隊(duì)列權(quán)限等。
  • HAServiceProtocol:Active RM 和 Standby RM 之間的通信協(xié)議。提供狀態(tài)監(jiān)控和 Failover 的 HA 服務(wù)。
  • TaskUmbilicalProtocol:YarnChild 和 MRAppMaster 之間的通信協(xié)議。用于 MRAppMaster 監(jiān)控跟蹤 YarnChild 的運(yùn)行狀態(tài),YarnChild 向 MRAppMaster 拉取 - Task 任務(wù)信息。
  • MRClientProtocol:JobClient 和 AM 之間的通信協(xié)議。用于客戶端拉取應(yīng)用程序的執(zhí)行狀態(tài),以及應(yīng)用程序返回執(zhí)行結(jié)果給 JobClient。
  • ApplicationHistoryProtocol:JobClient 和 JobHistory Server 之間的通信協(xié)議。用于獲取已完成應(yīng)用程序的信息等。

3 - YARN Service工作機(jī)制

對(duì)于生命周期較長(zhǎng)的對(duì)象,使用服務(wù)的對(duì)象管理模型進(jìn)行管理。該模型主要特點(diǎn)如下:

  • 將每個(gè)被服務(wù)化的對(duì)象分為 4 個(gè)狀態(tài):NOTINITED(被創(chuàng)建)、INITED(已初始化)、STARTED(已啟動(dòng))、STOPPED(已停止)。
  • 任何服務(wù)狀態(tài)變化都可以觸發(fā)另外一些動(dòng)作。
  • 可通過(guò)組合的方式對(duì)任意服務(wù)進(jìn)行組合,以便進(jìn)行統(tǒng)一管理。也就是說(shuō),一個(gè)父 Service 可能會(huì)有多個(gè)子 Service。

3.1 - YARN 服務(wù)模型的類圖

YARN 中關(guān)于服務(wù)模型的類圖位于包 org.apache.hadoop.service 中,如下圖所示:

在 YARN 中,會(huì)有非常多的服務(wù)對(duì)象,且都實(shí)現(xiàn)了接口 Service,定義了服務(wù)初始化、啟動(dòng)、停止等操作。YARN 中所有對(duì)象,如果是組合服務(wù),直接繼承 CompositeService 類,否則繼承 AbstractService 類。如下圖所示:

ResourceManager 是一個(gè)組合服務(wù),包括 ClientRMService、ApplicationMasterLauncher、ApplicationMasterService 等服務(wù)對(duì)象。

NodeManager 也屬于組合服務(wù),它們內(nèi)部包含多個(gè)單一服務(wù)和組合服務(wù),以實(shí)現(xiàn)對(duì)內(nèi)部多種服務(wù)的統(tǒng)一管理。

3.2 - Service 的定義

public interface Service extends Closeable {  public enum STATE {    NOTINITED(0, "NOTINITED"),    INITED(1, "INITED"),    STARTED(2, "STARTED"),    STOPPED(3, "STOPPED");  }  // 服務(wù)初始化  void init(Configuration config);  // 服務(wù)啟動(dòng)  void start();  // 服務(wù)停止  void stop();  // 服務(wù)關(guān)閉  void close() throws IOException;}

4 - YARN AsyncDispatcher事件模型

4.1 - 事件處理模型

YARN 采用了事件驅(qū)動(dòng)的并發(fā)模型,其核心服務(wù)是一個(gè)中央異步調(diào)度器(AsyncDispatcher)。包括 ResourceManager、NodeManager、MRAppMaster 等,它們共同維護(hù)了一個(gè)事件(Event)與事件處理器(EventHandler)的映射表,用來(lái)處理各個(gè)事件。其事件處理模型如下圖所示:

并發(fā)處理流程包括 5 個(gè)步驟:
1、各業(yè)務(wù)類型的處理請(qǐng)求以 Event 的形式提交到事件隊(duì)列(Event Queue)中;
2、AsyncDispatcher 創(chuàng)建 HandlerThread 線程消費(fèi)事件隊(duì)列,并將 Event 傳遞給對(duì)應(yīng)的 EventHandler;
3、該 EventHandler 可能將 Event 轉(zhuǎn)發(fā)給另外一個(gè) EventHandler,也有可能轉(zhuǎn)發(fā)給帶有有限狀態(tài)機(jī)(StateMachine)的 EventHandler;
4、將 StateMachine 的處理結(jié)果以 Event 的形式輸出到 AsyncDispatcher;
5、如果有新的 Event 會(huì)再次被 AsyncDispatcher 轉(zhuǎn)發(fā)給下一個(gè) EventHandler,直至處理完成(達(dá)到終止條件)。

例如: MRAppMaster 內(nèi)部包含一個(gè)中央異步調(diào)度器(AsyncDispatcher),并注冊(cè)了 TaskAttemptEvent/TaskAttemptImplTaskEvent/TaskImplJobEvent/JobImpl 等一系列事件/事件處理器,由中央異步調(diào)度器統(tǒng)一管理和調(diào)度。

4.2 - 事件與事件處理器

通過(guò)引入服務(wù)化和事件驅(qū)動(dòng)的設(shè)計(jì)思想,使得 YARN 具有低耦合、高內(nèi)聚的特點(diǎn),各個(gè)模塊只需要完成各自的功能,而模塊之間則采用事件相互關(guān)聯(lián)。事件與事件處理器的的類圖位于包 org.apache.hadoop.yarn.event 中,如下圖所示:

ResourceManager 內(nèi)部事件與事件處理器交互圖如下:

5 - YARN StateMachine 狀態(tài)機(jī)

狀態(tài)機(jī)(StateMachine)是由一組狀態(tài)組成:

  • 初始狀態(tài)
  • 中間狀態(tài)
  • 最終狀態(tài)

當(dāng)狀態(tài)機(jī)從初始狀態(tài)開(kāi)始運(yùn)行,經(jīng)過(guò)一系列中間狀態(tài)后,到達(dá)最終狀態(tài)時(shí)退出。也就是說(shuō),在一個(gè)狀態(tài)機(jī)中,每個(gè)狀態(tài)都可以接收一組特定事件,并根據(jù)具體的事件類型轉(zhuǎn)換到另一個(gè)狀態(tài)。當(dāng)狀態(tài)機(jī)轉(zhuǎn)換到最終狀態(tài)時(shí),則退出。

5.1 - 狀態(tài)機(jī)轉(zhuǎn)換方式

在 YARN 中,每種狀態(tài)轉(zhuǎn)換(doTransition() 方法執(zhí)行狀態(tài)轉(zhuǎn)換,addTransition() 方法注冊(cè)狀態(tài)轉(zhuǎn)換)由一個(gè)四元組表示,分別是:

  • 轉(zhuǎn)換前狀態(tài)(preState)
  • 轉(zhuǎn)換后狀態(tài)(postState)
  • 事件(event)
  • 回調(diào)函數(shù)(hook)

YARN 定義了三種狀態(tài)轉(zhuǎn)換方式,具體如下:
1、一個(gè)初始狀態(tài)、一個(gè)最終狀態(tài)、一種事件
該方式表示經(jīng)過(guò)處理之后,無(wú)論如何,進(jìn)入到一個(gè)唯一狀態(tài)。

初始狀態(tài):最終狀態(tài):事件 = 1:1:1

2、 一個(gè)初始狀態(tài)、多個(gè)最終狀態(tài)、一種事件
該方式表示不同的邏輯處理結(jié)果,可能導(dǎo)致進(jìn)入不同的狀態(tài)。

初始狀態(tài):最終狀態(tài):事件 = 1:N:1

3、一個(gè)初始狀態(tài)、一個(gè)最終狀態(tài)、多種事件
該方式表示多個(gè)不同的事件,可能觸發(fā)到多個(gè)不同狀態(tài)的轉(zhuǎn)換。

初始狀態(tài):最終狀態(tài):事件 = 1:1:N

5.2 - 狀態(tài)機(jī)類

YARN 實(shí)現(xiàn)了一個(gè)非常簡(jiǎn)單的狀態(tài)機(jī)庫(kù),在 org.apache.hadoop.yarn.state 包中。

YARN 對(duì)外提供了一個(gè)狀態(tài)機(jī)工廠 StatemachineFactory,它提供多種 addTransition() 方法供用戶添加各種狀態(tài)轉(zhuǎn)移,一旦狀態(tài)機(jī)添加完畢后,可通過(guò)調(diào)用 installTopology() 完成一個(gè)狀態(tài)機(jī)的構(gòu)建。如下圖所示:

5.3 - 狀態(tài)機(jī)可視化

YARN 中實(shí)現(xiàn)了多個(gè)狀態(tài)機(jī)對(duì)象,包括:

  • ResourceManager 中的 RMAppImpl、RMAppAttemptImpl、RMContainerImpl 和 RMNodeImpl 等。
  • NodeManager 中的 ApplicationImpl、ContainerImpl 和 LocalizedResource 等。
  • MRAppMaster 中的 JobImpl、TaskImpl 和 TaskAttemptImpl 等。

為了便于查看這些狀態(tài)機(jī)的狀態(tài)變化以及相關(guān)事件,YARN 提供了一個(gè)狀態(tài)機(jī)可視化工具,具體操作步驟如下:
1、將狀態(tài)機(jī)轉(zhuǎn)化為 graphviz(.gv) 格式的文件,在源代碼根目錄下進(jìn)行編譯

[root@hadoop-01 hadoop-2.10.1-src]# mvn compile -Pvisualize

生成 3 個(gè) *.gv 文件:

[root@hadoop-01 hadoop-2.10.1-src]# ls -l *.gv-rw-r--r-- 1 root root 16698 Sep 10 09:37 MapReduce.gv-rw-r--r-- 1 root root 12075 Sep 10 09:35 NodeManager.gv-rw-r--r-- 1 root root 14641 Sep 10 09:35 ResourceManager.gv

2、使用可視化包 graphviz 中的相關(guān)命令生成狀態(tài)機(jī)圖

[root@hadoop-01 hadoop-2.10.1-src]# dot -Tpng ResourceManager.gv > ResourceManager.png[root@hadoop-01 hadoop-2.10.1-src]# dot -Tpng NodeManager.gv > NodeManager.png[root@hadoop-01 hadoop-2.10.1-src]# dot -Tpng MapReduce.gv > MapReduce.png

如果尚未安裝 graphviz 包,操作該步驟之前先要安裝該包,Centos-7.x 安裝命令如下:

[root@hadoop-01 hadoop-2.10.1-src]# yum install graphviz

ResourceManager 狀態(tài)機(jī)如下圖所示:

NodeManager 狀態(tài)機(jī)如下圖所示:

MapReduce 狀態(tài)機(jī)如下圖所示:

每一個(gè)狀態(tài)機(jī),其實(shí)本身也是一個(gè)事件處理器(EventHandler)。


::: hljs-center
掃一掃,我們的故事就開(kāi)始了。
:::