摘要:指定了該迭代器返回元素的類型。這可能導致節點故障后的恢復速度較慢,因為該作業將從最后一個檢查點恢復讀取。監聽的端口過來的數據這個在從到學習上搭建環境并構建運行簡單程序入門文章里用的就是基于的程序。取消一個,也即將中的循環元素的行為終止。
前言
Data Sources 是什么呢?就字面意思其實就可以知道:數據來源。
Flink 做為一款流式計算框架,它可用來做批處理,即處理靜態的數據集、歷史的數據集;也可以用來做流處理,即實時的處理些實時數據流,實時的產生數據流結果,只要數據源源不斷的過來,Flink 就能夠一直計算下去,這個 Data Sources 就是數據的來源地。
Flink 中你可以使用 StreamExecutionEnvironment.addSource(sourceFunction) 來為你的程序添加數據來源。
Flink 已經提供了若干實現好了的 source functions,當然你也可以通過實現 SourceFunction 來自定義非并行的 source 或者實現 ParallelSourceFunction 接口或者擴展 RichParallelSourceFunction 來自定義并行的 source,
FlinkStreamExecutionEnvironment 中可以使用以下幾個已實現的 stream sources,
總的來說可以分為下面幾大類:
基于集合1、fromCollection(Collection) - 從 Java 的 Java.util.Collection 創建數據流。集合中的所有元素類型必須相同。
2、fromCollection(Iterator, Class) - 從一個迭代器中創建數據流。Class 指定了該迭代器返回元素的類型。
3、fromElements(T ...) - 從給定的對象序列中創建數據流。所有對象類型必須相同。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreaminput = env.fromElements( new Event(1, "barfoo", 1.0), new Event(2, "start", 2.0), new Event(3, "foobar", 3.0), ... );
4、fromParallelCollection(SplittableIterator, Class) - 從一個迭代器中創建并行數據流。Class 指定了該迭代器返回元素的類型。
5、generateSequence(from, to) - 創建一個生成指定區間范圍內的數字序列的并行數據流。
基于文件1、readTextFile(path) - 讀取文本文件,即符合 TextInputFormat 規范的文件,并將其作為字符串返回。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamtext = env.readTextFile("file:///path/to/file");
2、readFile(fileInputFormat, path) - 根據指定的文件輸入格式讀取文件(一次)。
3、readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 這是上面兩個方法內部調用的方法。它根據給定的 fileInputFormat 和讀取路徑讀取文件。根據提供的 watchType,這個 source 可以定期(每隔 interval 毫秒)監測給定路徑的新數據(FileProcessingMode.PROCESS_CONTINUOUSLY),或者處理一次路徑對應文件的數據并退出(FileProcessingMode.PROCESS_ONCE)。你可以通過 pathFilter 進一步排除掉需要處理的文件。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamstream = env.readFile( myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100, FilePathFilter.createDefaultFilter(), typeInfo);
實現:
在具體實現上,Flink 把文件讀取過程分為兩個子任務,即目錄監控和數據讀取。每個子任務都由多帶帶的實體實現。目錄監控由單個非并行(并行度為1)的任務執行,而數據讀取由并行運行的多個任務執行。后者的并行性等于作業的并行性。單個目錄監控任務的作用是掃描目錄(根據 watchType 定期掃描或僅掃描一次),查找要處理的文件并把文件分割成切分片(splits),然后將這些切分片分配給下游 reader。reader 負責讀取數據。每個切分片只能由一個 reader 讀取,但一個 reader 可以逐個讀取多個切分片。
重要注意:
如果 watchType 設置為 FileProcessingMode.PROCESS_CONTINUOUSLY,則當文件被修改時,其內容將被重新處理。這會打破“exactly-once”語義,因為在文件末尾附加數據將導致其所有內容被重新處理。
如果 watchType 設置為 FileProcessingMode.PROCESS_ONCE,則 source 僅掃描路徑一次然后退出,而不等待 reader 完成文件內容的讀取。當然 reader 會繼續閱讀,直到讀取所有的文件內容。關閉 source 后就不會再有檢查點。這可能導致節點故障后的恢復速度較慢,因為該作業將從最后一個檢查點恢復讀取。
基于 Socket:socketTextStream(String hostname, int port) - 從 socket 讀取。元素可以用分隔符切分。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream> dataStream = env .socketTextStream("localhost", 9999) // 監聽 localhost 的 9999 端口過來的數據 .flatMap(new Splitter()) .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1);
這個在 《從0到1學習Flink》—— Mac 上搭建 Flink 1.6.0 環境并構建運行簡單程序入門 文章里用的就是基于 Socket 的 Word Count 程序。
自定義:addSource - 添加一個新的 source function。例如,你可以 addSource(new FlinkKafkaConsumer011<>(...)) 以從 Apache Kafka 讀取數據
說下上面幾種的特點吧:
1、基于集合:有界數據集,更偏向于本地測試用
2、基于文件:適合監聽文件修改并讀取其內容
3、基于 Socket:監聽主機的 host port,從 Socket 中獲取數據
4、自定義 addSource:大多數的場景數據都是無界的,會源源不斷的過來。比如去消費 Kafka 某個 topic 上的數據,這時候就需要用到這個 addSource,可能因為用的比較多的原因吧,Flink 直接提供了 FlinkKafkaConsumer011 等類可供你直接使用。你可以去看看 FlinkKafkaConsumerBase 這個基礎類,它是 Flink Kafka 消費的最根本的類。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreaminput = env .addSource( new FlinkKafkaConsumer011<>( parameterTool.getRequired("input-topic"), //從參數中獲取傳進來的 topic new KafkaEventSchema(), parameterTool.getProperties()) .assignTimestampsAndWatermarks(new CustomWatermarkExtractor()));
Flink 目前支持如下圖里面常見的 Source:
如果你想自己自定義自己的 Source 呢?
那么你就需要去了解一下 SourceFunction 接口了,它是所有 stream source 的根接口,它繼承自一個標記接口(空接口)Function。
SourceFunction 定義了兩個接口方法:
1、run : 啟動一個 source,即對接一個外部數據源然后 emit 元素形成 stream(大部分情況下會通過在該方法里運行一個 while 循環的形式來產生 stream)。
2、cancel : 取消一個 source,也即將 run 中的循環 emit 元素的行為終止。
正常情況下,一個 SourceFunction 實現這兩個接口方法就可以了。其實這兩個接口方法也固定了一種實現模板。
比如,實現一個 XXXSourceFunction,那么大致的模板是這樣的:(直接拿 FLink 源碼的實例給你看看)
最后本文主要講了下 Flink 的常見 Source 有哪些并且簡單的提了下如何自定義 Source。
關注我轉載請務必注明原創地址為:http://www.54tianzhisheng.cn/2018/10/28/flink-sources/
另外我自己整理了些 Flink 的學習資料,目前已經全部放到微信公眾號了。你可以加我的微信:zhisheng_tian,然后回復關鍵字:Flink 即可無條件獲取到。
相關文章1、《從0到1學習Flink》—— Apache Flink 介紹
2、《從0到1學習Flink》—— Mac 上搭建 Flink 1.6.0 環境并構建運行簡單程序入門
3、《從0到1學習Flink》—— Flink 配置文件詳解
4、《從0到1學習Flink》—— Data Source 介紹
5、《從0到1學習Flink》—— 如何自定義 Data Source ?
6、《從0到1學習Flink》—— Data Sink 介紹
7、《從0到1學習Flink》—— 如何自定義 Data Sink ?
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/72072.html
摘要:從上圖可以看到接口有方法,它有一個抽象類。上面的那些自帶的可以看到都是繼承了抽象類,實現了其中的方法,那么我們要是自己定義自己的的話其實也是要按照這個套路來做的。 showImg(https://segmentfault.com/img/remote/1460000016956595); 前言 再上一篇文章中 《從0到1學習Flink》—— Data Source 介紹 講解了 Fli...
摘要:這些切片稱為窗口。函數允許對常規數據流進行分組。通常,這是非并行數據轉換,因為它在非分區數據流上運行。 showImg(https://segmentfault.com/img/remote/1460000017874226?w=1920&h=1271); 前言 在第一篇介紹 Flink 的文章 《《從0到1學習Flink》—— Apache Flink 介紹》 中就說過 Flink ...
閱讀 1297·2021-11-22 09:34
閱讀 2162·2021-10-08 10:18
閱讀 1724·2021-09-29 09:35
閱讀 2453·2019-08-29 17:20
閱讀 2137·2019-08-29 15:36
閱讀 3398·2019-08-29 13:52
閱讀 775·2019-08-29 12:29
閱讀 1183·2019-08-28 18:10