国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

Spark Streaming遇到問題分析

stormzhang / 809人閱讀

摘要:遇到問題分析之后搞了個還沒仔細了解可參考的與的有區別及并發控制先看看的,與的這幾個概念。一個可以認為就是會最終輸出一個結果的一條由組織而成的計算。在中,我們通過使用新極大地增強對狀態流處理的支持。

Spark Streaming遇到問題分析

1、Spark2.0之后搞了個Structured Streaming

還沒仔細了解,可參考:https://github.com/lw-lin/Coo...

2、Spark的Job與Streaming的Job有區別及Streaming Job并發控制:

先看看Spark Streaming 的 JobSet, Job,與 Spark Core 的 Job, Stage, TaskSet, Task 這幾個概念。

[Spark Streaming]

JobSet 的全限定名是:org.apache.spark.streaming.scheduler.JobSet

Job 的全限定名是:org.apache.spark.streaming.scheduler.Job

[Spark Core]

Job 沒有一個對應的實體類,主要是通過 jobId:Int 來表示一個具體的 job

Stage 的全限定名是:org.apache.spark.scheduler.Stage

TaskSet 的全限定名是:org.apache.spark.scheduler.TaskSet

Task 的全限定名是:org.apache.spark.scheduler.Task
Spark Core 的 Job, Stage, Task 就是我們“日常”談論 Spark任務時所說的那些含義,而且在 Spark 的 WebUI 上有非常好的體現,比如下圖就是 1 個 Job 包含duo1 個 Stage;3 個 Stage 各包含 8, 2, 4 個 Task。而 TaskSet 則是 Spark Core 的內部代碼里用的類,是 Task 的集合,和 Stage 是同義的。

Spark Core中:一個RDD DAG Graph可以生成一個或多個Job。一個Job可以認為就是會最終輸出一個結果RDD的一條由RDD組織而成的計算。Job在spark里應用里是一個被調度的單位。
Streaming中:一個batch的數據對應一個DStreamGraph,而一個DStreamGraph包含一或多個關于DStream的輸出操作,每一個輸出對應于一個Job,一個DStreamGraph對應一個JobSet,里面包含一個或多個Job。用下圖表示如下:

生產的JobSet會提交給JobScheduler去執行,JobScheduler包含了一個線程池,通過spark.streaming.concurrentJobs參數來控制其大小,也就是可以并發執行的job數,默認是1.不過這個參數的設置以集群中executor機器的cpu core為準,比如集群中有2臺4核executor,那么spark.streaming.concurrentJobs可以設置為2x4=8. 同時你還可以控制調度策略:spark.scheduler.mode (FIFO/FAIR) 默認是FIFO

我們的問題就是,運行一段時間之后發現處理速度跟不上了,后來才發現原來這個參數默認是1,而我們代碼中對于每個batch有兩個輸出操作,這樣會產生兩個job,而同一時間只能執行一個job,慢慢地處理速度就跟不上生產速度了。所以實際中,請根據具體情況調整該參數。

此處參考:

https://github.com/lw-lin/Coo...

http://blog.csdn.net/xueba207...

http://www.jianshu.com/p/ab38...

3、 Spark Streaming緩存數據清理:

調用cache有兩種情況,一種是調用DStream.cache,第二種是RDD.cache。事實上他們是完全一樣的。

DStream的cache 動作只是將DStream的變量storageLevel 設置為MEMORY_ONLY_SER,然后在產生(或者獲取)RDD的時候,調用RDD的persit方法進行設置。所以DStream.cache 產生的效果等價于RDD.cache(也就是你自己調用foreachRDD 將RDD 都設置一遍)

注意,當你調用dstream.cache緩存數據的時候,Streaming在該batch處理完畢后,默認會立即清除這個緩存,通過參數spark.streaming.unpersist 你是可以決定是否手工控制是否需要對cache住的數據進行清理.

參考:
http://www.jianshu.com/p/f068...
https://github.com/apache/spa...

4、Streaming中維持狀態

有兩種方式:updateStateByKey和mapWithState(Spark 1.6以后新增的)。
推薦使用mapWithState, 實際使用中,我發現updateStateByKey會慢慢拖慢處理速度,問題描述與該情況類似:http://comments.gmane.org/gma...

許多復雜流處理流水線程序必須將狀態保持一段時間,例如,如果你想實時了解網站用戶行為,你需要將網站上各“用戶會話(user session)”信息保存為持久狀態并根據用戶的行為對這一狀態進行持續更新。這種有狀態的流計算可以在Spark Streaming中使用updateStateByKey 方法實現。

在Spark 1.6 中,我們通過使用新API mapWithState極大地增強對狀態流處理的支持。該新的API提供了通用模式的內置支持,而在以前使用updateStateByKey 方法來實現這一相同功能(如會話超時)需要進行手動編碼和優化。因此,mapWithState 方法較之于updateStateByKey方法,有十倍之多的性能提升

使用mapWithState方法進行狀態流處理
盡管現有DStream中updateStateByKey方法能夠允許用戶執行狀態計算,但使用mapWithState方法能夠讓用戶更容易地表達程序邏輯,同時讓性能提升10倍之多。讓我們通過一個例子對mapWithState方法的優勢進行闡述。

假設我們要根據用戶歷史動作對某一網站的用戶行為進行實時分析,對各個用戶,我們需要保持用戶動作的歷史信息,然后根據這些歷史信息得到用戶的行為模型并輸出到下游的數據存儲當中。
在Spark Streaming中構建此應用程序時,我們首先需要獲取用戶動作流作為輸入(例如通過Kafka或Kinesis),然后使用mapWithState 方法對輸入進行轉換操作以生成用戶模型流,最后將處理后的數據流保存到數據存儲當中。

mapWithState方法可以通過下面的抽象方式進行理解,假設它是將用戶動作和當前用戶會話作為輸入的一個算子(operator),基于某個輸入動作,該算子能夠有選擇地更新用戶會話,然后輸出更新后的用戶模型作為下游操作的輸入。開發人員在定義mapWithState方法時可以指定該更新函數。

首先我們定義狀態數據結構及狀態更新函數:

def stateUpdateFunction(
userId: UserId,
newData: UserAction, 
stateData: State[UserSession]): UserModel = {

    val currentSession = stateData.get()// 獲取當前會話數據
    val updatedSession = ...  // 使用newData計算更新后的會話
    stateData.update(updatedSession) // 更新會話數據     

    val userModel = ...  // 使用updatedSession計算模型
    return userModel   // 將模型發送給下游操作
}
// 用去動作構成的Stream,用戶ID作為key
val userActions = ...  // key-value元組(UserId, UserAction)構成的stream
// 待提交的數據流
val userModels = userActions.mapWithState(StateSpec.function(stateUpdateFunction))
//--------------------------------------------------------------------------------------

//java的例子
Function3, State, Void> mappingFunction =
new Function3, State, Void>() {
    @Override
    public Void call(String key,Optional value, State state) {
        // Use state.exists(), state.get(), state.update() and state.remove()
        // to manage state, and return the necessary string
        LiveInfo info=value.orNull();
        if(info!=null){
            state.update(info.getChannel()+":::"+info.getTime());
        }
        return null;
    }
};
//處理計數
samples
//先將ip作為key
.mapPartitionsToPair((v)->{
    List> list=new ArrayList<>();
    while(v.hasNext()){
        Tuple2 tmpv = v.next();
        String channelName=tmpv._1();
        String ip=tmpv._2().getIp();
        list.add(new Tuple2(ip,tmpv._2()));
    }
    return list.iterator();
})
//更新狀態
.mapWithState(
    StateSpec.function(mappingFunction)
        //4小時沒有更新則剔除
        .timeout(Durations.minutes(4*60))
)
//獲得狀態快照流
.stateSnapshots()
//后續操作

mapWithState的新特性和性能改進

原生支持會話超時
許多基于會話的應用程序要求具備超時機制,當某個會話在一定的時間內(如用戶沒有顯式地注銷而結束會話)沒有接收到新數據時就應該將其關閉,與使用updateStateByKey方法時需要手動進行編碼實現所不同的是,開發人員可以通過mapWithState方法直接指定其超時時間。

userActions.mapWithState(StateSpec.function(stateUpdateFunction).timeout(Minutes(10)))

除超時機制外,開發人員也可以設置程序啟動時的分區模式和初始狀態信息。

任意數據都能夠發送到下游
與updateStateByKey方法不同,任意數據都可以通過狀態更新函數將數據發送到下游操作,這一點已經在前面的例子中有說明(例如通過用戶會話狀態返回用戶模型),此外,最新狀態的快照也能夠被訪問。

val userSessionSnapshots = userActions.mapWithState(statSpec).snapshotStream()

變量userSessionSnapshots 為一個DStream,其中各個RDD為各批(batch)數據處理后狀態更新會話的快照,該DStream與updateStateByKey方法返回的DStream是等同的。

更高的性能
最后,與updateStateByKey方法相比,使用mapWithState方法能夠得到6倍的低延遲同時維護的key狀態數量要多10倍。

此部分參考:

http://blog.csdn.net/lively19...

http://blog.csdn.net/zengxiao...

https://databricks.gitbooks.i...

5、如何理解時間窗口?時間窗口中數據是否會存在重復?


上圖里 batch duration = 1, window length = 3, sliding interval = 2
任何情況下 Job Submit 是以 batch duration 為準, 對于 window 操作,每隔 sliding interval 才去實際生成 RDD(每隔batch都會生成一個RDD,只是到windowDStream的時候做了合并,生成UnionRDD或者PartitionerAwareUnionRDD,最后輸出一個RDD),每次計算的結果包括 window length 個 batch 的數據。

是否會存在重復?看下面兩張圖:

答案是:取決于你怎么設置窗口的兩個參數

(窗口長度)window length – 窗口覆蓋的時間長度

(滑動距離)sliding interval – 窗口啟動的時間間隔

更深入請參考:

https://github.com/lw-lin/Coo...

http://concord.io/posts/windo...

http://www.cnblogs.com/haozhe...

6、WAL(Write Ahead Log,預寫日志)與容錯機制

WAL是在 1.2 版本中就添加的特性。作用就是,將數據通過日志的方式寫到可靠的存儲,比如 HDFS、s3,在 driver 或 worker failure 時可以從在可靠存儲上的日志文件恢復數據。WAL 在 driver 端和 executor 端都有應用。
WAL使用在文件系統和數據庫用于數據操作的持久性,先把數據寫到一個持久化的日志中,然后對數據做操作,如果操作過程中系統掛了,恢復的時候可以重新讀取日志文件再次進行操作。
對于像kafka和flume這些使用接收器來接收數據的數據源。接收器作為一個長時間的任務運行在executor中,負責從數據源接收數據,如果數據源支持的話,向數據源確認接收到數據,然后把數據存儲在executor的內存中,然后driver在exector上運行任務處理這些數據。如果wal啟用了,所有接收到的數據會保存到一個日志文件中去(HDFS), 這樣保存接收數據的持久性,此外,只有在數據寫入到log中之后接收器才向數據源確認,這樣drive重啟后那些保存在內存中但是沒有寫入到log中的數據將會重新發送,這兩點保證的數據的無丟失。

啟用WAL:

給streamingContext設置checkpoint的目錄,該目錄必須是HADOOP支持的文件系統,用來保存WAL和做Streaming的checkpoint

spark.streaming.receiver.writeAheadLog.enable 設置為true

正常流程圖:

解析

1:藍色的箭頭表示接收的數據,接收器把數據流打包成塊,存儲在executor的內存中,如果開啟了WAL,將會把數據寫入到存在容錯文件系統的日志文件中

2:青色的箭頭表示提醒driver, 接收到的數據塊的元信息發送給driver中的StreamingContext, 這些元數據包括:executor內存中數據塊的引用ID和日志文件中數據塊的偏移信息

3:紅色箭頭表示處理數據,每一個批處理間隔,StreamingContext使用塊信息用來生成RDD和jobs. SparkContext執行這些job用于處理executor內存中的數據塊

4:黃色箭頭表示checkpoint這些計算,以便于恢復。流式處理會周期的被checkpoint到文件中

當一個失敗的driver重啟以后,恢復流程如下:

1:黃色的箭頭用于恢復計算,checkpointed的信息是用于重啟driver,重新構造上下文和重啟所有的receiver

2: 青色箭頭恢復塊元數據信息,所有的塊信息對已恢復計算很重要

3:重新生成未完成的job(紅色箭頭),會使用到2恢復的元數據信息

4:讀取保存在日志中的塊(藍色箭頭),當job重新執行的時候,塊數據將會直接從日志中讀取,

5:重發沒有確認的數據(紫色的箭頭)。緩沖的數據沒有寫到WAL中去將會被重新發送。

1、WAL在 driver 端的應用
用于寫日志的對象 writeAheadLogOption: WriteAheadLog在 StreamingContext 中的 JobScheduler 中的 ReceiverTracker 的 ReceivedBlockTracker 構造函數中被創建,ReceivedBlockTracker 用于管理已接收到的 blocks 信息。需要注意的是,這里只需要啟用 checkpoint 就可以創建該 driver 端的 WAL 管理實例,將 spark.streaming.receiver.writeAheadLog.enable 設置為 true。
首選需要明確的是,ReceivedBlockTracker 通過 WAL 寫入 log 文件的內容是3種事件(當然,會進行序列化):

BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo);即新增了一個 block 及該 block 的具體信息,包括 streamId、blockId、數據條數等

BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks);即為某個 batchTime 分配了哪些 blocks 作為該 batch RDD 的數據源

BatchCleanupEvent(times: Seq[Time]);即清理了哪些 batchTime 對應的 blocks

2、WAL 在 executor 端的應用
Receiver 接收到的數據會源源不斷的傳遞給 ReceiverSupervisor,是否啟用 WAL 機制(即是否將 spark.streaming.receiver.writeAheadLog.enable 設置為 true)會影響 ReceiverSupervisor 在存儲 block 時的行為:

不啟用 WAL:你設置的StorageLevel是什么,就怎么存儲。比如MEMORY_ONLY只會在內存中存一份,MEMORY_AND_DISK會在內存和磁盤上各存一份等

啟用 WAL:在StorageLevel指定的存儲的基礎上,寫一份到 WAL 中。存儲一份在 WAL 上,更不容易丟數據但性能損失也比較大

3、WAL 使用建議
關于是否要啟用 WAL,要視具體的業務而定:

若可以接受一定的數據丟失,則不需要啟用 WAL,WAL開啟了以后會減少Spark Streaming處理數據的吞吐,因為所有接收的數據會被寫到到容錯的文件系統上,這樣文件系統的吞吐和網絡帶寬將成為瓶頸。

若完全不能接受數據丟失,那就需要同時啟用 checkpoint 和 WAL,checkpoint 保存著執行進度(比如已生成但未完成的 jobs),WAL 中保存著 blocks 及 blocks 元數據(比如保存著未完成的 jobs 對應的 blocks 信息及 block 文件)。同時,這種情況可能要在數據源和 Streaming Application 中聯合來保證 exactly once 語義

此處參考:
http://www.jianshu.com/p/5e09...
http://www.cnblogs.com/gaoxin...

7、HDFS操作的一系列問題

1、ls操作
列出子目錄和文件(不包括嵌套層級):listStatus(path,filter)
列出所有(包括嵌套層級):listFiles(path,true)

Path file = new Path(HDFSConst.live_path);
FileStatus[] statuslist = hdfs.listStatus(file,
        (v) -> {
            return v.getName().contains(prefix)
                    && !v.getName().endsWith(HDFSConst.processSubffix);
        });
List paths = new ArrayList();
for (FileStatus status : statuslist) {
    Path tmp = new Path(status.getPath().toString());
    RemoteIterator statusIter = hdfs.listFiles(tmp,
            true);
    boolean shouldAdd = true;
    while (statusIter.hasNext()) {
        LocatedFileStatus status2 = statusIter.next();
        if (status2.getPath().toString().contains("/_temporary/")) {
            shouldAdd = false;
            break;
        }
    }
    if (shouldAdd) {
        paths.add(tmp);
    }
}

2、hadoop No FileSystem for scheme:
問題來源:
This is a typical case of the maven-assembly plugin breaking things.
Different JARs (hadoop-commons for LocalFileSystem, hadoop-hdfs for DistributedFileSystem) each contain a different file called org.apache.hadoop.fs.FileSystem in their META-INFO/services directory. This file lists the canonical classnames of the filesystem implementations they want to declare (This is called a Service Provider Interface implemented via java.util.ServiceLoader, see org.apache.hadoop.FileSystem line 2622).
When we use maven-assembly-plugin, it merges all our JARs into one, and all META-INFO/services/org.apache.hadoop.fs.FileSystem overwrite each-other. Only one of these files remains (the last one that was added). In this case, the FileSystem list from hadoop-commons overwrites the list from hadoop-hdfs, so DistributedFileSystem was no longer declared.

How we fixed it
After loading the Hadoop configuration, but just before doing anything FileSystem-related

hadoopConfig.set("fs.hdfs.impl", 
        org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()
    );
    hadoopConfig.set("fs.file.impl",
        org.apache.hadoop.fs.LocalFileSystem.class.getName()
    );

不要使用maven-assembly-plugin,使用maven shade插件:


    org.apache.maven.plugins
    maven-shade-plugin
    2.3
    
      
        package
        
          shade
        
        
          
            
          
        
      
    
  

3、Wrong FS: hdfs… expected: file:///

Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://my-master:8020"), configuration);
Path filePath = new Path();
FSDataInputStream fsDataInputStream = fs.open(filePath);
BufferedReader br = new BufferedReader(new InputStreamReader(fsDataInputStream));

此處參考:
https://stackoverflow.com/que...
https://stackoverflow.com/que...
https://stackoverflow.com/que...

補充:關于DAG的一些概念和零散資料

RDD DAG(有向無環圖,Directed Acyclic Graph):每一個操作生成一個rdd,rdd之間連一條邊,最后這些rdd和他們之間的邊組成一個有向無環圖,就是這個dag。不只是spark,現在很多計算引擎都是dag模型的.(有向指的是 RDD 之間的依賴關系,無環是因為 RDD 中數據是不可變的)
在Spark作業調度系統中,調度的前提是判斷多個作業任務的依賴關系,這些作業任務之間可能存在因果的依賴關系,也就是說有些任務必須先獲得執行,然后相關的依賴任務才能執行,但是任務之間顯然不應出現任何直接或間接的循環依賴關系,所以本質上這種關系適合用DAG表示。
DAGscheduler簡單來說就是負責任務的邏輯調度,負責將作業拆分成不同階段的具有依賴關系的多批任務。DAGscheduler最重要的任務之一就是計算作業和任務的依賴關系,制定調度邏輯。

spark中rdd經過若干次transform操作,由于transform操作是lazy的,因此,當rdd進行action操作時,rdd間的轉換關系也會被提交上去,得到rdd內部的依賴關系,進而根據依賴,劃分出不同的stage。
DAG是有向無環圖,一般用來描述任務之間的先后關系,spark中的DAG就是rdd內部的轉換關系,這些轉換關系會被轉換成依賴關系,進而被劃分成不同階段,從而描繪出任務的先后順序。

有向無環圖(Directed Acyclic Graph, DAG)是有向圖的一種,字面意思的理解就是圖中沒有環。常常被用來表示事件之間的驅動依賴關系,管理任務之間的調度。

在圖論中,如果一個有向圖無法從任意頂點出發經過若干條邊回到該點,則這個圖是一個有向無環圖(DAG圖)。
因為有向圖中一個點經過兩種路線到達另一個點未必形成環,因此有向無環圖未必能轉化成樹,但任何有向樹均為有向無環圖。

拓撲排序是對DAG的頂點進行排序,使得對每一條有向邊(u, v),均有u(在排序記錄中)比v先出現。亦可理解為對某點v而言,只有當v的所有源點均出現了,v才能出現。

下圖給出的頂點排序不是拓撲排序,因為頂點D的鄰接點E比其先出現:

DAG可用于對數學和 計算機科學中得一些不同種類的結構進行建模。
由于受制于某些任務必須比另一些任務較早執行的限制,必須排序為一個隊 列的任務集合可以由一個DAG圖來呈現,其中每個頂點表示一個任務,每條邊表示一種限制約束,拓撲排序算法可以用來生成一個有效的序列。
DAG也可以用來模擬信息沿著一個一 致性的方向通過處理器網絡的過程。
DAG中得可達性關系構成了一個局 部順序,任何有限的局部順序可以由DAG使用可達性來呈現。
http://www.cnblogs.com/en-hen...

Spark Streaming 的 模塊 1 DAG 靜態定義 要解決的問題就是如何把計算邏輯描述為一個 RDD DAG 的“模板”,在后面 Job 動態生成的時候,針對每個 batch,都將根據這個“模板”生成一個 RDD DAG 的實例。

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/67399.html

相關文章

  • Spark Streaming 到 Apache Flink : 實時數據流在愛奇藝的演進

    摘要:在移動端,愛奇藝月度總有效時長億小時,穩居中國榜第三名。愛奇藝的峰值事件數達到萬秒,在正確性容錯性能延遲吞吐量擴展性等方面均遇到不小的挑戰。從到愛奇藝主要使用的是和來進行流式計算。作者:陳越晨 整理:劉河 本文將為大家介紹Apache Flink在愛奇藝的生產與實踐過程。你可以借此了解到愛奇藝引入Apache Flink的背景與挑戰,以及平臺構建化流程。主要內容如下: 愛奇藝在實時計算方...

    econi 評論0 收藏0
  • Spark 快速入門

    摘要:數據科學任務主要是數據分析領域,數據科學家要負責分析數據并建模,具備統計預測建模機器學習等方面的經驗,以及一定的使用或語言進行編程的能力。監控運行時性能指標信息。 Spark Spark 背景 什么是 Spark 官網:http://spark.apache.org Spark是一種快速、通用、可擴展的大數據分析引擎,2009年誕生于加州大學伯克利分校AMPLab,2010年開源,20...

    wangshijun 評論0 收藏0

發表評論

0條評論

stormzhang

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<