国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

大數據開發系列二:自定義flink Metric kafka Reporter

IT那活兒 / 2350人閱讀
大數據開發系列二:自定義flink Metric kafka Reporter

點擊上方“IT那活兒”,關注后了解更多內容,不管IT什么活兒,干就完了!!!

我們通常在實現大數據場景的時候,需要分析任務上線后內部性能建康狀況,比如cpu使用比率,內存使用率,checkpoint生成情況,任務是否異常等,像這些指標通過人肉運維方式是無法實時捕獲異常信息的,所以需要一套實時監測體系來監測任務運行狀態,通過實時獲取相關指標數據,對數據進行規則告警,可以更好的分析定位問題。


基于這個需求,flink本身提供了很多的任務運行時刻Metrics相關指標,避免任務的運行處于黑盒狀態,通過分析指標,可以快速的調整任務的資源、定位遇到的問題。目前獲取 任務Metrics 有三種方式:

方式一:

通過flink  WebUI 進入Metrics 選項卡,根據不同算子,選擇需要監測指標,實時查看指標數據,缺點比較明顯,無法查看歷史監測數據,需要一直打開,并且無法設置告警,適合開發過程時使用。

方式二:

官方提供了一種通過 REST API獲取方式指標的方式,

https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/ops/rest_api/,提供的api 主要是面向 JobManager 對象的相關動作api, 主要用于任務提交等操作,但所提供TaskManager對象指標信息很少。通過此方式獲取相應的指標,需要另外開發一套自動化腳本或者程序,定期調用api 獲取相關信息,前題條件是需要提前規則監測的指標及對應的請求的api 地址,增大了系統復雜程度。

方式三:

flink 提供了一種MetricsReporter機制,可以將各個組件的metrics數據,通過不同的Metric Reporter插件將數據自動暴露給外部系統,這樣可以充份利用使用第三方的存儲和分析能力。

目前flink已經支持了很多reporter,如Graphite、JMX、InfluxDB、Prometheus等,不管用哪一處方式,都需要額外部署第三方系統來,進行接收、解析、分析metric數據。

我們本身已有了自動化運維平臺,不會考慮部署像Prometheus這樣的第三方平臺,需要做的是如何將metric數據跟自動運維平臺告警模塊進行對接使用,告警模塊主要是通過kafka進行對接數據,所以采用自定義kafka reporter 解決數據對接問題。

flink metrices指標項比較多,指標數據量級跟所跑的任務個數有著直接的關系,我們關注的核心指標項,對核心指標進行規則告警,接下來介紹如何基于flink 現有的reporter 代碼實現kafka reporter功能點

1. 下載對應版本flink 分支代碼,如

https://github.com/apache/flink/releases/tag/release-1.13.6

2. 解壓源代碼,導入開發工具,查看flink-metrics模塊代碼

3. 根據自己實際場景,對吐數據格式要求,參考不同自帶模板代碼,以flink-metrics-InfluxDB模塊代碼為例,新建flink-metrics-kafka

3.1 修改flink-metrics/pom.xml 文件,新增flink-metrics-kafka

3.2 新增KafkaReporterFactory主程序,需要實現MetricReporterFactory接口并重寫方法。

@InterceptInstantiationViaReflection(
        reporterClassName = "org.apache.flink.metrics.kafka.KafkaReporter")
public class KafkaReporterFactory implements MetricReporterFactory {

    @Override
    public MetricReporter createMetricReporter(Properties properties) {
        return new KafkaReporter();
    }
}

3.3 新增KafkaReporter實現類,需要繼承AbstractReporter并實現Scheduled接口并重寫方法,主要作用是收集指標數據,并推送到kafka。

  • 讀取配置flink配置文件conf/flink-conf.yaml kakfa 服務器地址及topic地址,初始化KafkaProducer消息生產者對象。

@Override
public void open(MetricConfig metricConfig) {
    LOG.info("metricConfig:" + metricConfig);
    topic = metricConfig.getString("topic", "");
    if (StringUtils.isBlank(topic)) {
        LOG.error("metrics.reporter.kafka_reporter.topic is null");
    }
    String endsWithMetric = metricConfig.getString("endswith.metricname", "").trim(); //指定需要獲取指標名稱
    endsWithMetricList = Arrays.asList(endsWithMetric.split(","));
    String bootstrapservers = metricConfig.getString("bootstrap.servers", "");
    if (StringUtils.isBlank(bootstrapservers)) {
        LOG.error("metrics.reporter.kafka_reporter.bootstrap.servers is null");
    }
    Properties properties = new Properties();
    properties.put("bootstrap.servers", bootstrapservers);
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(
            "value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    kafkaProducer = new KafkaProducer<String, String>(properties);
}
  • 指標數據的類型及格式是根據它所歸屬metrics類型(Counters/Gauges/Histograms/Meters)有關,然后我們可以對指標數據進行格式化輸出所需要格式到kafka。

① Counters: 統計的是一個累加值,用與存儲數值類型指標數據。

② Gauges:用來存儲任何類型指標數據。

③ Histograms:度量值的統計結果,如平均值、最大值等。

④ Meters:用來計算平均速率,平均吞吐量等。

@Override
public void report() 
{
    tryReport();
}
private final ObjectMapper mapper = new ObjectMapper();
private void tryReport() {
    Instant timestamp = Instant.now();
    try {
        String job_id = "";
        String job_name = "";
        List metriclist = new ArrayList<>();
        metriclist.addAll(gauges.values()); //獲取gauges類型指標集
        metriclist.addAll(counters.values());//獲取gauges類型指標集
        metriclist.addAll(histograms.values());//獲取histograms類型指標集
        metriclist.addAll(meters.values());//獲取meters類型指標集
//每個指標數據里面都加上對應的job_id, job_name
        for (MeasurementInfo info : metriclist) {
            if (info.getName().startsWith("jobmanager_job_")
                    || info.getName().startsWith("taskmanager_job_")) {
                job_id = info.getTags().getOrDefault(JOB_ID, "");
                job_name = info.getTags().getOrDefault(JOB_NAME, "");
                if (StringUtils.isBlank(job_id) || StringUtils.isBlank(job_name)) {
                    LOG.error("do not get job_id or job name:{}", info);
                }
                break;
            }
        }
        List list = new ArrayList<>();
//根據不同metrices類型遍歷指標數據,只獲取指定的指標項數據
        for (Map.Entry, MeasurementInfo> entry : gauges.entrySet()) {
            boolean flag =
                    endsWithMetricList.stream()
                            .anyMatch(
                                    endWith ->
                                            entry.getValue()
                                                    .getName()
                                                    .endsWith(endWith.trim()));
            if (flag) {
                list.add(
                        getPointMap(
                                MetricMapper.map(entry.getValue(), timestamp, entry.getKey()),
                                job_id,
                                job_name));
            }
        }
        for (Map.Entry entry : counters.entrySet()) {
            boolean flag =
                    endsWithMetricList.stream()
                            .anyMatch(
                                    endWith ->
                                            entry.getValue()
                                                    .getName()
                                                    .endsWith(endWith.trim()));
            if (flag) {
                list.add(
                        getPointMap(
                                MetricMapper.map(entry.getValue(), timestamp, entry.getKey()),
                                job_id,
                                job_name));
            }
        }
        for (Map.Entry entry : histograms.entrySet()) {
            boolean flag =
                    endsWithMetricList.stream()
                            .anyMatch(
                                    endWith ->
                                            entry.getValue()
                                                    .getName()
                                                    .endsWith(endWith.trim()));
            if (flag) {
                list.add(
                        getPointMap(
                                MetricMapper.map(entry.getValue(), timestamp, entry.getKey()),
                                job_id,
                                job_name));
            }
        }
        for (Map.Entry entry : meters.entrySet()) {
            boolean flag =
                    endsWithMetricList.stream()
                            .anyMatch(
                                    endWith ->
                                            entry.getValue()
                                                    .getName()
                                                    .endsWith(endWith.trim()));
            if (flag) {
                list.add(
                        getPointMap(
                                MetricMapper.map(entry.getValue(), timestamp, entry.getKey()),
                                job_id,
                                job_name));
            }
        }
        if (list.size() > 0) {
            ProducerRecord record =
                    new ProducerRecord(
                            topic, null, mapper.writeValueAsString(list));
            kafkaProducer.send(record); //發送組裝好指標數據到kafka
        }

    } catch (ConcurrentModificationException
            | NoSuchElementException
            | JsonProcessingException e) {
               LOG.error(e.getMessage());
        return;
    }
}

新增flink-metrics-kafka項目有兩種打包方式:

  • 基于完整的flink源碼項目,進行全量打包。

  • 保留flink maven父子結構,flink parent pom.xml 只留snc-flink-metrics編譯成功后,將snc-flink-metrics.jar 放入flink/lib 目錄下。

3.4 修改flink配置文件conf/flink-conf.yaml,主要包括Reporter全類名,上報周期,指定所需的指標名。

  • metrics.reporterskafka_reporter

  • metrics.reporter.kafka_reporter.factory.classorg.apache.flink.metrics.kafka.KafkaReporterFactory

  • metrics.reporter.kafka_reporter.interval60 SECONDS

#kafka地址

  • metrics.reporter.kafka_reporter.bootstrap.serversXXX.XXX.XXX.10:9090

#kafkatopic

  • metrics.reporter.kafka_reporter.topickafka_topic

#指標名稱按后綴進行過濾,注釋則不過濾

  • metrics.reporter.kafka_reporter.endswith.metricnamejob_numRestarts,job_restartingTime,job_uptime,currentOutputWatermark,Status_JVM_CPU_Load,Status_JVM_Memory_Heap_Used

3.5 提交任務,消費kafka 可以獲取對應的數據。

[
  {
    "name": "jobmanager.uptime",
    "time": 1647314569119,
    "fields": {
      "value": 1703478823
    },
    "tags": {
      "host": "bigdata-03",
      "job_id": "dc7a58b3f202059cd72c3467ecedb4b7",
      "job_name": "amp_zabbix_pre"
    }
  },
  {
    "name": "jobmanager.Status.JVM.Memory.Heap.Max",
    "time": 1647314569119,
    "fields": {
      "value": 468713472
    },
    "tags": {
      "host": "bigdata-03",
      "job_id": "",
      "job_name": ""
    }
  },
  ...
]




本文作者:暨景書 vs 郭 憲

本文來源:IT那活兒(上海新炬王翦團隊)

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/129566.html

相關文章

  • 《從0到1學習Flink》—— 如何定義 Data Source ?

    摘要:從上面自定義的可以看到我們繼承的就是這個類,那么來了解一下一個抽象類,繼承自。該類的子類有三個,兩個是抽象類,在此基礎上提供了更具體的實現,另一個是。 showImg(https://segmentfault.com/img/remote/1460000016978900?w=1920&h=1641); 前言 在 《從0到1學習Flink》—— Data Source 介紹 文章中,我...

    songze 評論0 收藏0
  • 《從0到1學習Flink》—— Flink 寫入數據到 ElasticSearch

    摘要:從到學習介紹從到學習介紹其中包括了和的,后面我也講了下如何自定義自己的和。這個問題可是線上很容易遇到的關注我轉載請務必注明原創地址為微信公眾號另外我自己整理了些的學習資料,目前已經全部放到微信公眾號了。 showImg(https://segmentfault.com/img/remote/1460000017935460?w=1280&h=853); 前言 前面 FLink 的文章中...

    W4n9Hu1 評論0 收藏0

發表評論

0條評論

IT那活兒

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<