spark Dstreams-程序部署
點擊上方“IT那活兒”,關注后了解更多內容,不管IT什么活兒,干就完了!!!
為了運行一個spark streaming應用程序,需要滿足以下條件 :
1.1 使用集群管理器管理集群:
這是基本的要求。
1.2 打成jar包:
你必須將你的應用程序編譯成jar包,使用spark-submit啟動程序,然而如果你的程序使用的是高級數據源(例如kafka),你必須將kafka依賴打進jar包。
1.3 為執行節點配置足夠的內存:
因為接收到的數據必須保存在內存,所以執行節點必須有足夠的內存來存儲數據,如果要執行10分鐘的窗口操作,系統必須在內存中保留至少10分鐘的數據,因此應用程序的內存需求取決于其中使用的操作。
1.4 配置檢查點:
如果流應用程序需要,則必須將Hadoop API兼容容錯存儲(例如HDFS、S3等)中的目錄配置為檢查點目錄,并且流應用程序的寫入方式應確保檢查點信息可用于故障恢復。
1.5 配置應用驅動程序的的自動重啟:
為了從驅動程序故障中自動修復,用于運行流應用程序的部署基礎結構必須監視驅動程序進程,并在驅動程序失敗時重新啟動驅動程序。不同的集群管理器有不同的工具來實現這一點。
spark standalone:可以提交spark程序以以spark standalone方式運行,也就是說應用程序在一個節點運行,而且可以指示standalone集群管理器監督驅動程序,如果驅動程序由于非零退出代碼或運行驅動程序的節點故障而失敗,則重新啟動它。
YARN:YARN支持自動重啟應用程序的類似機制。
Mesos:Marathon已經被用來實現這一目標。
1.6 配置預寫日志(write-ahead logs):
自spark1.2,我們已經引入了預寫日志以實現強大的容錯保證,如果啟用它,所有從receiver接收到的數據都會寫入配置檢查點目錄中的預寫日志。
這可以防止驅動程序恢復時的數據丟失,從而確保零數據丟失,可以通過設置spark.streaming.receiver.writeAheadLog.enable=true來啟用它,然而這可能以單個接收器的接收吞吐為代價,但是這可以通過并行運行更多接收器來彌補。
此外,建議在啟用預寫日志時禁用spark內接收數據的復制,因為該日志已存儲在已復制的存儲系統中,這可以通過設置存儲級別為StorageLevel.MEMORY_AND_DISK_SER來實現,使用S3(或任何不支持刷新的文件系統)進行預寫日志時,請記住啟用:
spark.streaming.driver.writeAheadLog.closeFileAfterWrite
和
spark.streaming.receiver.writeAheadLog.closeFileAfterWrite。
1.7 設置最大接收速率:
如果集群資源不夠大,spark streaming應用程序無法以接收數據的速度處理數據,則可以通過設置記錄的最大速率限制來限制接收器的速率,請參閱:
在Spark 1.5中,我們引入了一種稱為背壓的功能,它消除了設置此速率限制的需要,因為Spark Streaming會自動計算速率限制,并在處理條件發生變化時動態調整速率限制??赏ㄟ^將配置參數spark.streaming.backpressure.enabled設置為true來啟用此背壓。
如果你需要升級spark streaming應用程序代碼,有兩種可能的機制。2.1 升級后的Spark Streaming應用程序將啟動,并與現有應用程序并行運行。一旦新的(接收到與舊的相同的數據)被預熱并準備好進入黃金時段,舊的就可以被取下。2.2 現有應用程序正常關閉(有關正常關閉選項,請參閱StreamingContext.stop(…)或JavaStreamingContext.stop(…),以確保在關閉之前完全處理已接收的數據。然后可以啟動升級后的應用程序,該應用程序將從早期應用程序停止的同一點開始處理。請注意,這只能通過支持源端緩沖(如Kafka)的輸入源來實現,因為在上一個應用程序關閉且升級的應用程序尚未啟動時,需要緩沖數據。無法從升級前代碼的早期檢查點信息重新啟動。檢查點信息實質上包含序列化的Scala/Java/Python對象,嘗試使用新的、修改過的類反序列化對象可能會導致錯誤。在這種情況下,使用不同的檢查點目錄啟動升級的應用程序,或者刪除以前的檢查點目錄。
除了Spark的監控功能外,還有Spark streaming特有的其他功能。使用StreamingContext時,Spark web UI會顯示一個附加的流選項卡,其中顯示有關正在運行的接收器(接收器是否處于活動狀態、接收到的記錄數、接收器錯誤等)和已完成批次(批處理時間、排隊延遲等)的統計信息。這可用于監視流應用程序的進度。- processing time:處理每個批次花費的時間
- Scheduling Delay:批在隊列里等待前一批處理完成的時間
如果批次處理時間始終大于批次間隔和/或排隊延遲持續增加,則表明系統無法以生成批次的速度處理批次,并且正在落后。在這種情況下,考慮減少批處理時間。還可以使用StreamingListener接口監控Spark streaming程序的進度,該接口允許您獲取接收器狀態和處理時間。請注意,這是一個開發人員API,將來可能會對其進行改進(即報告更多信息)。
要從集群上的Spark流媒體應用程序中獲得最佳性能,需要進行一些調整。本節介紹了一些可以調整以提高應用程序性能的參數和配置。在高層次上,你需要考慮兩件事:- 設置正確的批大小,以便可以在接收數據時盡快處理數據批(即,數據處理與數據攝取保持同步)。
減少批處理時間可以在Spark中進行許多優化,以最大限度地縮短每個批次的處理時間。通過網絡接收數據(如kafka,socket等)需要將數據反序列化并存儲到spark中,如果數據接收成為系統中的瓶頸,那么考慮數據接收的并行化。請注意,每個輸入數據流都會創建一個接收單個數據流的接收器(在工作機器上運行)。因此,通過創建多個輸入數據流并將其配置為從源接收數據流的不同分區,可以實現接收多個數據流。例如,接收兩個主題數據的單個kafka輸入數據流可以分成兩個kafka輸入流,每個kafka輸入流只接收一個主題。這將運行兩個接收器,允許并行接收數據,從而提高了總體吞吐量。這些多個數據流可以聯合在一起以創建單個數據流。然后,應用于單個輸入數據流的轉換可以應用于統一流??梢赃@樣做:應考慮的另一個參數是接收器的塊間隔,它由配置參數spark.streaming.blockInterval確定。對于大多數接收器,接收到的數據在存儲到Spark的內存中之前會合并成數據塊。每個批處理中的塊數決定了在類似映射的轉換中用于處理接收數據的任務數。每批每個接收器的任務數大約為(批間隔/塊間隔)。例如,200 ms的塊間隔將每2秒批創建10個任務。如果任務數量太少(即,少于每臺機器的核心數量),那么它將是低效的,因為所有可用的核心都不會用于處理數據。要增加給定批處理間隔的任務數,請減少塊間隔。但是,建議的最小塊間隔值約為50 ms,低于該值,任務啟動開銷可能會出現問題。使用多個輸入流/接收器接收數據的另一種方法是顯式地重新劃分輸入數據流(使用inputStream.repartition())。這將在進一步處理之前在群集中指定數量的計算機上分發接收到的數據批。如果在計算的任何階段中使用的并行任務的數量不夠多,那么集群資源可能會利用不足。例如,對于reduceByKey和ReduceByAndWindow等分布式reduce操作,并行任務的默認數量由spark.default.parallelism配置屬性控制。您可以將并行級別作為參數傳遞(請參閱PairDStreamFunctions文檔),或者設置spark.default.parallelism配置屬性以更改默認值。通過調整序列化格式,可以減少數據序列化的開銷。在流式傳輸的情況下,有兩種類型的數據可以被序列化。- 默認情況下,通過接收器接收的輸入數據存儲在具有StorageLevel.memory_DISK_SER_2的執行器內存中。也就是說,數據序列化為字節以減少GC開銷,并復制以容忍執行器故障。此外,數據首先保存在內存中,只有當內存不足以保存流計算所需的所有輸入數據時,才會溢出到磁盤。這種序列化顯然有開銷——接收器必須對接收到的數據進行反序列化,并使用Spark的序列化格式對其重新序列化。
- 流式計算生成的RDD可以保存在內存中。例如,窗口操作將數據保存在內存中,因為它們將被多次處理。但是,與Spark Core默認的StorageLevel.MEMORY_ONLY不同,流式計算生成的持久化RDD默認使用StorageLevel.MEMORY_ONLY_DISK(即序列化)持久化,以最小化GC開銷。
在這兩種情況下,使用Kryo序列化可以減少CPU和內存開銷。在流應用程序需要保留的數據量不大的特定情況下,可以將數據(兩種類型)作為反序列化對象持久化,而不會產生過多的GC開銷。例如,如果使用幾秒鐘的批處理間隔且沒有窗口操作,則可以通過顯式設置相應的存儲級別來嘗試禁用持久化數據中的序列化。這將減少由于序列化而產生的CPU開銷,從而有可能在沒有太多GC開銷的情況下提高性能。如果每秒啟動的任務數很高(例如,每秒50個或更多),則向執行者發送任務的開銷可能很大,并且將很難實現亞秒延遲。執行模式:在standalone模式或粗粒度Mesos模式下運行Spark會導致比細粒度Mesos模式更好的任務啟動時間。這些更改可能會將批處理時間減少100毫秒,從而允許使用次秒級的批處理大小。為了使在集群上運行的Spark Streaming應用程序保持穩定,系統應該能夠以接收數據的速度處理數據。換句話說,批處理速度應該與接收數據速度一樣快,可以通過web UI監控批數據處理時間,其應該小于批處理間隔。根據流計算的性質,所使用的批處理間隔可能會對應用程序在一組固定的群集資源上可以維持的數據速率產生重大影響。例如,讓我們考慮較早的WorddCurnNead示例。對于特定的數據速率,系統可能能夠每2秒(即2秒的批處理間隔)跟蹤報告字數,但不是每500毫秒一次。因此,需要設置批次間隔,以便能夠維持生產中的預期數據速率。為應用程序確定正確的批處理大小的一個好方法是使用保守的批處理間隔(例如,5-10秒)和低數據速率對其進行測試。為了驗證系統是否能夠跟上數據速率,您可以檢查每個處理批次所經歷的端到端延遲值(在Spark driver log4j日志中查找“總延遲”,或使用StreamingListener接口)。如果延遲保持與批量大小相當,則系統是穩定的。否則,如果延遲持續增加,則意味著系統無法跟上,因此不穩定。一旦你有了一個穩定配置的想法,你可以嘗試增加數據速率和/或減少批量大小。請注意,只要延遲降低回較低值(即,小于批量大?。捎谂R時數據速率增加而導致的延遲瞬時增加就可以了。我們將專門討論Spark流應用程序上下文中的一些調優參數。Spark流應用程序所需的集群內存量在很大程度上取決于所使用的轉換類型。例如,如果要對最后10分鐘的數據使用窗口操作,那么集群應該有足夠的內存保留10分鐘的數據。或者,如果您想使用帶有大量鍵的updateStateByKey,那么所需的內存將很高。相反,如果要執行簡單的映射過濾器存儲操作,則所需內存將較低。通常,由于通過接收器接收的數據存儲在StorageLevel.MEMORY_AND_DISK_SER_2中,因此超過內存的數據將溢出到磁盤。這可能會降低streaming應用程序的性能。因此建議根據streaming應用程序的要求提供足夠的內存。最好嘗試在小范圍內查看內存使用情況,并進行相應的估計。內存調優的另一個方面是垃圾收集。對于需要低延遲的流應用程序,不希望JVM垃圾收集導致暫停。- imput data數據和RDD在默認情況下作為序列化字節持久化。與反序列化持久化相比,這減少了內存使用和GC開銷。啟用Kryo序列化進一步減少了序列化大小和內存使用。
- 默認情況下,由數據流轉換生成的所有輸入數據和持久化RDD將自動清除。Spark Streaming根據所使用的轉換決定何時清除數據。例如,如果使用10分鐘的窗口操作,則Spark Streaming將保留最后10分鐘的數據,并主動丟棄較舊的數據。通過設置streamingContext.remember,數據可以保留更長的時間。
- 強烈建議使用并發標記和掃描GC,以保持GC相關暫停始終較低。盡管已知并發GC會降低系統的總體處理吞吐量,但仍建議使用它來實現更一致的批處理時間。確保在驅動程序(使用spark submit中的--driver java選項)和執行器(使用spark配置spark.executor.extraJavaOptions)上設置CMS GC。
接下來討論在spark streaming應用程序中發生故障時行為。為了理解spark streaming的容錯語義,讓我們先看下RDD的基本容錯語義。- RDD是一個不可變的、確定的和可重新計算的分布式數據集,每個RDD都會記住創建數據集的依賴。
- 如果RDD的任何分區由于工作節點故障而丟失,則可以使用操作依賴關系從原始容錯數據集重新計算該分區。
- 假設所有的RDD轉換都是確定性的,那么不管Spark集群中是否出現故障,最終轉換的RDD中的數據都將始終相同。
Spark對HDFS或S3等容錯文件系統中的數據進行操作。因此,從容錯數據生成的所有RDD也是容錯的。但是,Spark streaming的情況并非如此,因為在大多數情況下,數據是通過網絡接收的(使用fileStream時除外)。為了為所有生成的RDD實現相同的容錯屬性,在群集中工作節點的多個Spark執行器之間復制接收到的數據(默認復制系數為2)。- 接收和復制的數據—此數據在單個工作節點發生故障時仍然有效,因為它的副本存在于另一個節點上。
- 已接收但已緩沖用于復制的數據—由于未復制此數據,因此恢復此數據的唯一方法是再次從源獲取它。
- 運行executors的任何工作節點都可能失敗,并且這些節點上的所有內存中數據都將丟失。如果任何接收器在發生故障的節點上運行,則其緩沖數據將丟失。
- 如果運行Spark Streaming應用程序的驅動程序節點出現故障,則SparkContext顯然會丟失,所有executor及其內存中的數據也會丟失。
在所有可能的操作條件下(盡管出現故障等),系統可以提供三種類型的保證:- 至少一次:每條記錄將被處理一次或多次。這比最多一次強,因為它確保不會丟失任何數據。但也可能有重復。
- 只有一次:每條記錄將精確處理一次-不會丟失任何數據,也不會多次處理任何數據。這顯然是三者中最有力的保證。
在任何流處理系統中,廣義地說,處理數據有三個步驟。- 輸出數據:最終轉換的數據輸出到外部系統,如文件系統、數據庫、儀表板等。
如果流應用程序必須實現端到端的精確一次保證,那么每個步驟都必須提供精確一次的保證。也就是說,每個記錄必須準確接收一次,準確轉換一次,并準確推送到下游系統一次。讓我們在Spark流的上下文中理解這些步驟的語義。- 接收數據:不同的輸入源提供不同的保證,文章后面將詳細討論。
- 轉換數據:由于RDD提供的保證,所有接收到的數據都將被精確地處理一次。即使出現故障,只要接收到的輸入數據是可訪問的,最終轉換的RDD將始終具有相同的內容。
- 推出數據:默認情況下,輸出操作至少確保一次語義,因為它取決于輸出操作的類型(冪等式或非冪等式)和下游系統的語義(是否支持事務)。但用戶可以實現自己的事務機制,以實現精確的一次性語義。文章后面將詳細討論這一點。
不同的輸入源提供不同的保證,從至少一次到恰好一次不等。如果所有輸入數據都已存在于HDFS等容錯文件系統中,Spark Streaming始終可以從任何故障中恢復并處理所有數據。這給出了精確的一次語義,這意味著所有數據將被精確地處理一次,而不管什么失敗。對于基于接收器的輸入源,容錯語義取決于故障場景和接收器類型。如前所述,有兩種類型的接收器:- 可靠接收器-這些接收器僅在確保已復制接收到的數據后才確認可靠來源。如果這樣的接收器出現故障,源將不會收到緩沖(未復制)數據的確認。因此,如果接收器重新啟動,源將重新發送數據,并且不會因故障而丟失任何數據。
- 不可靠的接收器-此類接收器不發送確認,因此在由于工作人員或驅動程序故障而失敗時可能會丟失數據。
如果工作節點發生故障,則可靠的接收器不會丟失數據。對于不可靠的接收器,接收到但未復制的數據可能會丟失。如果驅動程序節點發生故障,那么除了這些丟失之外,在內存中接收和復制的所有過去的數據都將丟失。這將影響有狀態轉換的結果。為了避免丟失過去接收到的數據,Spark 1.2引入了預寫日志,將接收到的數據保存到容錯存儲器中。通過啟用預寫日志和可靠的接收器,可以實現零數據丟失。在語義方面,它提供了至少一次的保證。在Spark 1.3中,我們引入了一個新的Kafka Direct API,它可以確保Spark Streaming只接收一次所有Kafka數據。此外,如果實現一次輸出操作,則可以實現端到端的一次輸出保證。輸出操作(比如foreachRDD)至少有一次語義,也就是說,如果工作程序發生故障,轉換后的數據可能會多次寫入外部實體。雖然這對于使用saveAs***Files操作保存到文件系統是可以接受的(因為文件將被相同的數據覆蓋),但是可能需要額外的努力來實現一次語義。- 冪等更新:多次嘗試總是寫入相同的數據。例如,saveAs***文件總是將相同的數據寫入生成的文件。
- 事務性更新:所有更新都是以事務方式進行的,因此更新只以原子方式進行一次。實現這一點的一種方法是:
使用批處理時間(在foreachRDD中可用)和RDD的分區索引來創建標識符。此標識符唯一標識流應用程序中的數據。使用標識符以事務方式(即原子方式)更新外部系統。也就是說,如果標識符尚未提交,則以原子方式提交分區數據和標識符。否則,如果已提交,則跳過更新。
為了實現讀取并行性,需要創建多個接收器,即多個數據流。接收器在執行器中運行。它占據一個核心。確保在預訂接收器插槽后有足夠的內核進行處理,即spark.cores.max應考慮接收器插槽。接收器以循環方式分配給執行者。每個塊間隔生成一個新的數據塊。在N個塊間隔內創建N個數據塊。這些塊由當前executor的塊管理器分配給其他executor的塊管理器。之后,驅動程序上運行的網絡輸入跟蹤器將被告知塊位置,以便進一步處理。6.3 在驅動程序(driver)上為批處理間隔期間創建的塊創建RDD。批處理間隔期間生成的塊是分區RDD。每個分區都是spark中的一項任務。塊間隔(blockInterval)==批處理間隔(batchinterval)意味著創建了單個分區,并且可能在本地對其進行處理。6.4 塊上的map任務在具有塊的executor(一個接收塊,另一個復制塊)中處理,而不考慮塊間隔,除非非本地調度開始。擁有更大的區塊間隔意味著更大的區塊。較高的spark.locality.wait值會增加在本地節點上處理塊的機會。需要在這兩個參數之間找到平衡,以確保在本地處理較大的塊。6.5 您可以通過調用inputDstream.repartition(n)來定義分區的數量,而不是依賴于batchInterval和blockInterval。這將隨機重新排列RDD中的數據,以創建n個分區。是的,為了更大的并行性。雖然是以洗牌為代價的。RDD的處理由驅動程序的jobscheduler作為作業進行調度。在給定的時間點,只有一個作業處于活動狀態。因此,如果一個作業正在執行,其他作業將排隊。6.6 如果有兩個數據流,將形成兩個RDD,并將創建兩個作業,這兩個作業將一個接一個地安排。為了避免這種情況,可以合并兩個數據流。這將確保為數據流的兩個RDD形成一個unionRDD。然后將此unionRDD視為單個作業。但是,RDD的分區不受影響。6.7 如果批處理時間超過batchinterval,那么很明顯,接收器的內存將開始填滿,并最終引發異常(很可能是BlockNotFoundException)。目前,無法暫停接收器。使用SparkConf配置spark.streaming.receiver.maxRate,可以限制接收器的速率。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/129642.html