摘要:一直接訪問引入的相關包使用代替給指定配置與訪問本地文件一樣訪問文件實際測試中發現本地如能夠成功讀寫,但是集群模式下如讀寫失敗,原因未知。二通過訪問除了直接讀寫的數據,還可以通過來進行讀寫。
一、直接訪問
1.引入HDFS的相關jar包:
org.apache.beam beam-sdks-java-io-hadoop-file-system 2.1.0
2.使用HadoopFileSystemOptions代替PipelineOptions
public interface WordCountOptions extends HadoopFileSystemOptions { @Description("input file") @Default.String("hdfs://localhost:9000/tmp/words2") String getInputFile(); void setInputFile(String in); @Description("output") @Default.String("hdfs://localhost:9000/tmp/hdfsWordCount") String getOutput(); void setOutput(String out); }
3.給Options指定HDFS配置
Configuration conf=new Configuration(); conf.set("fs.default.name", "hdfs://localhost:9000"); HDFSOption options= PipelineOptionsFactory.fromArgs(args).withValidation() .as(HDFSOption.class); options.setHdfsConfiguration(ImmutableList.of(conf));
4.與訪問本地文件一樣訪問HDFS文件
Pipeline p = Pipeline.create(options); Data = p.apply("Read from HDFS", TextIO.read().from(options.getInputFile()));
實際測試中發現本地runner(如Direct, Flink Local, Spark Local...)能夠成功讀寫HDFS,但是集群模式下(如Flink Cluster, Spark Cluster...)讀寫HDFS失敗,原因未知。
二、通過HBase訪問除了直接讀寫HDFS的數據,還可以通過HBase來進行讀寫。
1.添加相關jar包
org.apache.beam beam-sdks-java-io-hbase ${beam.verson}
2.設置HBase連接信息
Configuration conf = new Configuration(); conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); conf.setStrings("hbase.master.hostname", "localhost"); conf.setStrings("hbase.regionserver.hostname", "localhost");
3.使用上述的conf讀HBase數據
pipe //指定配置和表名 .apply("Read from HBase", HBaseIO.read().withConfiguration(conf).withTableId("test_tb")) .apply(ParDo.of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { //讀到的數據是HBase API中定義的Result格式,需要按照HBase官方說明進行剝取 Result result = c.element(); String rowkey = Bytes.toString(result.getRow()); System.out.println("row key: "); for(Cell cell : result.listCells()) { System.out.println("qualifier:"+Bytes.toString(CellUtil.cloneQualifier(cell))); System.out.println("value:"+Bytes.toString(CellUtil.cloneValue(cell))); } c.output(rowkey); } }));
4.寫入到HBase
//寫入前需要將string數據封裝為Hbase數據格式mutation .apply(ParDo.of(new DoFn() { @ProcessElement public void processElement(ProcessContext context) { byte[] qual = Bytes.toBytes("qual"); byte[] cf = Bytes.toBytes("cf"); byte[] row = Bytes.toBytes("kafka"); byte[] val = Bytes.toBytes(context.element()); final Charset UTF_8 = Charset.forName("UTF-8"); Mutation mutation = new Put(row).addColumn(cf, qual, val); context.output(mutation); } })) .apply("write to Hbase", HBaseIO.write() .withConfiguration(conf) .withTableId("test_tb"));
經測試,無論本地runner還是集群runner都能成功讀寫。
但是發現一個問題,使用mvn exec:java進行調試成功,而使用shade插件打包成jar運行卻一直報錯,說Mutation沒有指定coder,beam論壇上求助后得到的回復是maven-shade-plugin版本太舊,需要更新到3.0.0以上版本,但我改了3.0的版本之后還是一樣的錯誤。后來添加了ServicesResourceTransformer才解決。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/67699.html
摘要:最近在用做流上的異常檢測,期間遇到了很多問題,但是發現網上相關的資料很少,基本只能自己啃文檔和瞎嘗試。其中如有錯漏,歡迎指出。即從一條數據中獲得時間戳,然后以的格式返回。丟棄掉中的附加信息使用這一設置時,得到的中的元素是的和組成的鍵值對。 最近在用Apache beam做流上的異常檢測,期間遇到了很多問題,但是發現網上相關的資料很少,基本只能自己啃文檔和瞎嘗試。所以想把自己踩過的坑記錄...
摘要:要說在中常見的函數是哪一個,當然是。是一個實現了接口的抽象類,其中是數據處理方法,強制子類必須實現。以上為學習一天的總結,有錯誤歡迎指正。相同的是這個方法處理的都是中的一個元素。 在閱讀本文前,可先看一下官方的WordCount代碼, 對Apache Beam有大概的了解。 要說在Apache Beam中常見的函數是哪一個,當然是apply()。常見的寫法如下: [Final Outp...
摘要:需要注意的是和方法生成的觸發器是連續的而不是一次性的。其他的還有一次性觸發器將一次性觸發器變為連續型觸發器,觸發后再次等待觸發。例如與一起用可以實現每個數據到達后的分鐘進行處理,經常用于全局窗口,可以用觸發器來設置停止條件。 本文參考Apache Beam官方編程手冊 可以結合官方的Mobile Game 代碼閱讀本文。 在默認情況下,Apache Beam是不分窗的,也就是采用Gl...
摘要:與用于與的轉換。其中方法返回的是在中的位置下標。對于設置了多個觸發器的,自動選擇最后一個觸發的結算結果。其他不是線程安全的,一般建議處理方法是冪等的。 Combine與GroupByKey GroupByKey是把相關key的元素聚合到一起,通常是形成一個Iterable的value,如: cat, [1,5,9] dog, [5,2] and, [1,2,6] Combine是對聚...
閱讀 2856·2021-10-14 09:42
閱讀 3174·2019-08-30 15:52
閱讀 3240·2019-08-30 14:02
閱讀 1102·2019-08-29 15:42
閱讀 529·2019-08-29 13:20
閱讀 1157·2019-08-29 12:24
閱讀 469·2019-08-26 10:20
閱讀 680·2019-08-23 18:31