国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

日志服務Flink Connector《支持Exactly Once》

endiat / 1099人閱讀

摘要:是阿里云日志服務提供的,用于對接的工具,包括兩部分,消費者和生產者。子用戶使用需要授權如下幾個用于將數據寫到阿里云日志服務中。

摘要: Flink log connector是阿里云日志服務推出的,用于對接Flink的工具,包含兩塊,分別是消費者和生產者,消費者用于從日志服務中讀數據,支持exactly once語義,生產者用于將數據寫到日志服務中,該Connector隱藏了日志服務的一些概念,比如Shard的分裂合并等,用戶在使用時只需要專注在自己的業務邏輯即可。

阿里云日志服務是針對實時數據一站式服務,用戶只需要將精力集中在分析上,過程中數據采集、對接各種存儲計算、數據索引和查詢等瑣碎工作等都可以交給日志服務完成。

日志服務中最基礎的功能是LogHub,支持數據實時采集與消費,實時消費家族除 Spark Streaming、Storm、StreamCompute(Blink外),目前新增Flink啦。

Flink Connector
Flink log connector是阿里云日志服務提供的,用于對接flink的工具,包括兩部分,消費者(Consumer)和生產者(Producer)。

消費者用于從日志服務中讀取數據,支持exactly once語義,支持shard負載均衡.
生產者用于將數據寫入日志服務,使用connector時,需要在項目中添加maven依賴:


            org.apache.flink
            flink-streaming-java_2.11
            1.3.2


            com.aliyun.openservices
            flink-log-connector
            0.1.3


            com.google.protobuf
            protobuf-java
            2.5.0

 
            com.aliyun.openservices
            aliyun-log
            0.6.10
 

            com.aliyun.openservices
            log-loghub-producer
            0.1.8

代碼:Github

用法
請參考日志服務文檔,正確創建Logstore。
如果使用子賬號訪問,請確認正確設置了LogStore的RAM策略。參考授權RAM子用戶訪問日志服務資源。
1. Log Consumer
在Connector中, 類FlinkLogConsumer提供了訂閱日志服務中某一個LogStore的能力,實現了exactly once語義,在使用時,用戶無需關心LogStore中shard數
量的變化,consumer會自動感知。

flink中每一個子任務負責消費LogStore中部分shard,如果LogStore中shard發生split或者merge,子任務消費的shard也會隨之改變。

1.1 配置啟動參數

Properties configProps = new Properties();
// 設置訪問日志服務的域名
configProps.put(ConfigConstants.LOG_ENDPOINT, "cn-hangzhou.log.aliyuncs.com");
// 設置訪問ak
configProps.put(ConfigConstants.LOG_ACCESSSKEYID, "");
configProps.put(ConfigConstants.LOG_ACCESSKEY, "");
// 設置日志服務的project
configProps.put(ConfigConstants.LOG_PROJECT, "ali-cn-hangzhou-sls-admin");
// 設置日志服務的LogStore
configProps.put(ConfigConstants.LOG_LOGSTORE, "sls_consumergroup_log");
// 設置消費日志服務起始位置
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
// 設置日志服務的消息反序列化方法
RawLogGroupListDeserializer deserializer = new RawLogGroupListDeserializer();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream logTestStream = env.addSource(
        new FlinkLogConsumer(deserializer, configProps));

上面是一個簡單的消費示例,我們使用java.util.Properties作為配置工具,所有Consumer的配置都可以在ConfigConstants中找到。

注意,flink stream的子任務數量和日志服務LogStore中的shard數量是獨立的,如果shard數量多于子任務數量,每個子任務不重復的消費多個shard,如果少于,

那么部分子任務就會空閑,等到新的shard產生。

1.2 設置消費起始位置
Flink log consumer支持設置shard的消費起始位置,通過設置屬性ConfigConstants.LOG_CONSUMER_BEGIN_POSITION,就可以定制消費從shard的頭尾或者某個特定時間開始消費,具體取值如下:

Consts.LOG_BEGIN_CURSOR: 表示從shard的頭開始消費,也就是從shard中最舊的數據開始消費。
Consts.LOG_END_CURSOR: 表示從shard的尾開始,也就是從shard中最新的數據開始消費。
UnixTimestamp: 一個整型數值的字符串,用1970-01-01到現在的秒數表示, 含義是消費shard中這個時間點之后的數據。
三種取值舉例如下:

configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_BEGIN_CURSOR);
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, "1512439000");

1.3 監控:消費進度(可選)
Flink log consumer支持設置消費進度監控,所謂消費進度就是獲取每一個shard實時的消費位置,這個位置使用時間戳表示,詳細概念可以參考
文檔消費組-查看狀態,[消費組-監控報警
](https://help.aliyun.com/docum...。

configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "your consumer group name”);

注意上面代碼是可選的,如果設置了,consumer會首先創建consumerGroup,如果已經存在,則什么都不做,consumer中的snapshot會自動同步到日志服務的consumerGroup中,用戶可以在日志服務的控制臺查看consumer的消費進度。

1.4 容災和exactly once語義支持
當打開Flink的checkpointing功能時,Flink log consumer會周期性的將每個shard的消費進度保存起來,當作業失敗時,flink會恢復log consumer,并
從保存的最新的checkpoint開始消費。

寫checkpoint的周期定義了當發生失敗時,最多多少的數據會被回溯,也就是重新消費,使用代碼如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 開啟flink exactly once語義
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 每5s保存一次checkpoint
env.enableCheckpointing(5000);

更多Flink checkpoint的細節請參考Flink官方文檔Checkpoints。

1.5 補充材料:關聯 API與權限設置
Flink log consumer 會用到的阿里云日志服務接口如下:

GetCursorOrData

用于從shard中拉數據, 注意頻繁的調用該接口可能會導致數據超過日志服務的shard quota, 可以通過ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS和ConfigConstants.LOG_MAX_NUMBER_PER_FETCH
控制接口調用的時間間隔和每次調用拉取的日志數量,shard的quota參考文章[shard簡介](https://help.aliyun.com/document_detail/28976.html).
configProps.put(ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS, "100");
configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "100");

ListShards

 用于獲取logStore中所有的shard列表,獲取shard狀態等.如果您的shard經常發生分裂合并,可以通過調整接口的調用周期來及時發現shard的變化。
// 設置每30s調用一次ListShards
configProps.put(ConfigConstants.LOG_SHARDS_DISCOVERY_INTERVAL_MILLIS, "30000");

CreateConsumerGroup

該接口調用只有當設置消費進度監控時才會發生,功能是創建consumerGroup,用于同步checkpoint。

ConsumerGroupUpdateCheckPoint

該接口用戶將flink的snapshot同步到日志服務的consumerGroup中。

子用戶使用Flink log consumer需要授權如下幾個RAM Policy:

Log Producer

FlinkLogProducer 用于將數據寫到阿里云日志服務中。

注意producer只支持Flink at-least-once語義,這就意味著在發生作業失敗的情況下,寫入日志服務中的數據有可能會重復,但是絕對不會丟失。

用法示例如下,我們將模擬產生的字符串寫入日志服務:

// 將數據序列化成日志服務的數據格式
class SimpleLogSerializer implements LogSerializationSchema {

    public RawLogGroup serialize(String element) {
        RawLogGroup rlg = new RawLogGroup();
        RawLog rl = new RawLog();
        rl.setTime((int)(System.currentTimeMillis() / 1000));
        rl.addContent("message", element);
        rlg.addLog(rl);
        return rlg;
    }
}
public class ProducerSample {
    public static String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
    public static String sAccessKeyId = "";
    public static String sAccessKey = "";
    public static String sProject = "ali-cn-hangzhou-sls-admin";
    public static String sLogstore = "test-flink-producer";
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerSample.class);


    public static void main(String[] args) throws Exception {

        final ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        env.setParallelism(3);

        DataStream simpleStringStream = env.addSource(new EventsGenerator());

        Properties configProps = new Properties();
        // 設置訪問日志服務的域名
        configProps.put(ConfigConstants.LOG_ENDPOINT, sEndpoint);
        // 設置訪問日志服務的ak
        configProps.put(ConfigConstants.LOG_ACCESSSKEYID, sAccessKeyId);
        configProps.put(ConfigConstants.LOG_ACCESSKEY, sAccessKey);
        // 設置日志寫入的日志服務project
        configProps.put(ConfigConstants.LOG_PROJECT, sProject);
        // 設置日志寫入的日志服務logStore
        configProps.put(ConfigConstants.LOG_LOGSTORE, sLogstore);

        FlinkLogProducer logProducer = new FlinkLogProducer(new SimpleLogSerializer(), configProps);

        simpleStringStream.addSink(logProducer);

        env.execute("flink log producer");
    }
    // 模擬產生日志
    public static class EventsGenerator implements SourceFunction {
        private boolean running = true;

        @Override
        public void run(SourceContext ctx) throws Exception {
            long seq = 0;
            while (running) {
                Thread.sleep(10);
                ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12));
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}

2.1 初始化
Producer初始化主要需要做兩件事情:

初始化配置參數Properties, 這一步和Consumer類似, Producer有一些定制的參數,一般情況下使用默認值即可,特殊場景可以考慮定制:

// 用于發送數據的io線程的數量,默認是8
ConfigConstants.LOG_SENDER_IO_THREAD_COUNT
// 該值定義日志數據被緩存發送的時間,默認是3000
ConfigConstants.LOG_PACKAGE_TIMEOUT_MILLIS
// 緩存發送的包中日志的數量,默認是4096
ConfigConstants.LOG_LOGS_COUNT_PER_PACKAGE
// 緩存發送的包的大小,默認是3Mb
ConfigConstants.LOG_LOGS_BYTES_PER_PACKAGE
// 作業可以使用的內存總的大小,默認是100Mb
ConfigConstants.LOG_MEM_POOL_BYTES
上述參數不是必選參數,用戶可以不設置,直接使用默認值。

重載LogSerializationSchema,定義將數據序列化成RawLogGroup的方法。

RawLogGroup是log的集合,每個字段的含義可以參考文檔[日志數據模型](https://help.aliyun.com/document_detail/29054.html)。

如果用戶需要使用日志服務的shardHashKey功能,指定數據寫到某一個shard中,可以使用LogPartitioner產生數據的hashKey,用法例子如下:

FlinkLogProducer logProducer = new FlinkLogProducer(new SimpleLogSerializer(), configProps);
logProducer.setCustomPartitioner(new LogPartitioner() {
            // 生成32位hash值
            public String getHashKey(String element) {
                try {
                    MessageDigest md = MessageDigest.getInstance("MD5");
                    md.update(element.getBytes());
                    String hash = new BigInteger(1, md.digest()).toString(16);
                    while(hash.length() < 32) hash = "0" + hash;
                    return hash;
                } catch (NoSuchAlgorithmException e) {
                }
                return  "0000000000000000000000000000000000000000000000000000000000000000";
            }
        });

注意LogPartitioner是可選的,不設置情況下, 數據會隨機寫入某一個shard。

2.2 權限設置:RAM Policy
Producer依賴日志服務的API寫數據,如下:

log:PostLogStoreLogs
log:ListShards
當RAM子用戶使用Producer時,需要對上述兩個API進行授權:

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/51577.html

相關文章

  • Flink實戰(八) - Streaming Connectors 編程

    摘要:默認情況下,當數據元到達時,分段接收器將按當前系統時間拆分,并使用日期時間模式命名存儲區。如果需要,可以使用數據元或元組的屬性來確定目錄。這將調用傳入的數據元并將它們寫入部分文件,由換行符分隔。消費者的消費者被稱為或等。 1 概覽 1.1 預定義的源和接收器 Flink內置了一些基本數據源和接收器,并且始終可用。該預定義的數據源包括文件,目錄和插socket,并從集合和迭代器攝取數據...

    beita 評論0 收藏0
  • OPPO數據中臺之基石:基于Flink SQL構建實數據倉庫

    摘要:實際上,本身就預留了與外部元數據對接的能力,分別提供了和這兩個抽象。對接外部數據源搞清楚了注冊庫表的過程,給我們帶來這樣一個思路如果外部元數據創建的表也能被轉換成可識別的,那么就能被無縫地注冊到。 本文整理自 2019 年 4 月 13 日在深圳舉行的 Flink Meetup 會議,分享嘉賓張俊,目前擔任 OPPO 大數據平臺研發負責人,也是 Apache Flink contrib...

    jeffrey_up 評論0 收藏0
  • Flink1.7穩定版發布:新增功能為企業生產帶來哪些好處

    摘要:通過狀態演變,可以在狀態模式中添加或刪除列,以便更改應用程序部署后應捕獲的業務功能。本地恢復通過擴展的調度來完成本地恢復功能,以便在恢復時考慮先前的部署位置。此功能大大提高了恢復速度。問題導讀1.Flink1.7開始支持Scala哪個版本?2.Flink1.7狀態演變在實際生產中有什么好處?3.支持SQL/Table API中的富集連接可以做那些事情?4.Flink1.7新增了哪些連接器Ap...

    Hwg 評論0 收藏0
  • 從 Spark Streaming 到 Apache Flink : 實時數據流在愛奇藝的演進

    摘要:在移動端,愛奇藝月度總有效時長億小時,穩居中國榜第三名。愛奇藝的峰值事件數達到萬秒,在正確性容錯性能延遲吞吐量擴展性等方面均遇到不小的挑戰。從到愛奇藝主要使用的是和來進行流式計算。作者:陳越晨 整理:劉河 本文將為大家介紹Apache Flink在愛奇藝的生產與實踐過程。你可以借此了解到愛奇藝引入Apache Flink的背景與挑戰,以及平臺構建化流程。主要內容如下: 愛奇藝在實時計算方...

    econi 評論0 收藏0

發表評論

0條評論

endiat

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<