国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

《從0到1學(xué)習(xí)Flink》—— 如何自定義 Data Sink ?

NikoManiac / 1949人閱讀

摘要:前言前篇文章從到學(xué)習(xí)介紹介紹了,也介紹了自帶的,那么如何自定義自己的呢這篇文章將寫一個(gè)教大家將從的數(shù)據(jù)到中去。

前言

前篇文章 《從0到1學(xué)習(xí)Flink》—— Data Sink 介紹 介紹了 Flink Data Sink,也介紹了 Flink 自帶的 Sink,那么如何自定義自己的 Sink 呢?這篇文章將寫一個(gè) demo 教大家將從 Kafka Source 的數(shù)據(jù) Sink 到 MySQL 中去。

準(zhǔn)備工作

我們先來看下 Flink 從 Kafka topic 中獲取數(shù)據(jù)的 demo,首先你需要安裝好了 FLink 和 Kafka 。

運(yùn)行啟動(dòng) Flink、Zookepeer、Kafka,

好了,都啟動(dòng)了!

數(shù)據(jù)庫(kù)建表
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(25) COLLATE utf8_bin DEFAULT NULL,
  `password` varchar(25) COLLATE utf8_bin DEFAULT NULL,
  `age` int(10) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
實(shí)體類

Student.java

package com.zhisheng.flink.model;

/**
 * Desc:
 * weixin: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class Student {
    public int id;
    public String name;
    public String password;
    public int age;

    public Student() {
    }

    public Student(int id, String name, String password, int age) {
        this.id = id;
        this.name = name;
        this.password = password;
        this.age = age;
    }

    @Override
    public String toString() {
        return "Student{" +
                "id=" + id +
                ", name="" + name + """ +
                ", password="" + password + """ +
                ", age=" + age +
                "}";
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}
工具類

工具類往 kafka topic student 發(fā)送數(shù)據(jù)

import com.alibaba.fastjson.JSON;
import com.zhisheng.flink.model.Metric;
import com.zhisheng.flink.model.Student;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * 往kafka中寫數(shù)據(jù)
 * 可以使用這個(gè)main函數(shù)進(jìn)行測(cè)試一下
 * weixin: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class KafkaUtils2 {
    public static final String broker_list = "localhost:9092";
    public static final String topic = "student";  //kafka topic 需要和 flink 程序用同一個(gè) topic

    public static void writeToKafka() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", broker_list);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer producer = new KafkaProducer(props);

        for (int i = 1; i <= 100; i++) {
            Student student = new Student(i, "zhisheng" + i, "password" + i, 18 + i);
            ProducerRecord record = new ProducerRecord(topic, null, null, JSON.toJSONString(student));
            producer.send(record);
            System.out.println("發(fā)送數(shù)據(jù): " + JSON.toJSONString(student));
        }
        producer.flush();
    }

    public static void main(String[] args) throws InterruptedException {
        writeToKafka();
    }
}
SinkToMySQL

該類就是 Sink Function,繼承了 RichSinkFunction ,然后重寫了里面的方法。在 invoke 方法中將數(shù)據(jù)插入到 MySQL 中。

package com.zhisheng.flink.sink;

import com.zhisheng.flink.model.Student;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * Desc:
 * weixin: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class SinkToMySQL extends RichSinkFunction {
    PreparedStatement ps;
    private Connection connection;

    /**
     * open() 方法中建立連接,這樣不用每次 invoke 的時(shí)候都要建立連接和釋放連接
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql = "insert into Student(id, name, password, age) values(?, ?, ?, ?);";
        ps = this.connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        super.close();
        //關(guān)閉連接和釋放資源
        if (connection != null) {
            connection.close();
        }
        if (ps != null) {
            ps.close();
        }
    }

    /**
     * 每條數(shù)據(jù)的插入都要調(diào)用一次 invoke() 方法
     *
     * @param value
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(Student value, Context context) throws Exception {
        //組裝數(shù)據(jù),執(zhí)行插入操作
        ps.setInt(1, value.getId());
        ps.setString(2, value.getName());
        ps.setString(3, value.getPassword());
        ps.setInt(4, value.getAge());
        ps.executeUpdate();
    }

    private static Connection getConnection() {
        Connection con = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "root123456");
        } catch (Exception e) {
            System.out.println("-----------mysql get connection has exception , msg = "+ e.getMessage());
        }
        return con;
    }
}
Flink 程序

這里的 source 是從 kafka 讀取數(shù)據(jù)的,然后 Flink 從 Kafka 讀取到數(shù)據(jù)(JSON)后用阿里 fastjson 來解析成 student 對(duì)象,然后在 addSink 中使用我們創(chuàng)建的 SinkToMySQL,這樣就可以把數(shù)據(jù)存儲(chǔ)到 MySQL 了。

package com.zhisheng.flink;

import com.alibaba.fastjson.JSON;
import com.zhisheng.flink.model.Student;
import com.zhisheng.flink.sink.SinkToMySQL;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

import java.util.Properties;

/**
 * Desc:
 * weixin: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class Main3 {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "metric-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");

        SingleOutputStreamOperator student = env.addSource(new FlinkKafkaConsumer011<>(
                "student",   //這個(gè) kafka topic 需要和上面的工具類的 topic 一致
                new SimpleStringSchema(),
                props)).setParallelism(1)
                .map(string -> JSON.parseObject(string, Student.class)); //Fastjson 解析字符串成 student 對(duì)象

        student.addSink(new SinkToMySQL()); //數(shù)據(jù) sink 到 mysql

        env.execute("Flink add sink");
    }
}
結(jié)果

運(yùn)行 Flink 程序,然后再運(yùn)行 KafkaUtils2.java 工具類,這樣就可以了。

如果數(shù)據(jù)插入成功了,那么我們查看下我們的數(shù)據(jù)庫(kù):

數(shù)據(jù)庫(kù)中已經(jīng)插入了 100 條我們從 Kafka 發(fā)送的數(shù)據(jù)了。證明我們的 SinkToMySQL 起作用了。是不是很簡(jiǎn)單?

項(xiàng)目結(jié)構(gòu)

怕大家不知道我的項(xiàng)目結(jié)構(gòu),這里發(fā)個(gè)截圖看下:

最后

本文主要利用一個(gè) demo,告訴大家如何自定義 Sink Function,將從 Kafka 的數(shù)據(jù) Sink 到 MySQL 中,如果你項(xiàng)目中有其他的數(shù)據(jù)來源,你也可以換成對(duì)應(yīng)的 Source,也有可能你的 Sink 是到其他的地方或者其他不同的方式,那么依舊是這個(gè)套路:繼承 RichSinkFunction 抽象類,重寫 invoke 方法。

關(guān)注我

轉(zhuǎn)載請(qǐng)務(wù)必注明原創(chuàng)地址為:http://www.54tianzhisheng.cn/2018/10/31/flink-create-sink/

另外我自己整理了些 Flink 的學(xué)習(xí)資料,目前已經(jīng)全部放到微信公眾號(hào)了。你可以加我的微信:zhisheng_tian,然后回復(fù)關(guān)鍵字:Flink 即可無條件獲取到。

相關(guān)文章

1、《從0到1學(xué)習(xí)Flink》—— Apache Flink 介紹

2、《從0到1學(xué)習(xí)Flink》—— Mac 上搭建 Flink 1.6.0 環(huán)境并構(gòu)建運(yùn)行簡(jiǎn)單程序入門

3、《從0到1學(xué)習(xí)Flink》—— Flink 配置文件詳解

4、《從0到1學(xué)習(xí)Flink》—— Data Source 介紹

5、《從0到1學(xué)習(xí)Flink》—— 如何自定義 Data Source ?

6、《從0到1學(xué)習(xí)Flink》—— Data Sink 介紹

7、《從0到1學(xué)習(xí)Flink》—— 如何自定義 Data Sink ?

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/72128.html

相關(guān)文章

  • 01學(xué)習(xí)Flink》—— Data Sink 介紹

    摘要:從上圖可以看到接口有方法,它有一個(gè)抽象類。上面的那些自帶的可以看到都是繼承了抽象類,實(shí)現(xiàn)了其中的方法,那么我們要是自己定義自己的的話其實(shí)也是要按照這個(gè)套路來做的。 showImg(https://segmentfault.com/img/remote/1460000016956595); 前言 再上一篇文章中 《從0到1學(xué)習(xí)Flink》—— Data Source 介紹 講解了 Fli...

    thursday 評(píng)論0 收藏0
  • 01學(xué)習(xí)Flink》—— Flink Data transformation(轉(zhuǎn)換)

    摘要:這些切片稱為窗口。函數(shù)允許對(duì)常規(guī)數(shù)據(jù)流進(jìn)行分組。通常,這是非并行數(shù)據(jù)轉(zhuǎn)換,因?yàn)樗诜欠謪^(qū)數(shù)據(jù)流上運(yùn)行。 showImg(https://segmentfault.com/img/remote/1460000017874226?w=1920&h=1271); 前言 在第一篇介紹 Flink 的文章 《《從0到1學(xué)習(xí)Flink》—— Apache Flink 介紹》 中就說過 Flink ...

    oujie 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<