国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

《從0到1學(xué)習(xí)Flink》—— 如何自定義 Data Source ?

songze / 1892人閱讀

摘要:從上面自定義的可以看到我們繼承的就是這個(gè)類,那么來(lái)了解一下一個(gè)抽象類,繼承自。該類的子類有三個(gè),兩個(gè)是抽象類,在此基礎(chǔ)上提供了更具體的實(shí)現(xiàn),另一個(gè)是。

前言

在 《從0到1學(xué)習(xí)Flink》—— Data Source 介紹 文章中,我給大家介紹了 Flink Data Source 以及簡(jiǎn)短的介紹了一下自定義 Data Source,這篇文章更詳細(xì)的介紹下,并寫一個(gè) demo 出來(lái)讓大家理解。

Flink Kafka source 準(zhǔn)備工作

我們先來(lái)看下 Flink 從 Kafka topic 中獲取數(shù)據(jù)的 demo,首先你需要安裝好了 FLink 和 Kafka 。

運(yùn)行啟動(dòng) Flink、Zookepeer、Kafka,

好了,都啟動(dòng)了!

maven 依賴


    org.apache.flink
    flink-java
    ${flink.version}
    provided


    org.apache.flink
    flink-streaming-java_${scala.binary.version}
    ${flink.version}
    provided



    org.slf4j
    slf4j-log4j12
    1.7.7
    runtime


    log4j
    log4j
    1.2.17
    runtime



    org.apache.flink
    flink-connector-kafka-0.11_${scala.binary.version}
    ${flink.version}



    com.alibaba
    fastjson
    1.2.51
測(cè)試發(fā)送數(shù)據(jù)到 kafka topic

實(shí)體類,Metric.java

package com.zhisheng.flink.model;

import java.util.Map;

/**
 * Desc:
 * weixi: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class Metric {
    public String name;
    public long timestamp;
    public Map fields;
    public Map tags;

    public Metric() {
    }

    public Metric(String name, long timestamp, Map fields, Map tags) {
        this.name = name;
        this.timestamp = timestamp;
        this.fields = fields;
        this.tags = tags;
    }

    @Override
    public String toString() {
        return "Metric{" +
                "name="" + name + """ +
                ", timestamp="" + timestamp + """ +
                ", fields=" + fields +
                ", tags=" + tags +
                "}";
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public Map getFields() {
        return fields;
    }

    public void setFields(Map fields) {
        this.fields = fields;
    }

    public Map getTags() {
        return tags;
    }

    public void setTags(Map tags) {
        this.tags = tags;
    }
}

往 kafka 中寫數(shù)據(jù)工具類:KafkaUtils.java

import com.alibaba.fastjson.JSON;
import com.zhisheng.flink.model.Metric;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * 往kafka中寫數(shù)據(jù)
 * 可以使用這個(gè)main函數(shù)進(jìn)行測(cè)試一下
 * weixin: zhisheng_tian 
 * blog: http://www.54tianzhisheng.cn/
 */
public class KafkaUtils {
    public static final String broker_list = "localhost:9092";
    public static final String topic = "metric";  // kafka topic,F(xiàn)link 程序中需要和這個(gè)統(tǒng)一 

    public static void writeToKafka() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", broker_list);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //key 序列化
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //value 序列化
        KafkaProducer producer = new KafkaProducer(props);

        Metric metric = new Metric();
        metric.setTimestamp(System.currentTimeMillis());
        metric.setName("mem");
        Map tags = new HashMap<>();
        Map fields = new HashMap<>();

        tags.put("cluster", "zhisheng");
        tags.put("host_ip", "101.147.022.106");

        fields.put("used_percent", 90d);
        fields.put("max", 27244873d);
        fields.put("used", 17244873d);
        fields.put("init", 27244873d);

        metric.setTags(tags);
        metric.setFields(fields);

        ProducerRecord record = new ProducerRecord(topic, null, null, JSON.toJSONString(metric));
        producer.send(record);
        System.out.println("發(fā)送數(shù)據(jù): " + JSON.toJSONString(metric));

        producer.flush();
    }

    public static void main(String[] args) throws InterruptedException {
        while (true) {
            Thread.sleep(300);
            writeToKafka();
        }
    }
}

運(yùn)行:

如果出現(xiàn)如上圖標(biāo)記的,即代表能夠不斷的往 kafka 發(fā)送數(shù)據(jù)的。

Flink 程序

Main.java

package com.zhisheng.flink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

/**
 * Desc:
 * weixi: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class Main {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "metric-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  //key 反序列化
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest"); //value 反序列化

        DataStreamSource dataStreamSource = env.addSource(new FlinkKafkaConsumer011<>(
                "metric",  //kafka topic
                new SimpleStringSchema(),  // String 序列化
                props)).setParallelism(1);

        dataStreamSource.print(); //把從 kafka 讀取到的數(shù)據(jù)打印在控制臺(tái)

        env.execute("Flink add data source");
    }
}

運(yùn)行起來(lái):

看到?jīng)]程序,F(xiàn)link 程序控制臺(tái)能夠源源不斷的打印數(shù)據(jù)呢。

自定義 Source

上面就是 Flink 自帶的 Kafka source,那么接下來(lái)就模仿著寫一個(gè)從 MySQL 中讀取數(shù)據(jù)的 Source。

首先 pom.xml 中添加 MySQL 依賴


    mysql
    mysql-connector-java
    5.1.34

數(shù)據(jù)庫(kù)建表如下:

DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(25) COLLATE utf8_bin DEFAULT NULL,
  `password` varchar(25) COLLATE utf8_bin DEFAULT NULL,
  `age` int(10) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

插入數(shù)據(jù)

INSERT INTO `student` VALUES ("1", "zhisheng01", "123456", "18"), ("2", "zhisheng02", "123", "17"), ("3", "zhisheng03", "1234", "18"), ("4", "zhisheng04", "12345", "16");
COMMIT;

新建實(shí)體類:Student.java

package com.zhisheng.flink.model;

/**
 * Desc:
 * weixi: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class Student {
    public int id;
    public String name;
    public String password;
    public int age;

    public Student() {
    }

    public Student(int id, String name, String password, int age) {
        this.id = id;
        this.name = name;
        this.password = password;
        this.age = age;
    }

    @Override
    public String toString() {
        return "Student{" +
                "id=" + id +
                ", name="" + name + """ +
                ", password="" + password + """ +
                ", age=" + age +
                "}";
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}

新建 Source 類 SourceFromMySQL.java,該類繼承 RichSourceFunction ,實(shí)現(xiàn)里面的 open、close、run、cancel 方法:

package com.zhisheng.flink.source;

import com.zhisheng.flink.model.Student;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;


/**
* Desc:
* weixi: zhisheng_tian
* blog: http://www.54tianzhisheng.cn/
*/
public class SourceFromMySQL extends RichSourceFunction {

   PreparedStatement ps;
   private Connection connection;

   /**
    * open() 方法中建立連接,這樣不用每次 invoke 的時(shí)候都要建立連接和釋放連接。
    *
    * @param parameters
    * @throws Exception
    */
   @Override
   public void open(Configuration parameters) throws Exception {
       super.open(parameters);
       connection = getConnection();
       String sql = "select * from Student;";
       ps = this.connection.prepareStatement(sql);
   }

   /**
    * 程序執(zhí)行完畢就可以進(jìn)行,關(guān)閉連接和釋放資源的動(dòng)作了
    *
    * @throws Exception
    */
   @Override
   public void close() throws Exception {
       super.close();
       if (connection != null) { //關(guān)閉連接和釋放資源
           connection.close();
       }
       if (ps != null) {
           ps.close();
       }
   }

   /**
    * DataStream 調(diào)用一次 run() 方法用來(lái)獲取數(shù)據(jù)
    *
    * @param ctx
    * @throws Exception
    */
   @Override
   public void run(SourceContext ctx) throws Exception {
       ResultSet resultSet = ps.executeQuery();
       while (resultSet.next()) {
           Student student = new Student(
                   resultSet.getInt("id"),
                   resultSet.getString("name").trim(),
                   resultSet.getString("password").trim(),
                   resultSet.getInt("age"));
           ctx.collect(student);
       }
   }

   @Override
   public void cancel() {
   }

   private static Connection getConnection() {
       Connection con = null;
           try {
               Class.forName("com.mysql.jdbc.Driver");
               con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "root123456");
           } catch (Exception e) {
               System.out.println("-----------mysql get connection has exception , msg = "+ e.getMessage());
           }
       return con;
   }
}

Flink 程序

package com.zhisheng.flink;

import com.zhisheng.flink.source.SourceFromMySQL;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Desc:
 * weixi: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class Main2 {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(new SourceFromMySQL()).print();

        env.execute("Flink add data sourc");
    }
}

運(yùn)行 Flink 程序,控制臺(tái)日志中可以看見(jiàn)打印的 student 信息。

RichSourceFunction

從上面自定義的 Source 可以看到我們繼承的就是這個(gè) RichSourceFunction 類,那么來(lái)了解一下:

一個(gè)抽象類,繼承自 AbstractRichFunction。為實(shí)現(xiàn)一個(gè) Rich SourceFunction 提供基礎(chǔ)能力。該類的子類有三個(gè),兩個(gè)是抽象類,在此基礎(chǔ)上提供了更具體的實(shí)現(xiàn),另一個(gè)是 ContinuousFileMonitoringFunction。

MessageAcknowledgingSourceBase :它針對(duì)的是數(shù)據(jù)源是消息隊(duì)列的場(chǎng)景并且提供了基于 ID 的應(yīng)答機(jī)制。

MultipleIdsMessageAcknowledgingSourceBase : 在 MessageAcknowledgingSourceBase 的基礎(chǔ)上針對(duì) ID 應(yīng)答機(jī)制進(jìn)行了更為細(xì)分的處理,支持兩種 ID 應(yīng)答模型:session id 和 unique message id。

ContinuousFileMonitoringFunction:這是單個(gè)(非并行)監(jiān)視任務(wù),它接受 FileInputFormat,并且根據(jù) FileProcessingMode 和 FilePathFilter,它負(fù)責(zé)監(jiān)視用戶提供的路徑;決定應(yīng)該進(jìn)一步讀取和處理哪些文件;創(chuàng)建與這些文件對(duì)應(yīng)的 FileInputSplit 拆分,將它們分配給下游任務(wù)以進(jìn)行進(jìn)一步處理。

最后

本文主要講了下 Flink 使用 Kafka Source 的使用,并提供了一個(gè) demo 教大家如何自定義 Source,從 MySQL 中讀取數(shù)據(jù),當(dāng)然你也可以從其他地方讀取,實(shí)現(xiàn)自己的數(shù)據(jù)源 source。可能平時(shí)工作會(huì)比這個(gè)更復(fù)雜,需要大家靈活應(yīng)對(duì)!

關(guān)注我

轉(zhuǎn)載請(qǐng)務(wù)必注明原創(chuàng)地址為:http://www.54tianzhisheng.cn/2018/10/30/flink-create-source/

另外我自己整理了些 Flink 的學(xué)習(xí)資料,目前已經(jīng)全部放到微信公眾號(hào)了。你可以加我的微信:zhisheng_tian,然后回復(fù)關(guān)鍵字:Flink 即可無(wú)條件獲取到。

相關(guān)文章

1、《從0到1學(xué)習(xí)Flink》—— Apache Flink 介紹

2、《從0到1學(xué)習(xí)Flink》—— Mac 上搭建 Flink 1.6.0 環(huán)境并構(gòu)建運(yùn)行簡(jiǎn)單程序入門

3、《從0到1學(xué)習(xí)Flink》—— Flink 配置文件詳解

4、《從0到1學(xué)習(xí)Flink》—— Data Source 介紹

5、《從0到1學(xué)習(xí)Flink》—— 如何自定義 Data Source ?

6、《從0到1學(xué)習(xí)Flink》—— Data Sink 介紹

7、《從0到1學(xué)習(xí)Flink》—— 如何自定義 Data Sink ?

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/72116.html

相關(guān)文章

  • 01學(xué)習(xí)Flink》—— Flink Data transformation(轉(zhuǎn)換)

    摘要:這些切片稱為窗口。函數(shù)允許對(duì)常規(guī)數(shù)據(jù)流進(jìn)行分組。通常,這是非并行數(shù)據(jù)轉(zhuǎn)換,因?yàn)樗诜欠謪^(qū)數(shù)據(jù)流上運(yùn)行。 showImg(https://segmentfault.com/img/remote/1460000017874226?w=1920&h=1271); 前言 在第一篇介紹 Flink 的文章 《《從0到1學(xué)習(xí)Flink》—— Apache Flink 介紹》 中就說(shuō)過(guò) Flink ...

    oujie 評(píng)論0 收藏0
  • 01學(xué)習(xí)Flink》—— Data Source 介紹

    摘要:指定了該迭代器返回元素的類型。這可能導(dǎo)致節(jié)點(diǎn)故障后的恢復(fù)速度較慢,因?yàn)樵撟鳂I(yè)將從最后一個(gè)檢查點(diǎn)恢復(fù)讀取。監(jiān)聽(tīng)的端口過(guò)來(lái)的數(shù)據(jù)這個(gè)在從到學(xué)習(xí)上搭建環(huán)境并構(gòu)建運(yùn)行簡(jiǎn)單程序入門文章里用的就是基于的程序。取消一個(gè),也即將中的循環(huán)元素的行為終止。 showImg(https://segmentfault.com/img/remote/1460000016944116); 前言 Data Sou...

    XFLY 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<