之前項目一直用的Flink-1.72版本,大多數(shù)用的流api進行開發(fā)的需求,現(xiàn)在掃描漏洞的時候必須升級到Flink-1.12.0或Flink-1.11.3,所以直接升級到Flink-1.12.0,發(fā)現(xiàn)之前用的api(assignTimestampsAndWatermarks)被設(shè)置為廢棄了。
先來看看項目之前用的:
后來查資料發(fā)現(xiàn)Flink在1.11版本中為了實現(xiàn)水印的通用以及方便,對水印進行了重構(gòu)。
新版本的Flink在類classDataStream
WatermarkStrategy接口繼承了接口TimestampAssignerSupplier
先看一下interfaceTimestampAssignerSupplier
是創(chuàng)建一個TimestampAssigner
有一個longextractTimestamp方法,作用是從Flink消費的記錄中抽取時間,既可以理解為我們?nèi)绻ㄟ^業(yè)務(wù)時間進行統(tǒng)計時,需要通過該方法對來提取記錄的業(yè)務(wù)時間。所以用到業(yè)務(wù)時間的話,一定要根據(jù)自己的業(yè)務(wù)場景對該方法進行具體的實現(xiàn)。否則Flink會提供一個默認的實現(xiàn)RecordTimestampAssigner<>()
而默認實現(xiàn)的內(nèi)容也十分簡單,一起看一下,必須是記錄中已經(jīng)注冊了時間屬性。
接下來interfaceWatermarkGeneratorSupplier
是返回一個WatermarkGenerator
提供了兩個水印發(fā)送的方式,接下來對這兩個方式進行說明:
onEvent:每條記錄進來都會調(diào)用一次這個方法,入?yún)⒂?個,第一個是記錄,第二個是記錄攜帶的時間,如果注冊了時間就會有,第三個參數(shù)時水印發(fā)射器WatermarkOutputoutput,可以通過這個參數(shù)對水印進行發(fā)射,用戶可以根據(jù)自己的業(yè)務(wù)場景來編寫自己的水印生成以及發(fā)射邏輯。該方法的重點是每條記錄都會調(diào)用.
onPeriodicEmit: 該方法是Flink提供的一個定時器方法,每隔一段時間會調(diào)用此方法,入?yún)⑹荳atermarkOutputoutput,用戶可以通過這個方法每隔一段時間發(fā)送一次水印,當記錄數(shù)過多時,每條記錄都發(fā)送一次水印明顯不合適,也影響性能,此時可以通過這個方法進行水印的定時發(fā)送,而onEvent只記錄當前水印而選擇不發(fā)射出去。該方法的參數(shù)配置為env.getConfig().setAutoWatermarkInterval(300L),入?yún)⑹呛撩霐?shù),表示隔多少毫秒向下游算子發(fā)送一次水印。
而WatermarkStrategy中也提供了一些常用的WatermarkGenerator
BoundedOutOfOrdernessWatermarks
使用方法也十分的簡單,提供的是一個靜態(tài)方法,只需直接調(diào)用即可
WatermarkStrategy.
最后結(jié)合項目的需求將原來的使用水印的地方改成如下了
類圖及FLINK水印算子簡要流程
先上類圖,方便理解
接著簡單介紹下流程
首先TimestampsAndWatermarksOperator算子會在open方法中初始化用戶定義的水印邏輯及方式,并且如果需要定時發(fā)送水印會,注冊一個定時器觸發(fā)水印定時發(fā)送。
當元素到達算子后會調(diào)用processElement(StreamRecord
方法很簡單,如果元素已經(jīng)被注冊了時間,就直接獲取時間,或者設(shè)置為LONG.MIN_VALUE,然后根據(jù)用戶定義的timestampAssigner.extractTimestamp從記錄中抽取時間屬性,然后再將時間寫入元素中,最后調(diào)用用戶定義的watermarkGenerator.onEvent方法,根據(jù)用戶的邏輯選擇刷新水印以及是否發(fā)射水印。
上面初始化中提到了,如果需要定時發(fā)送水印,則會注冊一個定時器,而定時器的方法如下
通過onProcessingTime來觸發(fā)定時器的內(nèi)容,而內(nèi)容也十分簡單,先調(diào)用用戶定義的watermarkGenerator.onPeriodicEmit方法發(fā)送水印,然后獲取當前時間,最后注冊當前時間加水印定時發(fā)送間隔的定時觸發(fā)器,等待下次觸發(fā)該方法。
參考資料
https://zhuanlan.zhihu.com/p/158951593
https://blog.csdn.net/zhaoyuqiang/article/details/107453466
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/130018.html
摘要:由于配置流是從關(guān)系型數(shù)據(jù)庫中讀取,速度較慢,導致實時數(shù)據(jù)流流入數(shù)據(jù)的時候,配置信息還未發(fā)送,這樣會導致有些實時數(shù)據(jù)讀取不到配置信息。從數(shù)據(jù)庫中解析出來,再去統(tǒng)計近兩周占比。 Flink 學習 https://github.com/zhisheng17/flink-learning 麻煩路過的各位親給這個項目點個 star,太不易了,寫了這么多,算是對我堅持下來的一種鼓勵吧! showI...
摘要:在這種情況下,清除僅指窗口中的數(shù)據(jù)元,而不是窗口元數(shù)據(jù)。紫色圓圈表示流的數(shù)據(jù)元,這些數(shù)據(jù)元由某個鍵在這種情況下是用戶,用戶和用戶劃分。 0 相關(guān)源碼 掌握Flink中三種常用的Time處理方式,掌握Flink中滾動窗口以及滑動窗口的使用,了解Flink中的watermark。 Flink 在流處理工程中支持不同的時間概念。 1 處理時間(Processing time) 執(zhí)行相應算子...
摘要:另外,將機制發(fā)揚光大,對有著非常好的支持。系統(tǒng)也注意到并討論了和的問題。總結(jié)本文分享了四本相關(guān)的書籍和一份領(lǐng)域相關(guān)的論文列表篇,涉及的設(shè)計,實現(xiàn),故障恢復,彈性擴展等各方面。 前言 之前也分享了不少自己的文章,但是對于 Flink 來說,還是有不少新入門的朋友,這里給大家分享點 Flink 相關(guān)的資料(國外數(shù)據(jù) pdf 和流處理相關(guān)的 Paper),期望可以幫你更好的理解 Flink。...
摘要:每小時窗口將包括在系統(tǒng)時鐘指示整個小時之間到達特定操作的所有事件。平行流中的水印水印是在源函數(shù)處生成的,或直接在源函數(shù)之后生成的。源函數(shù)的每個并行子任務(wù)通常獨立生成其水印。由于其輸入流更新其事件時間,因此操作員也是如此。 showImg(https://segmentfault.com/img/remote/1460000017877320?w=1280&h=857); 前言 Flin...
摘要:由于配置流是從關(guān)系型數(shù)據(jù)庫中讀取,速度較慢,導致實時數(shù)據(jù)流流入數(shù)據(jù)的時候,配置信息還未發(fā)送,這樣會導致有些實時數(shù)據(jù)讀取不到配置信息。從數(shù)據(jù)庫中解析出來,再去統(tǒng)計近兩周占比。 showImg(https://segmentfault.com/img/remote/1460000019367651); Flink 學習項目代碼 https://github.com/zhisheng17/f...
閱讀 1346·2023-01-11 13:20
閱讀 1684·2023-01-11 13:20
閱讀 1132·2023-01-11 13:20
閱讀 1858·2023-01-11 13:20
閱讀 4100·2023-01-11 13:20
閱讀 2704·2023-01-11 13:20
閱讀 1385·2023-01-11 13:20
閱讀 3597·2023-01-11 13:20