摘要:需要注意的是和方法生成的觸發器是連續的而不是一次性的。其他的還有一次性觸發器將一次性觸發器變為連續型觸發器,觸發后再次等待觸發。例如與一起用可以實現每個數據到達后的分鐘進行處理,經常用于全局窗口,可以用觸發器來設置停止條件。
本文參考Apache Beam官方編程手冊
可以結合官方的Mobile Game 代碼閱讀本文。
在默認情況下,Apache Beam是不分窗的,也就是采用GlobalWindow,而如果同時也不設置自定義的觸發器,那么Beam會在所有數據都收集到之后才開始對數據進行處理。這通常只能適用于有限數據且對實時性要求不高的情況。當輸入為無限流數據,我們可以
1)設置合適的窗口大小(根據時間戳),在窗口末端進行數據處理;
2)設置觸發器,當條件滿足時觸發,進行數據處理;
3)同時設置窗口和觸發器。
時間戳說明:Beam的數據都是保存在PCollection中。當讀入數據時,PCollection為每個元素都自動生成一個內置的時間戳,對于無限輸入,數據的時間戳不同。而對于有限輸入,由于是同時讀入,所有的元素的時間戳都是一樣的,這時候分窗是沒有意義的(都在一個窗口)。而我們可以手動為每個元素設置時間戳,通常采用數據中已有的時間屬性(比如日志中一般都會帶有事件時間)。可以在DoFn中為數據帶上時間戳,如:
@ProcessElement public void processElement(ProcessContext c) { c.outputWithTimestamp(c.element(), new Instant(XXX)); }窗口類型:
1)全局窗口
就是默認不分窗的情況。
apply(Windows.
2)固定時間大小窗口
最常見的分窗方式,按照時間戳把數據處理窗口分為固定長度。
apply(Windows.
3)滑動窗口
需要設置2個參數,窗口大小和窗口產生周期。窗口之間有重疊,通常用于計算平均數的情況(暫沒用過)
4)會話窗口
一般用于相同key數據聚合,同一個key的數據之間時間間隔較大的會被分到不同的窗口。
**
水位線和超時數據:**
當使用用戶自定義的時間戳時,先處理的數據并不總是時間戳較小的,有可能出現時間戳小的數據在后面才產生的情況。Beam通常會給窗口設定一個處理期限時間(圖中縱軸),當超過這個時間的數據被視為超時數據,而這些期限時間的連線即水位線。
系統會根據實際情況進行預測生成水位線,在默認情況下不對超時數據進行處理,而我們可以通過設置觸發器對超時數據進行額外處理。
觸發器種類1)時間時間觸發器
根據時間戳進行觸發。
.triggering(AfterWatermark.pastEndOfWindow()//水位線到達時觸發一次 .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(FIVE_MINUTES))//水位線之前,每次觸發后第一個數據來到之后的5分鐘時再觸發 .withLateFirings(AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(TEN_MINUTES)))//水位線之后,每次觸發后第一個數據來到之后的10分鐘時再觸發
以上分別對水位線上中下的3種數據進行不同的處理。需要注意的是withEarlyFirings和withLateFirings方法生成的觸發器是連續的而不是一次性的。
2)處理時間觸發器
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(FIVE_MINUTES)
如方法的字面意思,僅在第一個數據到達后的5分鐘時觸發一次。
3)數據驅動型觸發器
AfterPane.elementCountAtleast(XX)
當處理到XX個時觸發一次。需要注意的是當數據個數小于XX時永遠不會觸發數據處理。
4)混合觸發器
將多個觸發器混合起來,比如1)中的代碼就是3個觸發器混合。其他的還有
①Repeatedly.forever(一次性觸發器)
將一次性觸發器變為連續型觸發器,觸發后再次等待觸發。例如與AfterProcessingTime.pastFirstElementInPane().plusDelayOf(FIVE_MINUTES)一起用可以實現每個數據到達后的5分鐘進行處理,經常用于全局窗口,可以用orFinally(觸發器)來設置停止條件。
②AfterEach.inOrder(觸發器1,觸發器2...)
當觸發器1滿足后等待觸發器2...知道所有觸發器滿足后開始數據處理。
③AfterFirst(觸發器1,觸發器2..)和AfterAll(觸發器1,觸發器2..)
這2個分別為或,與的邏輯。
④orFinally
見①
Accumulating Mode:
If our trigger is set to .accumulatingFiredPanes, the trigger emits the following values each time it fires. Keep in mind that the trigger fires every time three elements arrive:
First trigger firing: [5, 8, 3] Second trigger firing: [5, 8, 3, 15, 19, 23] Third trigger firing: [5, 8, 3, 15, 19, 23, 9, 13, 10]
Discarding Mode:
If our trigger is set to .discardingFiredPanes, the trigger emits the following values on each firing:
First trigger firing: [5, 8, 3] Second trigger firing: [15, 19, 23] Third trigger firing: [9, 13, 10]超時數據處理
.withAllowedLateness(Duration.XXXX(XXX))
可設置允許超時多長時間的數據。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/67679.html
摘要:與用于與的轉換。其中方法返回的是在中的位置下標。對于設置了多個觸發器的,自動選擇最后一個觸發的結算結果。其他不是線程安全的,一般建議處理方法是冪等的。 Combine與GroupByKey GroupByKey是把相關key的元素聚合到一起,通常是形成一個Iterable的value,如: cat, [1,5,9] dog, [5,2] and, [1,2,6] Combine是對聚...
摘要:最近在用做流上的異常檢測,期間遇到了很多問題,但是發現網上相關的資料很少,基本只能自己啃文檔和瞎嘗試。其中如有錯漏,歡迎指出。即從一條數據中獲得時間戳,然后以的格式返回。丟棄掉中的附加信息使用這一設置時,得到的中的元素是的和組成的鍵值對。 最近在用Apache beam做流上的異常檢測,期間遇到了很多問題,但是發現網上相關的資料很少,基本只能自己啃文檔和瞎嘗試。所以想把自己踩過的坑記錄...
摘要:一直接訪問引入的相關包使用代替給指定配置與訪問本地文件一樣訪問文件實際測試中發現本地如能夠成功讀寫,但是集群模式下如讀寫失敗,原因未知。二通過訪問除了直接讀寫的數據,還可以通過來進行讀寫。 一、直接訪問 1.引入HDFS的相關jar包: org.apache.beam beam-sdks-java-io-hadoop-file-system 2.1.0...
摘要:要說在中常見的函數是哪一個,當然是。是一個實現了接口的抽象類,其中是數據處理方法,強制子類必須實現。以上為學習一天的總結,有錯誤歡迎指正。相同的是這個方法處理的都是中的一個元素。 在閱讀本文前,可先看一下官方的WordCount代碼, 對Apache Beam有大概的了解。 要說在Apache Beam中常見的函數是哪一個,當然是apply()。常見的寫法如下: [Final Outp...
摘要:主頁暫時下線社區暫時下線知識庫自媒體平臺微博知乎簡書博客園我們不是的官方組織機構團體,只是技術棧以及的愛好者合作侵權,請聯系請抄送一份到基礎編程思想和大數據中文文檔中文文檔中文文檔中文文檔中文文檔中文文檔中文文檔中文文檔中文文檔中文文檔區塊 【主頁】 apachecn.org 【Github】@ApacheCN 暫時下線: 社區 暫時下線: cwiki 知識庫 自媒體平臺 ...
閱讀 1225·2021-11-11 16:54
閱讀 877·2021-10-19 11:44
閱讀 1337·2021-09-22 15:18
閱讀 2445·2019-08-29 16:26
閱讀 2946·2019-08-29 13:57
閱讀 3094·2019-08-26 13:32
閱讀 1081·2019-08-26 11:58
閱讀 2328·2019-08-26 10:37