摘要:從上面自定義的可以看到我們繼承的就是這個(gè)類,那么來(lái)了解一下一個(gè)抽象類,繼承自。該類的子類有三個(gè),兩個(gè)是抽象類,在此基礎(chǔ)上提供了更具體的實(shí)現(xiàn),另一個(gè)是。
前言
在 《從0到1學(xué)習(xí)Flink》—— Data Source 介紹 文章中,我給大家介紹了 Flink Data Source 以及簡(jiǎn)短的介紹了一下自定義 Data Source,這篇文章更詳細(xì)的介紹下,并寫一個(gè) demo 出來(lái)讓大家理解。
Flink Kafka source 準(zhǔn)備工作我們先來(lái)看下 Flink 從 Kafka topic 中獲取數(shù)據(jù)的 demo,首先你需要安裝好了 FLink 和 Kafka 。
運(yùn)行啟動(dòng) Flink、Zookepeer、Kafka,
好了,都啟動(dòng)了!
maven 依賴測(cè)試發(fā)送數(shù)據(jù)到 kafka topicorg.apache.flink flink-java ${flink.version} provided org.apache.flink flink-streaming-java_${scala.binary.version} ${flink.version} provided org.slf4j slf4j-log4j12 1.7.7 runtime log4j log4j 1.2.17 runtime org.apache.flink flink-connector-kafka-0.11_${scala.binary.version} ${flink.version} com.alibaba fastjson 1.2.51
實(shí)體類,Metric.java
package com.zhisheng.flink.model; import java.util.Map; /** * Desc: * weixi: zhisheng_tian * blog: http://www.54tianzhisheng.cn/ */ public class Metric { public String name; public long timestamp; public Mapfields; public Map tags; public Metric() { } public Metric(String name, long timestamp, Map fields, Map tags) { this.name = name; this.timestamp = timestamp; this.fields = fields; this.tags = tags; } @Override public String toString() { return "Metric{" + "name="" + name + """ + ", timestamp="" + timestamp + """ + ", fields=" + fields + ", tags=" + tags + "}"; } public String getName() { return name; } public void setName(String name) { this.name = name; } public long getTimestamp() { return timestamp; } public void setTimestamp(long timestamp) { this.timestamp = timestamp; } public Map getFields() { return fields; } public void setFields(Map fields) { this.fields = fields; } public Map getTags() { return tags; } public void setTags(Map tags) { this.tags = tags; } }
往 kafka 中寫數(shù)據(jù)工具類:KafkaUtils.java
import com.alibaba.fastjson.JSON; import com.zhisheng.flink.model.Metric; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.HashMap; import java.util.Map; import java.util.Properties; /** * 往kafka中寫數(shù)據(jù) * 可以使用這個(gè)main函數(shù)進(jìn)行測(cè)試一下 * weixin: zhisheng_tian * blog: http://www.54tianzhisheng.cn/ */ public class KafkaUtils { public static final String broker_list = "localhost:9092"; public static final String topic = "metric"; // kafka topic,F(xiàn)link 程序中需要和這個(gè)統(tǒng)一 public static void writeToKafka() throws InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers", broker_list); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //key 序列化 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //value 序列化 KafkaProducer producer = new KafkaProducer(props); Metric metric = new Metric(); metric.setTimestamp(System.currentTimeMillis()); metric.setName("mem"); Map tags = new HashMap<>(); Map fields = new HashMap<>(); tags.put("cluster", "zhisheng"); tags.put("host_ip", "101.147.022.106"); fields.put("used_percent", 90d); fields.put("max", 27244873d); fields.put("used", 17244873d); fields.put("init", 27244873d); metric.setTags(tags); metric.setFields(fields); ProducerRecord record = new ProducerRecord (topic, null, null, JSON.toJSONString(metric)); producer.send(record); System.out.println("發(fā)送數(shù)據(jù): " + JSON.toJSONString(metric)); producer.flush(); } public static void main(String[] args) throws InterruptedException { while (true) { Thread.sleep(300); writeToKafka(); } } }
運(yùn)行:
如果出現(xiàn)如上圖標(biāo)記的,即代表能夠不斷的往 kafka 發(fā)送數(shù)據(jù)的。
Flink 程序Main.java
package com.zhisheng.flink; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011; import java.util.Properties; /** * Desc: * weixi: zhisheng_tian * blog: http://www.54tianzhisheng.cn/ */ public class Main { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("zookeeper.connect", "localhost:2181"); props.put("group.id", "metric-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //key 反序列化 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "latest"); //value 反序列化 DataStreamSourcedataStreamSource = env.addSource(new FlinkKafkaConsumer011<>( "metric", //kafka topic new SimpleStringSchema(), // String 序列化 props)).setParallelism(1); dataStreamSource.print(); //把從 kafka 讀取到的數(shù)據(jù)打印在控制臺(tái) env.execute("Flink add data source"); } }
運(yùn)行起來(lái):
看到?jīng)]程序,F(xiàn)link 程序控制臺(tái)能夠源源不斷的打印數(shù)據(jù)呢。
自定義 Source上面就是 Flink 自帶的 Kafka source,那么接下來(lái)就模仿著寫一個(gè)從 MySQL 中讀取數(shù)據(jù)的 Source。
首先 pom.xml 中添加 MySQL 依賴:
mysql mysql-connector-java 5.1.34
數(shù)據(jù)庫(kù)建表如下:
DROP TABLE IF EXISTS `student`; CREATE TABLE `student` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, `name` varchar(25) COLLATE utf8_bin DEFAULT NULL, `password` varchar(25) COLLATE utf8_bin DEFAULT NULL, `age` int(10) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
插入數(shù)據(jù):
INSERT INTO `student` VALUES ("1", "zhisheng01", "123456", "18"), ("2", "zhisheng02", "123", "17"), ("3", "zhisheng03", "1234", "18"), ("4", "zhisheng04", "12345", "16"); COMMIT;
新建實(shí)體類:Student.java
package com.zhisheng.flink.model; /** * Desc: * weixi: zhisheng_tian * blog: http://www.54tianzhisheng.cn/ */ public class Student { public int id; public String name; public String password; public int age; public Student() { } public Student(int id, String name, String password, int age) { this.id = id; this.name = name; this.password = password; this.age = age; } @Override public String toString() { return "Student{" + "id=" + id + ", name="" + name + """ + ", password="" + password + """ + ", age=" + age + "}"; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } }
新建 Source 類 SourceFromMySQL.java,該類繼承 RichSourceFunction ,實(shí)現(xiàn)里面的 open、close、run、cancel 方法:
package com.zhisheng.flink.source; import com.zhisheng.flink.model.Student; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; /** * Desc: * weixi: zhisheng_tian * blog: http://www.54tianzhisheng.cn/ */ public class SourceFromMySQL extends RichSourceFunction{ PreparedStatement ps; private Connection connection; /** * open() 方法中建立連接,這樣不用每次 invoke 的時(shí)候都要建立連接和釋放連接。 * * @param parameters * @throws Exception */ @Override public void open(Configuration parameters) throws Exception { super.open(parameters); connection = getConnection(); String sql = "select * from Student;"; ps = this.connection.prepareStatement(sql); } /** * 程序執(zhí)行完畢就可以進(jìn)行,關(guān)閉連接和釋放資源的動(dòng)作了 * * @throws Exception */ @Override public void close() throws Exception { super.close(); if (connection != null) { //關(guān)閉連接和釋放資源 connection.close(); } if (ps != null) { ps.close(); } } /** * DataStream 調(diào)用一次 run() 方法用來(lái)獲取數(shù)據(jù) * * @param ctx * @throws Exception */ @Override public void run(SourceContext ctx) throws Exception { ResultSet resultSet = ps.executeQuery(); while (resultSet.next()) { Student student = new Student( resultSet.getInt("id"), resultSet.getString("name").trim(), resultSet.getString("password").trim(), resultSet.getInt("age")); ctx.collect(student); } } @Override public void cancel() { } private static Connection getConnection() { Connection con = null; try { Class.forName("com.mysql.jdbc.Driver"); con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "root123456"); } catch (Exception e) { System.out.println("-----------mysql get connection has exception , msg = "+ e.getMessage()); } return con; } }
Flink 程序:
package com.zhisheng.flink; import com.zhisheng.flink.source.SourceFromMySQL; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * Desc: * weixi: zhisheng_tian * blog: http://www.54tianzhisheng.cn/ */ public class Main2 { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.addSource(new SourceFromMySQL()).print(); env.execute("Flink add data sourc"); } }
運(yùn)行 Flink 程序,控制臺(tái)日志中可以看見(jiàn)打印的 student 信息。
RichSourceFunction從上面自定義的 Source 可以看到我們繼承的就是這個(gè) RichSourceFunction 類,那么來(lái)了解一下:
一個(gè)抽象類,繼承自 AbstractRichFunction。為實(shí)現(xiàn)一個(gè) Rich SourceFunction 提供基礎(chǔ)能力。該類的子類有三個(gè),兩個(gè)是抽象類,在此基礎(chǔ)上提供了更具體的實(shí)現(xiàn),另一個(gè)是 ContinuousFileMonitoringFunction。
MessageAcknowledgingSourceBase :它針對(duì)的是數(shù)據(jù)源是消息隊(duì)列的場(chǎng)景并且提供了基于 ID 的應(yīng)答機(jī)制。
MultipleIdsMessageAcknowledgingSourceBase : 在 MessageAcknowledgingSourceBase 的基礎(chǔ)上針對(duì) ID 應(yīng)答機(jī)制進(jìn)行了更為細(xì)分的處理,支持兩種 ID 應(yīng)答模型:session id 和 unique message id。
ContinuousFileMonitoringFunction:這是單個(gè)(非并行)監(jiān)視任務(wù),它接受 FileInputFormat,并且根據(jù) FileProcessingMode 和 FilePathFilter,它負(fù)責(zé)監(jiān)視用戶提供的路徑;決定應(yīng)該進(jìn)一步讀取和處理哪些文件;創(chuàng)建與這些文件對(duì)應(yīng)的 FileInputSplit 拆分,將它們分配給下游任務(wù)以進(jìn)行進(jìn)一步處理。
最后本文主要講了下 Flink 使用 Kafka Source 的使用,并提供了一個(gè) demo 教大家如何自定義 Source,從 MySQL 中讀取數(shù)據(jù),當(dāng)然你也可以從其他地方讀取,實(shí)現(xiàn)自己的數(shù)據(jù)源 source。可能平時(shí)工作會(huì)比這個(gè)更復(fù)雜,需要大家靈活應(yīng)對(duì)!
關(guān)注我轉(zhuǎn)載請(qǐng)務(wù)必注明原創(chuàng)地址為:http://www.54tianzhisheng.cn/2018/10/30/flink-create-source/
另外我自己整理了些 Flink 的學(xué)習(xí)資料,目前已經(jīng)全部放到微信公眾號(hào)了。你可以加我的微信:zhisheng_tian,然后回復(fù)關(guān)鍵字:Flink 即可無(wú)條件獲取到。
相關(guān)文章1、《從0到1學(xué)習(xí)Flink》—— Apache Flink 介紹
2、《從0到1學(xué)習(xí)Flink》—— Mac 上搭建 Flink 1.6.0 環(huán)境并構(gòu)建運(yùn)行簡(jiǎn)單程序入門
3、《從0到1學(xué)習(xí)Flink》—— Flink 配置文件詳解
4、《從0到1學(xué)習(xí)Flink》—— Data Source 介紹
5、《從0到1學(xué)習(xí)Flink》—— 如何自定義 Data Source ?
6、《從0到1學(xué)習(xí)Flink》—— Data Sink 介紹
7、《從0到1學(xué)習(xí)Flink》—— 如何自定義 Data Sink ?
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/72116.html
摘要:這些切片稱為窗口。函數(shù)允許對(duì)常規(guī)數(shù)據(jù)流進(jìn)行分組。通常,這是非并行數(shù)據(jù)轉(zhuǎn)換,因?yàn)樗诜欠謪^(qū)數(shù)據(jù)流上運(yùn)行。 showImg(https://segmentfault.com/img/remote/1460000017874226?w=1920&h=1271); 前言 在第一篇介紹 Flink 的文章 《《從0到1學(xué)習(xí)Flink》—— Apache Flink 介紹》 中就說(shuō)過(guò) Flink ...
摘要:指定了該迭代器返回元素的類型。這可能導(dǎo)致節(jié)點(diǎn)故障后的恢復(fù)速度較慢,因?yàn)樵撟鳂I(yè)將從最后一個(gè)檢查點(diǎn)恢復(fù)讀取。監(jiān)聽(tīng)的端口過(guò)來(lái)的數(shù)據(jù)這個(gè)在從到學(xué)習(xí)上搭建環(huán)境并構(gòu)建運(yùn)行簡(jiǎn)單程序入門文章里用的就是基于的程序。取消一個(gè),也即將中的循環(huán)元素的行為終止。 showImg(https://segmentfault.com/img/remote/1460000016944116); 前言 Data Sou...
閱讀 752·2021-09-28 09:35
閱讀 2591·2019-08-29 11:25
閱讀 2154·2019-08-23 18:36
閱讀 1849·2019-08-23 16:31
閱讀 2065·2019-08-23 14:50
閱讀 3112·2019-08-23 13:55
閱讀 3286·2019-08-23 12:49
閱讀 2074·2019-08-23 11:46