摘要:過程中,各個節(jié)點上的相同都會先寫入本地磁盤文件中,然后其他節(jié)點需要通過網(wǎng)絡傳輸拉取各個節(jié)點上的磁盤文件中的相同。因此在過程中,可能會發(fā)生大量的磁盤文件讀寫的操作,以及數(shù)據(jù)的網(wǎng)絡傳輸操作。
需要對名為“hello.txt”的HDFS文件進行一次map操作,再進行一次reduce操作。也就是說,需要對一份數(shù)據(jù)執(zhí)行兩次算子操作。
錯誤的做法:
對于同一份數(shù)據(jù)執(zhí)行多次算子操作時,創(chuàng)建多個RDD。//這里執(zhí)行了兩次textFile方法,針對同一個HDFS文件,創(chuàng)建了兩個RDD出來,然后分別對每個RDD都執(zhí)行了一個算子操作。
這種情況下,Spark需要從HDFS上兩次加載hello.txt文件的內容,并創(chuàng)建兩個多帶帶的RDD;//第二次加載HDFS文件以及創(chuàng)建RDD的性能開銷,很明顯是白白浪費掉的。
val rdd1 = sc.textFile("hdfs://master:9000/hello.txt")rdd1.map(...)val rdd2 = sc.textFile("hdfs://master:9000/hello.txt")rdd2.reduce(...)
正確的用法:
對于一份數(shù)據(jù)執(zhí)行多次算子操作時,只使用一個RDD。
錯誤的做法:
有一個
接著由于業(yè)務需要,對rdd1執(zhí)行了一個map操作,創(chuàng)建了一個rdd2,而rdd2中的數(shù)據(jù)僅僅是rdd1中的value值而已,也就是說,rdd2是rdd1的子集。
JavaPairRDD rdd1 = ...JavaRDD rdd2 = rdd1.map(...)
分別對rdd1和rdd2執(zhí)行了不同的算子操作。
rdd1.reduceByKey(...)rdd2.map(...)
正確的做法:
rdd2的數(shù)據(jù)完全就是rdd1的子集而已,卻創(chuàng)建了兩個rdd,并對兩個rdd都執(zhí)行了一次算子操作。
此時會因為對rdd1執(zhí)行map算子來創(chuàng)建rdd2,而多執(zhí)行一次算子操作,進而增加性能開銷。
其實在這種情況下完全可以復用同一個RDD。
我們可以使用rdd1,既做reduceByKey操作,也做map操作。
JavaPairRDD rdd1 = ...rdd1.reduceByKey(...)rdd1.map(tuple._2...)
正確的做法:
cache()方法表示:使用非序列化的方式將RDD中的數(shù)據(jù)全部嘗試持久化到內存中。
此時再對rdd1執(zhí)行兩次算子操作時,只有在第一次執(zhí)行map算子時,才會將這個rdd1從源頭處計算一次。
第二次執(zhí)行reduce算子時,就會直接從內存中提取數(shù)據(jù)進行計算,不會重復計算一個rdd。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache()rdd1.map(...)rdd1.reduce(...)
序列化的方式可以減少持久化的數(shù)據(jù)對內存/磁盤的占用量,進而避免內存被持久化數(shù)據(jù)占用過多,從而發(fā)生頻繁GC。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt") .persist(StorageLevel.MEMORY_AND_DISK_SER)rdd1.map(...)rdd1.reduce(...)
注意:通常不建議使用DISK_ONLY和后綴為_2的級別:因為完全基于磁盤文件進行數(shù)據(jù)的讀寫,會導致性能急劇降低,導致網(wǎng)絡較大開銷
如果有可能的話,要盡量避免使用shuffle類算子,最消耗性能的地方就是shuffle過程。
shuffle過程中,各個節(jié)點上的相同key都會先寫入本地磁盤文件中,然后其他節(jié)點需要通過網(wǎng)絡傳輸拉取各個節(jié)點上的磁盤文件中的相同key。而且相同key都拉取到同一個節(jié)點進行聚合操作時,還有可能會因為一個節(jié)點上處理的key過多,導致內存不夠存放,進而溢寫到磁盤文件中。因此在shuffle過程中,可能會發(fā)生大量的磁盤文件讀寫的IO操作,以及數(shù)據(jù)的網(wǎng)絡傳輸操作。磁盤IO和網(wǎng)絡數(shù)據(jù)傳輸也是shuffle性能較差的主要原因。
盡可能避免使用reduceByKey、join、distinct、repartition等會進行shuffle的算子,盡量使用map類的非shuffle算子。
傳統(tǒng)的join操作會導致shuffle操作。
因為兩個RDD中,相同的key都需要通過網(wǎng)絡拉取到一個節(jié)點上,由一個task進行join操作。
val rdd3 = rdd1.join(rdd2)
Broadcast+map的join操作,不會導致shuffle操作。
使用Broadcast將一個數(shù)據(jù)量較小的RDD作為廣播變量。
val rdd2Data = rdd2.collect()val rdd2DataBroadcast = sc.broadcast(rdd2Data)val rdd3 = rdd1.map(rdd2DataBroadcast...)
注意:以上操作,建議僅僅在rdd2的數(shù)據(jù)量比較少(比如幾百M,或者一兩G)的情況下使用。因為每個Executor的內存中,都會駐留一份rdd2的全量數(shù)據(jù)。
如果因為業(yè)務需要,一定要使用shuffle操作,無法用map類的算子來替代,那么盡量使用可以map-side預聚合的算子,類似于MapReduce中的本地combiner。map-side預聚合之后,每個節(jié)點本地就只會有一條相同的key,因為多條相同的key都被聚合起來了。其他節(jié)點在拉取所有節(jié)點上的相同key時,就會大大減少需要拉取的數(shù)據(jù)數(shù)量,從而也就減少了磁盤IO以及網(wǎng)絡傳輸開銷。
建議使用reduceByKey或者aggregateByKey算子來替代掉groupByKey算子
使用reduceByKey/aggregateByKey替代groupByKey : map-side
使用mapPartitions替代普通map : 函數(shù)執(zhí)行頻率
使用foreachPartitions替代foreach : 函數(shù)執(zhí)行頻率
使用filter之后進行coalesce操作 : filter后對分區(qū)進行壓縮
使用repartitionAndSortWithinPartitions替代repartition與sort類操作
repartitionAndSortWithinPartitions是Spark官網(wǎng)推薦的一個算子,官方建議,如果需要在repartition重分區(qū)之后,還要進行排序,建議直接使用repartitionAndSortWithinPartitions算子
有時在開發(fā)過程中,會遇到需要在算子函數(shù)中使用外部變量的場景(尤其是大變量,比如100M以上的大集合),那么此時就應該使用Spark的廣播(Broadcast)功能來提升性能。
默認情況下,Spark會將該變量復制多個副本,通過網(wǎng)絡傳輸?shù)絫ask中,此時每個task都有一個變量副本。如果變量本身比較大的話(比如100M,甚至1G),那么大量的變量副本在網(wǎng)絡中傳輸?shù)男阅荛_銷,以及在各個節(jié)點的Executor中占用過多內存導致的頻繁GC,都會極大地影響性能。
廣播后的變量,會保證每個Executor的內存中,只駐留一份變量副本,而Executor中的task執(zhí)行時共享該Executor中的那份變量副本。
1)在算子函數(shù)中使用到外部變量時,該變量會被序列化后進行網(wǎng)絡傳輸。
2)將自定義的類型作為RDD的泛型類型時(比如JavaRDD,Student是自定義類型),所有自定義類型對象,都會進行序列化。因此這種情況下,也要求自定義的類必須實現(xiàn)Serializable接口。
3)使用可序列化的持久化策略時(比如MEMORY_ONLY_SER),Spark會將RDD中的每個partition都序列化成一個大的字節(jié)數(shù)組。
Spark默認使用的是Java的序列化機制,你可以使用Kryo作為序列化類庫,效率要比Java的序列化機制要高
// 創(chuàng)建SparkConf對象。val conf = new SparkConf().setMaster(...).setAppName(...)// 設置序列化器為KryoSerializer。conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")// 注冊要序列化的自定義類型。conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
當遇到userData和events進行join時,userData比較大,而且join操作比較頻繁,這個時候,可以先將userData調用了 partitionBy()分區(qū),可以極大提高效率。
cogroup()、 groupWith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、 combineByKey() 以及 lookup()等都能夠受益
總結:如果遇到一個RDD頻繁和其他RDD進行Shuffle類操作,比如 cogroup()、 groupWith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、 combineByKey() 以及 lookup()等,那么最好將該RDD通過partitionBy()操作進行預分區(qū),這些操作在Shuffle過程中會減少Shuffle的數(shù)據(jù)量
Java中,有三種類型比較耗費內存:
1)對象,每個Java對象都有對象頭、引用等額外的信息,因此比較占用內存空間。
2)字符串,每個字符串內部都有一個字符數(shù)組以及長度等額外信息。
3)集合類型,比如HashMap、LinkedList等,因為集合類型內部通常會使用一些內部類來封裝集合元素,比如Map.Entry
Spark官方建議,在Spark編碼實現(xiàn)中,特別是對于算子函數(shù)中的代碼,盡量不要使用上述三種數(shù)據(jù)結構,盡量使用字符串替代對象,使用原始類型(比如Int、Long)替代字符串,使用數(shù)組替代集合類型,這樣盡可能地減少內存占用,從而降低GC頻率,提升性能。
文章版權歸作者所有,未經(jīng)允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/123982.html
摘要:正如我標題所說,簡歷被拒。看了我簡歷之后說頭條競爭激烈,我背景不夠,點到為止。。三準備面試其實從三月份投遞簡歷開始準備面試到四月份收,也不過個月的時間,但這都是建立在我過去一年的積累啊。 本文是 無精瘋 同學投稿的面試經(jīng)歷 關注微信公眾號:進擊的java程序員K,即可獲取最新BAT面試資料一份 在此感謝 無精瘋 同學的分享 目錄: 印象中的頭條 面試背景 準備面試 ...
摘要:正如我標題所說,簡歷被拒。看了我簡歷之后說頭條競爭激烈,我背景不夠,點到為止。。三準備面試其實從三月份投遞簡歷開始準備面試到四月份收,也不過個月的時間,但這都是建立在我過去一年的積累啊。 本文是 無精瘋 同學投稿的面試經(jīng)歷 關注微信公眾號:進擊的java程序員K,即可獲取最新BAT面試資料一份 在此感謝 無精瘋 同學的分享目錄:印象中的頭條面試背景準備面試頭條一面(Java+項目)頭條...
摘要:創(chuàng)新萌芽期望最頂點下調預期至低點回歸理想生產(chǎn)率平臺。而大數(shù)據(jù)已從頂峰滑落,和云計算接近谷底。對于迅速成長的中國市場,大公司也意味著大數(shù)據(jù)。三家對大數(shù)據(jù)的投入都是不惜余力的。 非商業(yè)轉載請注明作譯者、出處,并保留本文的原始鏈接:http://www.ituring.com.cn/article/177529 董飛,Coursera數(shù)據(jù)工程師。曾先后在創(chuàng)業(yè)公司酷迅,百度基礎架構組...
閱讀 2027·2021-11-19 11:37
閱讀 714·2021-11-11 16:54
閱讀 1161·2021-11-02 14:44
閱讀 3049·2021-09-02 15:40
閱讀 2368·2019-08-30 15:44
閱讀 951·2019-08-29 11:17
閱讀 1059·2019-08-26 14:06
閱讀 1552·2019-08-26 13:47