摘要:開發指南是為簡化計算模型,降低用戶使用實時計算的門檻而設計的一套符合標準語義的開發套件。隨后,將為該表生成字段,用于記錄并表示事件時間。
UFli
接下來,開發者可以根據如下內容,逐漸熟悉并使用 UFli
數據類型 | 說明 | 映射的Java類型 |
---|---|---|
BOOLEAN | 邏輯值,true 或 false | Boolean.class |
INTEGER | 整形,4 字節整數 | Integer.class |
INT | 整型,4 字節整數 | Integer.class |
BIGINT | 長整型,8 字節整數 | Long.class |
TINYINT | 微整型,1 字節整數 | Byte.class |
SMALLINT | 短整型,2 字節整數 | Short.class |
VARCHAR | 變長字符串 | String.class |
FLOAT | 4 字節浮點數 | Float.class |
DOUBLE | 8 字節浮點數 | Double.class |
DATE | 日期類型: | Date.class |
TIMESTAMP | 時間戳類型 | Timestamp.class |
DECIMAL | 小數類型 | BigDecimal.class |
MAP | 映射類型 | li |
ARRAY | List 列表類型 | li |
LIST | List 列表類型 | li |
UFli
在 UFli
CREATE TABLE table_name(
columnName dataType
[ columnName dataType ]*
) WITH (
type = xxxx
xxxx = xxxx
);
? 以 Kafka 作為數據源表,建表語句示例如下:
CREATE TABLE ut_kafka_source(
user_id VARCHAR
pay_cash FLOAT
pay_time VARCHAR
)WITH(
type = kafka11
bootstrapServers = linux01:9092linux02:9092linux03:9092
topic = flink-test01
groupId = group1
parallelism = 1
);
上述代碼用于在 UFli
隨后,必須在 WITH 代碼塊中指定數據來源,以及數據源所使用的框架的必要參數。
? 可配置參數如下表所示:
參數名 | 必填 | 默認值 | 含義 | 舉例 |
---|---|---|---|---|
type | 是 | 無 | 數據源類型 | type = kafka11 |
bootstrapServers | 是 | 無 | Kafka Broker地址 | bootstrapServers = linux01:9092linux02:9092 |
topic | 是 | 無 | 將要消費的 Kafka 主題 | topic = fli |
groupId | 是 | 無 | 消費者組ID | groupId = group01 |
offsetReset | 否 | latest | 啟動時從何處消費 | offsetReset = earliest(或:latest、{"0": 12 "1": 32}) |
parallelism | 否 | 1 | 讀取數據并行度 | parallelism = 1 |
在 UFli
CREATE TABLE table_name(
columnName dataType
[ columnName dataType ]*
) WITH (
type = xxxx
xxxx = xxxx
);
? 以 Kafka 作為結果表,建表語句示例如下:
CREATE TABLE ut_kafka_result(
uid VARCHAR
pay_cash FLOAT
sex VARCHAR
age VARCHAR
)WITH(
type = kafka11
bootstrapServers = linux01:9092linux02:9092linux03:9092
topic = flink-test02
parallelism = 1
);
上述代碼用于在 UFli
隨后,必須在 WITH 代碼塊中指定數據輸出時的相關參數,以及數據源所使用的框架的必要參數。
? 可配置參數如下表所示:
參數名 | 必填 | 默認值 | 含義 | 舉例 |
---|---|---|---|---|
type | 是 | 無 | 數據源類型 | type = kafka11 |
bootstrapServers | 是 | 無 | Kafka Broker地址 | bootstrapServers = linux01:9092linux02:9092 |
topic | 是 | 無 | 將要消費的 Kafka 主題 | topic = fli |
parallelism | 否 | 1 | 讀取數據并行度 | parallelism = 1 |
? 以 MySQL 作為結果表,建表語句示例如下:
CREATE TABLE ut_mysql_result(
uid VARCHAR
pay_cash FLOAT
sex VARCHAR
age VARCHAR
)WITH(
type = mysql
url = jdbc:mysql://linux01:3306/db_flink_test?useUnicode=true&characterEncoding=utf-8
userName = root
password = 123456
tableName = ut_mysql_result
parallelism = 1
);
? 可配置參數如下表所示:
參數名 | 必填 | 默認值 | 含義 | 舉例 |
---|---|---|---|---|
type | 是 | 無 | 數據源類型 | type = mysql |
url | 是 | 無 | MySQL Databa | url = jdbc:mysql://linux01:3306/db_fli |
username | 是 | 無 | 用戶名 | userName =root |
password | 是 | 無 | 密碼 | password =123456 |
tablename | 是 | 無 | MySQL 中的表名 | tableName =ut_mysql_result |
parallelism | 否 | 1 | 讀取數據并行度 | parallelism = 1 |
? 以 PostgreSQL 作為結果表,建表語句示例如下:
CREATE TABLE ut_postgre_result(
uid VARCHAR
pay_cash FLOAT
sex VARCHAR
age VARCHAR
)WITH(
type = postgre
url = jdbc:postgresql://10.9.11.122:5432/db_flink_test?useUnicode=true&characterEncoding=utf-8
userName = root
password = 123456
tableName = ut_postgre_result
parallelism = 1
);
上述代碼展示了如何使用 UFli
? 可配置參數如下表所示:
參數名 | 必填 | 默認值 | 含義 | 舉例 |
---|---|---|---|---|
type | 是 | 無 | 數據源類型 | type = postgre |
url | 是 | 無 | PostgreSQL Databa | url = jdbc:postgresql://10.9.11.122:5432/db_fli |
username | 是 | 無 | 用戶名 | userName =root |
password | 是 | 無 | 密碼 | password =123456 |
tablename | 是 | 無 | PostgreSQL 中的表名 | tableName =ut_postgre_result |
parallelism | 否 | 1 | 讀取數據并行度 | parallelism = 1 |
在 UFli
CREATE TABLE table_name(
columnName dataType
[ columnName dataType ]*
) WITH (
type = xxxx
xxxx = xxxx
SIDE DIMENSION SIGN
);
其中,維表中最后一個字段,必須為SIDE DIMENSION SIGN,以此來標識這是一張維度表。
? 以 MySQL 作為維度表,建表語句示例如下:
CREATE TABLE ut_mysql_side(
uid VARCHAR
sex VARCHAR
age VARCHAR
PRIMARY KEY(uid)
SIDE DIMENSION SIGN
)WITH(
type =mysql
url =jdbc:mysql://linux01:3306/db_flink_test?useUnicode=true&characterEncoding=utf-8
userName =root
password =123456
tableName =ut_mysql_side
cache =LRU
cacheSize =2
cacheTTLMs =2001
parallelism =1
partitionedJoin=false
);
上述代碼用于在 UFli
隨后,必須在 WITH 代碼塊中指定數據輸出時的相關參數,以及數據源所使用的框架的必要參數。
可配置參數如下表所示:
參數名 | 必填 | 默認值 | 含義 | 舉例 |
---|---|---|---|---|
type | 是 | 無 | 數據源類型 | type = postgre |
url | 是 | 無 | MySQL Databa | url = jdbc:mysql://linux01:3306/db_fli |
username | 是 | 無 | 用戶名 | userName =root |
password | 是 | 無 | 密碼 | password =123456 |
tablename | 是 | 無 | MySQL 中的表名 | tableName =ut_postgre_result |
cache | 否 | NONE | 維表數據緩存方式 | cache = LRU cache = NONE |
cacheSize | 否 | 無 | 緩存行數,cache屬性為LRU時生效 | cacheSize = 100 |
cacheTTLMs | 否 | 無 | 緩存過期時間,cache屬性為LRU時生效,單位:毫秒 | cacheTTLMs =60000 |
partitionedJoin | 否 | 無 | 是否在 JOIN 數據之前,將維表數據按照 PRIMARY KEY指定的列進行reduceByKey操作,從而減少維表緩存數據量。 | partitionedJoin = true |
parallelism | 否 | 1 | 讀取數據并行度 | parallelism = 1 |
? 以 MySQL 作為維度表,建表語句示例如下:
CREATE TABLE ut_postgre_side(
uid VARCHAR
sex VARCHAR
age VARCHAR
PRIMARY KEY(uid)
SIDE DIMENSION SIGN
)WITH(
type =postgre
url = jdbc:postgresql://10.9.116.184:5432/db_flink_test?useUnicode=true&characterEncoding=utf-8
userName =root
password =birthDAY+-0230
tableName =ut_postgre_side
parallelism =1
cache =LRU
cacheSize =2
cacheTTLMs =2001
partitionedJoin=false
);
上述代碼用于在 UFli
隨后,必須在 WITH 代碼塊中指定數據輸出時的相關參數,以及數據源所使用的框架的必要參數。
可配置參數如下表所示:
參數名 | 必填 | 默認值 | 含義 | 舉例 |
---|---|---|---|---|
type | 是 | 無 | 數據源類型 | type = postgre |
url | 是 | 無 | PostgreSQL Databa | url = jdbc:postgresql://10.9.11.122:5432/db_fli |
username | 是 | 無 | 用戶名 | userName =root |
password | 是 | 無 | 密碼 | password =123456 |
tablename | 是 | 無 | PostgreSQL 中的表名 | tableName =ut_postgre_result |
cache | 否 | NONE | 維表數據緩存方式 | cache = LRU cache = NONE |
cacheSize | 否 | 無 | 緩存行數,cache屬性為LRU時生效 | cacheSize = 100 |
cacheTTLMs | 否 | 無 | 緩存過期時間,cache屬性為LRU時生效,單位:毫秒 | cacheTTLMs =60000 |
partitionedJoin | 否 | 無 | 是否在 JOIN 數據之前,將維表數據按照 PRIMARY KEY指定的列進行reduceByKey操作,從而減少維表緩存數據量。 | partitionedJoin = true |
parallelism | 否 | 1 | 讀取數據并行度 | parallelism = 1 |
? 在 UFli
CREATE VIEW view_name AS SELECT columnName [ columnName]* FROM table_name[view_name];
在 Fli
在 UFli
INSERT INTO
table_name
SELECT
[ (columnName[ columnName]*) ]
queryStatement;
例如如下 SQL 操作:
INSERT INTO
ut_kafka_result
SELECT
t3.uid
t3.pay_cash
t3.sex
t3.age
FROM
(SELECT
t1.*
t2.uid
t2.sex
t2.age
FROM
ut_kafka_source t1
JOIN
ut_mysql_side t2
ON
t1.user_id = t2.uid
WHERE
t2.sex = 男
AND
t1.pay_cash > 100) AS t3
對于 INSERT INTO 操作,需遵循如下約束:
UFli
例如下面的數據:
{"grade_data":[{"class1":{"小明":"98""老王":"100"}}{"class2":{"凱特琳":"88""凱南":"99"}}]}
通過如下 SQL 語句進行 行轉列 操作:
CREATE TABLE FUNCTION ParseArrayUDTF WITH cn.ucloud.sql.ParseArrayUDTF;
INSERT INTO
tb_class_grade_result
SELECT
class
student
grade
FROM
tb_class_grade LATERAL TABLE(ParseArrayUDTF(grade_data)) as T(class student grade)
執行完畢后,輸出效果為:
class | student | grade |
---|---|---|
class1 | 小明 | 98 |
class1 | 老王 | 100 |
class2 | 凱特琳 | 88 |
class2 | 凱南 | 99 |
其中自定義函數的具體實現可參考第 5 小節中的內容。
同時也可以使用 WHERE 和 JOIN 語法,例如如下 SQL 語句:
SELECT
t3.uid
t3.pay_cash
t3.sex
t3.age
FROM
(SELECT
t1.*
t2.uid
t2.sex
t2.age
FROM
ut_kafka_source t1
JOIN
ut_mysql_side t2
ON
t1.user_id = t2.uid
WHERE
t2.sex = 男
AND
t1.pay_cash > 100) AS t3
另外同時支持 Fli
在 Fli
該類型函數主要特點為“一進一出”,即,依次對某單列數據進行處理,并輸出當前列的處理結果(依然為單列)。
編寫 UFli
public class XXX extends ScalarFunction{
public OUT eval(IN columnValue){
...
}
}
以對某列數據進行大寫轉換為例,通過繼承 ScalarFunction 類實現 UDF 函數,代碼如下:
package cn.ucloud.sql;
public class TransCaseUDF extends ScalarFunction {
private static final Logger LOG = LoggerFactory.getLogger(TransCaseUDF.class);
public String eval(object data){
return new String(data.toString().toLowerCase().getBytes() StandardCharsets.UTF_8);
}
}
其中 eval 為固定方法名。
在 UFli
CREATE SCALAR FUNCTION [UDFName] WITH [UDF Class Name];
注冊示例如下所示:
CREATE SCALAR FUNCTION TransCaseUDF WITH cn.ucloud.sql.TransCaseUDF;
結合上述示例,使用該函數方法如下:
INSERT INTO
tb_name
SELECT
TransCaseUDF(name) as upper_name
FROM
tb_user_info;
該類型函數主要特點為 “多進一出”,即,對某幾列多行數據進行聚合,并輸出聚合結果。
編寫 UFli
public static class XXXAccum {...)
public class XXXUDAF extends AggregateFunction {
public XXXAccum createAccumulator(){...} //創建累加器
public T getValue(){...} //獲取聚合結果
public void accumulate(){...} //累加
public void merge(){...} //合并多個分區的累加器
public void resetAccumulator(){...} //重置累加器
}
以求某列的平均數為例,實現方法如下:
package cn.ucloud.sql;
import org.apache.flink.table.functions.AggregateFunction;
import java.util.Iterator;
public class AverageUDAF extends AggregateFunction {
public static class AverageAccum {
public float sum count = 0F;
}
@Override
public AverageAccum createAccumulator() {
return new AverageAccum();
}
@Override
public Float getValue(AverageAccum accumulator) {
if(accumulator.count == 0F) return null;
else return accumulator.sum / accumulator.count;
}
public void accumulate(AverageAccum accumulator float value) {
accumulator.sum += value;
accumulator.count += 1F;
}
public void merge(AverageAccum accumulator Iterable it) {
Iterator iter = it.iterator();
while(iter.hasNext()) {
AverageAccum acc = iter.next();
accumulator.sum += acc.sum;
accumulator.count += acc.count;
}
}
public void resetAccumulator(AverageAccum accumulator) {
accumulator.sum = 0F;
accumulator.count = 0F;
}
}
在 UFli
CREATE AGGREGATE FUNCTION [UDAFName] WITH [UDAF Class Name];
結合上述示例,使用該函數方法如下:
CREATE AGGREGATE FUNCTION AverageUDAF WITH cn.ucloud.sql.AverageUDAF;
結合上述示例,使用該函數方法如下:
CREATE AGGREGATE FUNCTION AverageUDAF WITH cn.ucloud.sql.AverageUDAF;
CREATE TABLE ut_kafka_source(
sex VARCHAR
age FLOAT
)WITH(
type =kafka11
bootstrapServers =linux01:9092linux02:9092linux03:9092
topic =flink-test01
groupId = group1
parallelism =1
);
CREATE TABLE ut_mysql_age_result(
sex VARCHAR
average_age FLOAT
)WITH(
type =mysql
url = jdbc:mysql://linux01:3306/db_flink_test?useUnicode=true&characterEncoding=utf-8
userName =root
password =123456
tableName =ut_mysql_age_result
parallelism =1
);
INSERT INTO
ut_mysql_age_result
SELECT
sex
AverageUDAF(age) AS average_age
FROM
ut_kafka_source
GROUP BY
sex;
該類型函數主要特點為 “一進多出”,即,對某一行一列數據進行拆分,并輸出結果到某列多行。
編寫 UFli
public class ParseArrayUDTF extends TableFunction {
public void eval(T obj) {
...
}
@Override
public TypeInformation getResultType() {
return Types.ROW(A.class B.class C.class);
}
}
以拆分如下數據為例:
{"grade_data":[{"class1":{"小明":"98""老王":"100"}}{"class2":{"凱特琳":"88""凱南":"99"}}]}
將其拆分為如下表:
class | student | grade |
---|---|---|
class1 | 小明 | 98 |
class1 | 老王 | 100 |
class2 | 凱特琳 | 88 |
class2 | 凱南 | 99 |
此時,UDTF 函數實現如下:
package cn.ucloud.sql;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import java.util.List;
import java.util.Map;
public class ParseArrayUDTF extends TableFunction {
//list 數據為 [{"class1":{"小明":"98""老王":"100"}}{"class2":{"凱特琳":"88""凱南":"99"}}]
public void eval(List
在 UFli
CREATE TABLE FUNCTION [UDTFName] WITH [UDTF Class Name];
結合上述示例,使用該函數方法如下:
CREATE TABLE FUNCTION ParseArrayUDTF WITH cn.ucloud.sql.ParseArrayUDTF;
結合上述示例,使用該函數方法如下:
CREATE TABLE FUNCTION ParseArrayUDTF WITH cn.ucloud.sql.ParseArrayUDTF;
CREATE TABLE tb_class_grade(
grade_data ARRAY
)WITH(
type =kafka11
bootstrapServers =linux01:9092linux02:9092linux03:9092
zookeeperQuorum =linux01:2181/kafka
offsetReset =latest
topic =flink-sql-test02
groupId = group1
parallelism =1
);
CREATE TABLE tb_class_grade_result(
class VARCHAR
student VARCHAR
grade VARCHAR
)WITH(
type =mysql
url = jdbc:mysql://linux01:3306/db_flink_test?useUnicode=true&characterEncoding=utf-8
userName =root
password =123456
tableName =tb_class_grade_result
parallelism =1
);
INSERT INTO
tb_class_grade_result
SELECT
class
student
grade
FROM
tb_class_grade LATERAL TABLE(ParseArrayUDTF(grade_data)) as T(class student grade)
在創建表時,在屬性字段的最后一行,為時間列創建Watermark,語法如下:
WATERMARK FOR columnName AS WITHOFFSET(columnName delayTime(ms))
其中 columnName 必須為 LONG ,或BIGINT,或 TimeStamp類型的事件時間。
隨后,Fli
舉個例子來說明 Watermark 的相關用法,示例 SQL 內容如下:
CREATE TABLE ut_kafka_source(
user_id VARCHAR
pay_cash FLOAT
pay_time LONG
WATERMARK FOR pay_time AS WITHOFFSET(pay_time 2000)
)WITH(
type =kafka11
bootstrapServers =linux01:9092linux02:9092linux03:9092
topic =flink-test01
groupId = group1
parallelism =1
);
CREATE TABLE ut_kafka_result(
user_id VARCHAR
pay_cash FLOAT
)WITH(
type =kafka11
bootstrapServers =linux01:9092linux02:9092linux03:9092
topic =flink-test02
parallelism =1
);
INSERT INTO
ut_kafka_result
SELECT
user_id
SUM(pay_cash) as pay_cash
FROM
ut_kafka_source
GROUP BY
TUMBLE (
ROWTIME
INTERVAL 2 SECOND
)
user_id
其中,TUMBLE 為生成 滾動窗口 的函數,ROWTIME 為當前表的事件時間。
提示:上述操作需要在提交 SQL 任務時添加指定參數:time.characteristic: EventTime
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/126205.html
摘要:開發注意事項基于托管集群的應用開發,和自建集群類似,但是仍然有幾個地方需要注意。和默認配置托管集群默認指定以及的堆大小為,目前不支持進行更改單個中的數量設置為高可用配置應用的高可用由集群以及共同保證。配置集群的運行時狀態保存在的指定目錄中。UFlink開發注意事項基于UFlink托管集群的Flink應用開發,和自建集群類似,但是仍然有幾個地方需要注意。JobManager和TaskManag...
摘要:基于開發指南本節主要介紹如何創建項目,并開發簡單的應用,從而使該應用可以被提交到平臺運行。如果不設置為,可能會導致的類沖突,產生不可預見的問題。在自動生成的文件中,使用了來更方便的控制依賴的可見性。基于Maven開發指南本節主要介紹如何創建項目,并開發簡單的Flink應用,從而使該Flink應用可以被提交到UFlink平臺運行。==== 自動生成代碼框架 ==== 對于Java開發者,可以使...
摘要:什么是實時計算實時計算基于構建,為分布式高性能隨時可用以及準確的流處理應用程序提供流處理框架,可用于流式數據處理等應用場景。版本支持當前支持的版本為,,,可以在提交任務時選擇所使用的版本。什么是實時計算實時計算(UFlink)基于ApacheFlink構建,為分布式、高性能、隨時可用以及準確的流處理應用程序提供流處理框架,可用于流式數據處理等應用場景。產品優勢100%開源兼容基于開源社區版本...
摘要:基于開發指南如果基于進行應用開發,需要在文件中加入如下配置注解注意修改的值,確保其符合您的應用。應用開發完成后,可以直接直接運行方法,在本地進行基本的測試。基于gradle開發指南如果基于gradle進行應用開發,需要在build.gradle文件中加入如下配置:buildscript { repositories { jcenter() // this applie...
摘要:集群管理進入集群管理頁面通過集群列表頁面進入集群管理頁面獲取集群詳情通過集群列表的詳情按鈕進入詳情頁面調整集群大小點擊調整容量調整集群大小查看點擊詳情頁的按鈕查看查看任務歷史點擊詳情頁的按鈕查看歷史任務節點密碼重置點擊集群列表頁的集群管理1. 進入集群管理頁面通過UFlink集群列表頁面進入集群管理頁面:2. 獲取集群詳情通過集群列表的詳情按鈕進入詳情頁面:3. 調整集群大小點擊調整容量調整...
閱讀 3514·2023-04-25 20:09
閱讀 3720·2022-06-28 19:00
閱讀 3035·2022-06-28 19:00
閱讀 3058·2022-06-28 19:00
閱讀 3132·2022-06-28 19:00
閱讀 2859·2022-06-28 19:00
閱讀 3014·2022-06-28 19:00
閱讀 2610·2022-06-28 19:00