国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

入門教程 | 5分鐘從零構建第一個 Flink 應用

Mike617 / 713人閱讀

摘要:接著我們將數據流按照單詞字段即號索引字段做分組,這里可以簡單地使用方法,得到一個以單詞為的數據流。得到的結果數據流,將每秒輸出一次這秒內每個單詞出現的次數。最后一件事就是將數據流打印到控制臺,并開始執行最后的調用是啟動實際作業所必需的。

本文轉載自 Jark’s Blog ,作者伍翀(云邪),Apache Flink Committer,阿里巴巴高級開發工程師。 本文將從開發環境準備、創建 Maven 項目,編寫 Flink 程序、運行程序等方面講述如何迅速搭建第一個 Flink 應用。 在本文中,我們將從零開始,教您如何構建第一個 Flink 應用程序。

開發環境準備

Flink 可以運行在 Linux, Max OS X, 或者是 Windows 上。為了開發 Flink 應用程序,在本地機器上需要有 Java 8.xmaven 環境。

如果有 Java 8 環境,運行下面的命令會輸出如下版本信息:

$ java -version
java version "1.8.0_65"
Java(TM) SE Runtime Environment (build 1.8.0_65-b17)
Java HotSpot(TM) 64-Bit Server VM (build 25.65-b01, mixed mode)
如果有 maven 環境,運行下面的命令會輸出如下版本信息:

$ mvn -version
Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00)
Maven home: /Users/wuchong/dev/maven
Java version: 1.8.0_65, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk1.8.0_65.jdk/Contents/Home/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "mac os x", version: "10.13.6", arch: "x86_64", family: "mac"
另外我們推薦使用 ItelliJ IDEA (社區免費版已夠用)作為 Flink 應用程序的開發 IDE。Eclipse 雖然也可以,但是 Eclipse 在 Scala 和 Java 混合型項目下會有些已知問題,所以不太推薦 Eclipse。下一章節,我們會介紹如何創建一個 Flink 工程并將其導入 ItelliJ IDEA。
創建 Maven 項目

我們將使用 Flink Maven Archetype 來創建我們的項目結構和一些初始的默認依賴。在你的工作目錄下,運行如下命令來創建項目:

mvn archetype:generate 
    -DarchetypeGroupId=org.apache.flink 
    -DarchetypeArtifactId=flink-quickstart-java 
    -DarchetypeVersion=1.6.1 
    -DgroupId=my-flink-project 
    -DartifactId=my-flink-project 
    -Dversion=0.1 
    -Dpackage=myflink 
    -DinteractiveMode=false

你可以編輯上面的 groupId, artifactId, package 成你喜歡的路徑。使用上面的參數,Maven 將自動為你創建如下所示的項目結構:

$ tree my-flink-project
my-flink-project
├── pom.xml
└── src
    └── main
        ├── java
        │   └── myflink
        │       ├── BatchJob.java
        │       └── StreamingJob.java
        └── resources
            └── log4j.properties

我們的 pom.xml 文件已經包含了所需的 Flink 依賴,并且在 src/main/java 下有幾個示例程序框架。接下來我們將開始編寫第一個 Flink 程序。

編寫 Flink 程序

啟動 IntelliJ IDEA,選擇 "Import Project"(導入項目),選擇 my-flink-project 根目錄下的 pom.xml。根據引導,完成項目導入。

在 src/main/java/myflink 下創建 SocketWindowWordCount.java 文件:

package myflink;

public class SocketWindowWordCount {

  public static void main(String[] args) throws Exception {

  }
}

現在這程序還很基礎,我們會一步步往里面填代碼。注意下文中我們不會將 import 語句也寫出來,因為 IDE會自動將他們添加上去。在本節末尾,我會將完整的代碼展示出來,如果你想跳過下面的步驟,可以直接將最后的完整代碼粘到編輯器中。

Flink 程序的第一步是創建一個 StreamExecutionEnvironment 。這是一個入口類,可以用來設置參數和創建數據源以及提交任務。所以讓我們把它添加到 main 函數中:

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

下一步我們將創建一個從本地端口號 9000 的 socket 中讀取數據的數據源:

DataStream text = env.socketTextStream("localhost", 9000, "
");

這創建了一個字符串類型的 DataStream。DataStream 是 Flink 中做流處理的核心 API,上面定義了非常多常見的操作(如,過濾、轉換、聚合、窗口、關聯等)。在本示例中,我們感興趣的是每個單詞在特定時間窗口中出現的次數,比如說5秒窗口。為此,我們首先要將字符串數據解析成單詞和次數(使用Tuple2表示),第一個字段是單詞,第二個字段是次數,次數初始值都設置成了1。我們實現了一個flatmap,因為一行數據中可能有多個單詞。

DataStream> wordCounts = text
        .flatMap(new FlatMapFunction>() {
          @Override
          public void flatMap(String value, Collector> out) {
            for (String word : value.split("s")) {
              out.collect(Tuple2.of(word, 1));
            }
          }
        });

接著我們將數據流按照單詞字段(即0號索引字段)做分組,這里可以簡單地使用 keyBy(int index)方法,得到一個以單詞為 key 的Tuple2數據流。然后我們可以在流上指定想要的窗口,并根據窗口中的數據計算結果。在我們的例子中,我們想要每5秒聚合一次單詞數,每個窗口都是從零開始統計的。

DataStream> windowCounts = wordCounts
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1);

第二個調用的 .timeWindow()指定我們想要5秒的翻滾窗口(Tumble)。第三個調用為每個key每個窗口指定了sum聚合函數,在我們的例子中是按照次數字段(即1號索引字段)相加。得到的結果數據流,將每5秒輸出一次這5秒內每個單詞出現的次數。

最后一件事就是將數據流打印到控制臺,并開始執行:

windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");

最后的 env.execute調用是啟動實際Flink作業所必需的。所有算子操作(例如創建源、聚合、打印)只是構建了內部算子操作的圖形。只有在execute()被調用時才會在提交到集群上或本地計算機上執行。

下面是完整的代碼,部分代碼經過簡化(代碼在 GitHub 上也能訪問到):

package myflink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

  public static void main(String[] args) throws Exception {

    // 創建 execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 通過連接 socket 獲取輸入數據,這里連接到本地9000端口,如果9000端口已被占用,請換一個端口
    DataStream text = env.socketTextStream("localhost", 9000, "
");

    // 解析數據,按 word 分組,開窗,聚合
    DataStream> windowCounts = text
        .flatMap(new FlatMapFunction>() {
          @Override
          public void flatMap(String value, Collector> out) {
            for (String word : value.split("s")) {
              out.collect(Tuple2.of(word, 1));
            }
          }
        })
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1);

    // 將結果打印到控制臺,注意這里使用的是單線程打印,而非多線程
    windowCounts.print().setParallelism(1);

    env.execute("Socket Window WordCount");
  }
}
運行程序

要運行示例程序,首先我們在終端啟動 netcat 獲得輸入流:

nc -lk 9000

如果是 Windows 平臺,可以通過 nmap.org/ncat/ 安裝 ncat 然后運行:

ncat -lk 9000

然后直接運行SocketWindowWordCount的 main 方法。

只需要在 netcat 控制臺輸入單詞,就能在 SocketWindowWordCount 的輸出控制臺看到每個單詞的詞頻統計。如果想看到大于1的計數,請在5秒內反復鍵入相同的單詞。

Cheers !

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/6850.html

相關文章

  • 《從0到1學習Flink》—— 介紹Flink中的Stream Windows

    摘要:在每個事件上,觸發器都可以決定觸發即清除刪除窗口并丟棄其內容,或者啟動并清除窗口。請注意,指定的觸發器不會添加其他觸發條件,但會替換當前觸發器。結論對于現代流處理器來說,支持連續數據流上的各種類型的窗口是必不可少的。 showImg(https://segmentfault.com/img/remote/1460000017892799?w=1280&h=720); 前言 目前有許多數...

    jifei 評論0 收藏0
  • Flink入門

    摘要:簡介是一個面向分布式數據流處理和批量數據處理的開源計算平臺,提供支持流處理和批處理兩種類型應用的功能。每一個數據流起始于一個或多個,并終止于一個或多個。 Flink簡介 Apache Flink 是一個面向分布式數據流處理和批量數據處理的開源計算平臺,提供支持流處理和批處理兩種類型應用的功能。 Apache Flink的前身是柏林理工大學一個研究性項目,在2014被Apache孵化器...

    余學文 評論0 收藏0

發表評論

0條評論

Mike617

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<