摘要:最近在用做流上的異常檢測,期間遇到了很多問題,但是發現網上相關的資料很少,基本只能自己啃文檔和瞎嘗試。其中如有錯漏,歡迎指出。即從一條數據中獲得時間戳,然后以的格式返回。丟棄掉中的附加信息使用這一設置時,得到的中的元素是的和組成的鍵值對。
最近在用Apache beam做流上的異常檢測,期間遇到了很多問題,但是發現網上相關的資料很少,基本只能自己啃文檔和瞎嘗試。
所以想把自己踩過的坑記錄下來,希望能對大家有所幫助。
其中如有錯漏,歡迎指出。
顧名思義,是從kafka上讀取數據到beam上或者將beam上的數據寫入到kafka中。官方文檔中沒有直接的教程,要在GitHub上的源碼中找到相關使用說明。
Github上的Kafka源碼
這里僅說明讀數據部分。
maven依賴示例
org.apache.beam beam-sdks-java-io-kafka ...
讀數據示例
PCollection> lines = //這里kV后說明kafka中的key和value均為String類型 p.apply(KafkaIO. read() .withBootstrapServers("hadoop1:9092, hadoop2:9092")//必需,設置kafka的服務器地址和端口 .withTopic("mytopic")//必需,設置要讀取的kafka的topic名稱 .withKeyDeserializer(StringDeserializer.class)//必需 .withValueDeserializer(StringDeserializer.class)//必需 .withMaxNumRecords(301) .withTimestampFn(new MyTimestampFunction()) .updateConsumerProperties(ImmutableMap. of("auto.offset.reset", "earliest")) .withoutMetadata() )
以下分別后面非必需的一些設置
1.設置最大記錄條數
.withMaxNumRecords(301)
通過這個函數,可以設置最大讀取的記錄條數。
2.設置PCollection中元素對應的時間戳
.withTimestampFn(new MyTimestampFunction())
當不進行這個設置的時候,beam會根據當前的系統時間為每個元素分配一個時間戳。
而有的時候,我們希望用kafka的數據中自身帶有的時間戳來作為PCollection中元素的時間戳,從而進行后續的窗口操作。這時就需要通過上面的函數來達到這一目的。
其中MyTimestampFunction()是我們自定義的一個函數,其要實現SerializableFunction
即從一條kafka數據中獲得時間戳,然后以Instant(org.joda.time.Instant)的格式返回。
public class MyTimestampFunction implements SerializableFunction, Instant> { public Instant apply(KV input){ String[] temps = input.getValue().split(","); DateTime t = new DateTime(Long.valueOf(temps[1])); return t.toInstant(); } }
3.設置讀kafka數據的順序
updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
KafkaIO默認的數據讀取順序是從最新的數據開始。當我們開發測試的時候,如果沒有一個生產者同步向kafka生產數據,那么這里就拿不到數據。(在這坑了很久,才發現這個原因...)
當我們想實現類似于kafka shell中的--from-beginning的功能的時候,即從最早的數據開始讀,就需要進行這一設置。
這里不僅可以改變讀取數據的順序,按照類似的方式,還可以進行其他設置。
4.丟棄掉kafka中的附加信息
.withoutMetadata()
使用這一設置時,得到的PCollection中的元素是kafka的key和value組成的鍵值對。
當不使用其時,得到的PCollection中的元素是KafkaRecord。會附件很多元數據。
5.其他設置
// custom function for watermark (default is record timestamp) * .withWatermarkFn(new MyWatermarkFunction()) * * // restrict reader to committed messages on Kafka (see method documentation). * .withReadCommitted() *
在源碼的使用說明中還提到另外的兩個設置,但因為暫時沒用到,這里就暫且省略了。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/35904.html
摘要:一直接訪問引入的相關包使用代替給指定配置與訪問本地文件一樣訪問文件實際測試中發現本地如能夠成功讀寫,但是集群模式下如讀寫失敗,原因未知。二通過訪問除了直接讀寫的數據,還可以通過來進行讀寫。 一、直接訪問 1.引入HDFS的相關jar包: org.apache.beam beam-sdks-java-io-hadoop-file-system 2.1.0...
摘要:要說在中常見的函數是哪一個,當然是。是一個實現了接口的抽象類,其中是數據處理方法,強制子類必須實現。以上為學習一天的總結,有錯誤歡迎指正。相同的是這個方法處理的都是中的一個元素。 在閱讀本文前,可先看一下官方的WordCount代碼, 對Apache Beam有大概的了解。 要說在Apache Beam中常見的函數是哪一個,當然是apply()。常見的寫法如下: [Final Outp...
摘要:需要注意的是和方法生成的觸發器是連續的而不是一次性的。其他的還有一次性觸發器將一次性觸發器變為連續型觸發器,觸發后再次等待觸發。例如與一起用可以實現每個數據到達后的分鐘進行處理,經常用于全局窗口,可以用觸發器來設置停止條件。 本文參考Apache Beam官方編程手冊 可以結合官方的Mobile Game 代碼閱讀本文。 在默認情況下,Apache Beam是不分窗的,也就是采用Gl...
摘要:與用于與的轉換。其中方法返回的是在中的位置下標。對于設置了多個觸發器的,自動選擇最后一個觸發的結算結果。其他不是線程安全的,一般建議處理方法是冪等的。 Combine與GroupByKey GroupByKey是把相關key的元素聚合到一起,通常是形成一個Iterable的value,如: cat, [1,5,9] dog, [5,2] and, [1,2,6] Combine是對聚...
閱讀 1696·2021-10-09 09:44
閱讀 3262·2021-09-27 13:36
閱讀 1519·2021-09-22 15:33
閱讀 1274·2021-09-22 15:23
閱讀 1159·2021-09-06 15:02
閱讀 1695·2019-08-29 16:14
閱讀 2900·2019-08-29 15:26
閱讀 2407·2019-08-28 18:08