摘要:方法首先初始化一個回調(diào)函數(shù),這是當一個成為之后就會調(diào)用的一個用于初始化一系列變量的方法,包括拓撲如何在集群上分配,拓撲狀態(tài)更新,清除函數(shù),還有監(jiān)控線程等。
寫在前面的話,筆者第一次閱讀框架源碼,所以可能有些地方理解錯誤或者沒有詳細解釋,如果在閱讀過程發(fā)現(xiàn)錯誤很歡迎在文章下面評論指出。文章后續(xù)會陸續(xù)更新,可以關注或者收藏,轉發(fā)請先私信我,謝謝。對了,筆者看的是2.2.1這個版本
概述??JStorm是一個分布式的實時計算引擎,是阿里巴巴根據(jù)storm的流處理模型進行重寫的一個框架,支持相同的邏輯模型(也就是拓撲結構),然后底層的實現(xiàn)卻大有不同。不過本文并不是打算對兩個框架進行比較,接下來我會從源碼的角度上來解析JStorm是如何工作的。
??作為第一個篇章,筆者先來介紹下nimbus以及它啟動的時候做了什么。JStorm的主節(jié)點上運行著nimbus的守護進程,這個進程主要負責與ZK通信,分發(fā)代碼,給集群中的從節(jié)點分配任務,監(jiān)視集群狀態(tài)等等。此外nimbus需要維護的所有狀態(tài)都會存儲在ZK中,JStorm為了減少對ZK的訪問次數(shù)做了一些緩存,這個后續(xù)代碼分析會說到。以上是nimbus功能的簡介,接下來我們從源碼的角度看看Nimbus到底做了什么。首先在Nimbus啟動的時候:
//設置主線程由于未捕獲異常而突然中止時調(diào)用的默認程序 Thread.setDefaultUncaughtExceptionHandler(new DefaultUncaughtExceptionHandler()); //加載集群的配置信息 Map config = Utils.readStormConfig(); //這下面這個方法內(nèi)部注釋掉了,筆者暫時沒有太在意,后續(xù)再補充 JStormServerUtils.startTaobaoJvmMonitor(); //創(chuàng)建一個NimbusServer實例 NimbusServer instance = new NimbusServer(); //創(chuàng)建一個默認的nimbus啟動類 INimbus iNimbus = new DefaultInimbus(); //開始進行實際的初始化 instance.launchServer(config, iNimbus);
??其實在DefaultUncaughtExceptionHandler中也并沒有太多的處理操作,簡單判斷是否是內(nèi)存溢出,然后正常關閉,否則就是異常直接拋出然后中斷。讀取配置的過程就不詳細講解了。NimbusServer這個類主要封裝了一些用于操作Nimbus的成員變量和方法,Nimbus的啟動操作基本都是定義在這個類內(nèi)的(上述代碼就是這個類中的main方法所定義的)。
??最重要的方法是launchServer,接下來就詳細的解說這個方法的作用,首先來看下launchServer這個方法內(nèi)部的代碼:
private void launchServer(final Map conf, INimbus inimbus) { LOG.info("Begin to start nimbus with conf " + conf); try { //判斷配置模式是否正確 StormConfig.validate_distributed_mode(conf); createPid(conf); //設置退出時的操作 initShutdownHook(); //這個方法在默認實現(xiàn)中沒有任何操作 inimbus.prepare(conf, StormConfig.masterInimbus(conf)); //創(chuàng)建NimbusData對象 data = createNimbusData(conf, inimbus); //這個方法主要負責處理當nimbus線程稱為leader線程之后的操作 initFollowerThread(conf); int port = ConfigExtension.getNimbusDeamonHttpserverPort(conf); hs = new Httpserver(port, conf); hs.start(); //如果集群是運行在yarn上,也需要做一些初始化操作。 initContainerHBThread(conf); serviceHandler = new ServiceHandler(data); //thrift是一個分布式的RPC框架 initThrift(conf); } catch (Throwable e) { if (e instanceof OutOfMemoryError) { LOG.error("Halting due to Out Of Memory Error..."); } LOG.error("Fail to run nimbus ", e); } finally { cleanup(); } LOG.info("Quit nimbus"); }判斷配置中的模式
??只是判斷配置信息中的一個字段名為“storm.cluster.mode”是否是“distributed”,本地模式下是“l(fā)ocal”。
initShutdownHook??添加退出的時候一些操作,包括設置參數(shù)提醒集群要退出,清除nimbus存儲下的一些工作線程(負責處理通信,分發(fā)代碼,心跳的一系列守護線程),關閉打開的各種資源等。
createNimbusData??這個方法用于創(chuàng)建一個NimbusData的對象,這個對象封裝了Nimbus與ZK通信的一些成員變量。下面會在每個方法內(nèi)部逐漸講到NimbusData的一些成員變量以及他們的作用。首先來看看NimbusData的構造方法。
public NimbusData(final Map conf, INimbus inimbus) throws Exception { this.conf = conf; //兩個方法分別處理打開的文件流和blob傳輸流 createFileHandler(); mkBlobCacheMap(); this.nimbusHostPortInfo = NimbusInfo.fromConf(conf); this.blobStore = BlobStoreUtils.getNimbusBlobStore(conf, nimbusHostPortInfo); this.isLaunchedCleaner = false; this.isLaunchedMonitor = false; this.submittedCount = new AtomicInteger(0); this.stormClusterState = Cluster.mk_storm_cluster_state(conf); createCache(); this.taskHeartbeatsCache = new ConcurrentHashMap>(); //創(chuàng)建一個調(diào)度線程池,默認大小為12 this.scheduExec = Executors.newScheduledThreadPool(SCHEDULE_THREAD_NUM); this.statusTransition = new StatusTransition(this); this.startTime = TimeUtils.current_time_secs(); this.inimubs = inimbus; localMode = StormConfig.local_mode(conf); this.metricCache = new JStormMetricCache(conf, this.stormClusterState); this.clusterName = ConfigExtension.getClusterName(conf); pendingSubmitTopologies = new TimeCacheMap (JStormUtils.MIN_10); topologyTaskTimeout = new ConcurrentHashMap (); tasksHeartbeat = new ConcurrentHashMap (); this.metricsReporter = new JStormMetricsReporter(this); this.metricRunnable = ClusterMetricsRunnable.mkInstance(this); String configUpdateHandlerClass = ConfigExtension.getNimbusConfigUpdateHandlerClass(conf); this.configUpdateHandler = (ConfigUpdateHandler) Utils.newInstance(configUpdateHandlerClass); if (conf.containsKey(Config.NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN)) { String string = (String) conf.get(Config.NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN); nimbusNotify = (ITopologyActionNotifierPlugin) Utils.newInstance(string); } else { nimbusNotify = null; } }
??3.1 createFileHandler:在這方法內(nèi)部,實現(xiàn)了一個匿名的內(nèi)部類ExpiredCallback,在其內(nèi)部實現(xiàn)了一個方法叫expire,利用回調(diào)的方式來關閉Channel或者BufferFileInputStream實例對象。
public void createFileHandler() { ExpiredCallback
??然后初始化NimbusData的兩個成員變量uploaders和downloaders,這兩個分別維護需要上傳的通道和需要下載的通道。TimeCacheMap這個類的主要實現(xiàn)邏輯是在其構造函數(shù)內(nèi)部啟動一個守護線程。首先創(chuàng)建一個緩沖區(qū),只要系統(tǒng)不關閉,則在守護線程內(nèi)部不斷的緩沖區(qū)獲取對象,在對象不為空的情況下調(diào)用回調(diào)函數(shù)的expire方法,并執(zhí)行相應的操作,這里具體傳進來的expire方法是關閉Channel或者BufferFileInputStream。
??3.2. mkBlobCacheMap:和上述的方法非常類似,也是申明一個匿名內(nèi)部類,然后初始化幾個成員變量。代碼幾乎和上個方法一樣就不浪費拌面去貼了。這里expire方法中要關閉的是兩個流AtomicOutputStream和BufferInputStream,blobUploaders和blobDownloaders分別存放著上傳和下載所打開的流。blobListers存放上傳和下載的數(shù)據(jù)。
??3.3. 初始化幾個成員變量,包括NimbusInfo(包含了主機名,端口和標志是否是leader),BlobStore(用來存儲blob數(shù)據(jù)的,使用鍵值存儲,阿里提供了兩個不同的blob存儲方式,一種是本地文件系統(tǒng)存儲,一種的hdfs存儲,兩種方式的區(qū)別在于,由于本地文件存儲并不能保證一致性,所以需要ZK介入來保證,這是JStorm的默認配置。如果使用hdfs來存儲,則不需要ZK介入,因為hdfs能保證一致性和正確性),StormClusterState(存儲整個集群的狀態(tài),這個是從ZK上獲取的),為了避免多次向ZK通信,還需要設置緩存信息,任務的心跳信息等等。
??3.4. 初始化好metrics相關的報告線程和監(jiān)聽線程。
??4.1. 方法首先初始化一個回調(diào)函數(shù),這是當一個nimbus成為leader之后就會調(diào)用的一個用于初始化一系列變量的方法,包括拓撲如何在集群上分配,拓撲狀態(tài)更新,清除函數(shù),還有監(jiān)控線程等。后續(xù)會有新的篇章來介紹這個init方法,這里先放這個方法的源碼。
private void init(Map conf) throws Exception { data.init(); NimbusUtils.cleanupCorruptTopologies(data); //拓撲分配 initTopologyAssign(); //狀態(tài)更新 initTopologyStatus(); //清除函數(shù) initCleaner(conf); initMetricRunnable(); if (!data.isLocalMode()) { initMonitor(conf); //mkRefreshConfThread(data); } }
??4.2. 初始化一個Runnable的子類,在構造方法中,首先判斷集群并不是使用本地模式,然后更新ZK上的節(jié)點信息(將nimbus注冊到ZK上)。然后通過ZK獲取集群的狀態(tài)信息,畢竟nimbus是需要維護整個集群的。緊接著判斷是否存在leader,兩次都無法選舉出leader之后,則將ZK上的nimbus信息刪除并退出。如果blobstore使用的是本地文件模式(有本文模式還有hdfs模式兩種)還需要添加一個回調(diào)函數(shù),這個回調(diào)函數(shù)執(zhí)行的操作是,當這個nimbus不是leader的時候,對blob進行同步。此外還需要將那些active的blob存到ZK中,而將死掉的進行清除(原因前文3.3也說到過,本地模式存儲無法保證一致性,所以需要ZK進行維護,而hdfs自帶容錯機制,能保證數(shù)據(jù)的一致性)。
??4.3. 設置該線程為守護線程,并啟動這個線程。run方法首先判斷當前保存在ZK上的集群中是否有l(wèi)eader,如果沒有則選舉當前nimbus為leader線程。如果有了leader線程,則需要判斷是否跟當前的nimbus相同,如果不相同則停止當前的nimbus,畢竟已經(jīng)有l(wèi)eader存在了。如果是相同的,則需要判斷本地的狀態(tài)中,如果還沒有設置為leader,表明當前nimbus還沒有進行初始化,則先設置nimbus為leader然后回調(diào)函數(shù)進行初始化,也就是調(diào)用init(conf)方法。
獲取一個端口(默認的端口是7621)用于構建HttpServer實例對象。可以用于處理和接受tcp連接,啟動一個新的線程進行httpserver的監(jiān)聽。(主要作用或者說在哪里用到尚且不明確)。
??這個方法的主要作用是得知是否能在資源管理器(yarn)上運行jstorm集群,如果可以的話,則需要創(chuàng)建一個新的線程用于處理。(其實這里使用容器的目的是可以在一個物理集群上運行多個不一樣的邏輯集群甚至多個JStorm集群,能動態(tài)調(diào)整邏輯集群分到的資源,此外,資源管理器能提供非常強的可擴展性)。容器線程會被添加到NimbusServer中,后續(xù)使用到的時候再詳細講解。這個容器線程也是守護線程,且馬上就會啟動,這個線程的run方法里面包含兩個處理:
??6.1. handleWriteDir:這個方法的主要作用是清除掉容器上的過期心跳信息,準確的說,如果JStorm集群容器目錄下的心跳信息大于10,則需要清除(從最老的開始)。
??6.2. handlReadDir:這里主要是用于維護本地是否能接受到集群上的hb信息,如果多次超時則要拋出異常。
??thrift是JStorm使用的一個分布式RPC框架。筆者后續(xù)再添加相應的源碼解析。
文章版權歸作者所有,未經(jīng)允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/66980.html
摘要:下面就來講講第一個初始化操作拓撲分配。如果沒有舊的分配信息,說明拓撲分配類型為。到這里,預分配,創(chuàng)建拓撲分配上下文就完成了。集群下的分配,見下文講解資源準備首先第一步是判斷拓撲分配的類型是否符合要求,不符合則拋出異常。 ??寫在前面的話,筆者第一次閱讀框架源碼,所以可能有些地方理解錯誤或者沒有詳細解釋,如果在閱讀過程發(fā)現(xiàn)錯誤很歡迎在文章下面評論指出。文章后續(xù)會陸續(xù)更新,可以關注或者收藏...
摘要:即使是容器已經(jīng)退出的也可以看到,所以可以通過這種方式來分析非預期的退出。也可以直接通過在容器內(nèi)啟動一個更方便地調(diào)試容器,不必一條條執(zhí)行。和獲得容器中進程的狀態(tài)和在容器里執(zhí)行的效果類似。通過查看容器的詳細信息飯后鏡像和容器的詳細信息。 『重用』容器名 但我們在編寫/調(diào)試Dockerfile的時候我們經(jīng)常會重復之前的command,比如這種docker run --name jstorm-...
摘要:淘寶定制基于,是國內(nèi)第一個優(yōu)化定制且開源的服務器版虛擬機。數(shù)據(jù)庫開源數(shù)據(jù)庫是基于官方版本的一個分支,由阿里云數(shù)據(jù)庫團隊維護,目前也應用于阿里巴巴集團業(yè)務以及阿里云數(shù)據(jù)庫服務。淘寶服務器是由淘寶網(wǎng)發(fā)起的服務器項目。 Java JAVA 研發(fā)框架 SOFAStack SOFAStack(Scalable Open Financial Architecture Stack)是用于快速構建金融...
閱讀 2804·2021-11-24 09:39
閱讀 2777·2021-09-23 11:45
閱讀 3404·2019-08-30 12:49
閱讀 3352·2019-08-30 11:18
閱讀 1908·2019-08-29 16:42
閱讀 3344·2019-08-29 16:35
閱讀 1321·2019-08-29 11:21
閱讀 1912·2019-08-26 13:49