国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專(zhuān)欄INFORMATION COLUMN

Kubernetes監(jiān)控之Heapster源碼分析

gclove / 1378人閱讀

摘要:源碼版本簡(jiǎn)介是下的一個(gè)監(jiān)控項(xiàng)目,用于進(jìn)行容器集群的監(jiān)控和性能分析。基本的功能及概念介紹可以回顧我之前的一篇文章監(jiān)控之介紹。在源碼分析之前我們先介紹的實(shí)現(xiàn)流程,由上圖可以看出會(huì)從各個(gè)上獲取相關(guān)的監(jiān)控信息,然后進(jìn)行匯總發(fā)送給后臺(tái)數(shù)據(jù)庫(kù)。

源碼版本

heapster version: release-1.2

簡(jiǎn)介

Heapster是Kubernetes下的一個(gè)監(jiān)控項(xiàng)目,用于進(jìn)行容器集群的監(jiān)控和性能分析。
基本的功能及概念介紹可以回顧我之前的一篇文章:《Kubernetes監(jiān)控之Heapster介紹》。
隨著的Heapster的版本迭代,支持的功能越越來(lái)越多,比如新版本支持更多的后端數(shù)據(jù)存儲(chǔ)方式:OpenTSDB、Monasca、Kafka、Elasticsearch等等。看過(guò)低版本(如v0.18)的源碼,會(huì)發(fā)現(xiàn)v1.2版本的源碼架構(gòu)完全變了樣,架構(gòu)擴(kuò)展性越來(lái)越強(qiáng),源碼學(xué)無(wú)止境!
上面很多介紹這篇文章并不會(huì)涉及,我們還是會(huì)用到最流行的模式:Heapster + InfluxDB。

監(jiān)控系統(tǒng)架構(gòu)圖:

該圖很好的描述了監(jiān)控系統(tǒng)的關(guān)鍵組件,及數(shù)據(jù)流向。
在源碼分析之前我們先介紹Heapster的實(shí)現(xiàn)流程,由上圖可以看出Heapster會(huì)從各個(gè)Node上kubelet獲取相關(guān)的監(jiān)控信息,然后進(jìn)行匯總發(fā)送給后臺(tái)數(shù)據(jù)庫(kù)InfluxDB。
這里會(huì)涉及到幾個(gè)關(guān)鍵點(diǎn):

k8s集群會(huì)增刪Nodes,Heapster需要獲取這些sources并做相應(yīng)的操作

Heapster后端數(shù)據(jù)庫(kù)怎么存儲(chǔ)?是否支持多后端?

Heapster獲取到數(shù)據(jù)后推送給后端數(shù)據(jù)庫(kù),那么其提供了API的數(shù)據(jù)該從何處獲取?本地cache?

Heapster從kubelet獲取到的數(shù)據(jù)是否需要處理?還是能直接存儲(chǔ)到后端

等等..

一起分析完heapster源碼實(shí)現(xiàn),就能進(jìn)行解惑了。

啟動(dòng)命令

先列出我解析源碼時(shí)所用的命令,及參數(shù)使用,便于后面的理解。

# heapster --source=kubernetes:http://:8080?inClusterConfig=false&useServiceAccount=false --sink=influxdb:http://:8086
啟動(dòng)流程

從Heapster的啟動(dòng)流程開(kāi)始分析其實(shí)現(xiàn),前面做了簡(jiǎn)單的分析,可以帶著問(wèn)題去看源碼會(huì)有更好的收獲。

main()

路徑: heapster/metrics/heapster.go

func main() {
    ...
    // 根據(jù)--source參數(shù)的輸入來(lái)創(chuàng)建數(shù)據(jù)源
    // 我們這里會(huì)使用kubernetes,下面會(huì)根據(jù)k8s來(lái)解析
    sourceFactory := sources.NewSourceFactory()
    // 創(chuàng)建該sourceProvider時(shí),會(huì)創(chuàng)建Node的ListWatch,用于監(jiān)控k8s節(jié)點(diǎn)的增刪情況,因?yàn)檫@些才是數(shù)據(jù)的真實(shí)來(lái)源.
    // 該sourceProvider會(huì)包含nodeLister,還有kubeletClient,用于跟各個(gè)節(jié)點(diǎn)的kubelet通信,獲取cadvisor數(shù)據(jù)
    sourceProvider, err := sourceFactory.BuildAll(argSources)
    if err != nil {
        glog.Fatalf("Failed to create source provide: %v", err)
    }
    // 創(chuàng)建sourceManager,其實(shí)就是sourceProvider + ScrapeTimeout,用于超時(shí)獲取數(shù)據(jù)
    sourceManager, err := sources.NewSourceManager(sourceProvider, sources.DefaultMetricsScrapeTimeout)
    if err != nil {
        glog.Fatalf("Failed to create source manager: %v", err)
    }

    // 根據(jù)--sink創(chuàng)建數(shù)據(jù)存儲(chǔ)后端
    // 我們這里會(huì)使用influxDB,來(lái)作為數(shù)據(jù)的存儲(chǔ)后端
    sinksFactory := sinks.NewSinkFactory()
    // 創(chuàng)建sinks時(shí)會(huì)返回各類(lèi)對(duì)象:
    // metricSink: 可以理解為本地的metrics數(shù)據(jù)池,Heapster API獲取到的數(shù)據(jù)都是從該對(duì)象中獲取的,默認(rèn)一定會(huì)創(chuàng)建
    // sinkList: Heapster在新版本中支持多后端數(shù)據(jù)存儲(chǔ),比如你可以指定多個(gè)不同的influxDB,也可以同時(shí)指定influxDB和Elasticsearch。
    // historicalSource: 需要配置,我們暫時(shí)沒(méi)有用到
    metricSink, sinkList, historicalSource := sinksFactory.BuildAll(argSinks, *argHistoricalSource)
    if metricSink == nil {
        glog.Fatal("Failed to create metric sink")
    }
    if historicalSource == nil && len(*argHistoricalSource) > 0 {
        glog.Fatal("Failed to use a sink as a historical metrics source")
    }
    for _, sink := range sinkList {
        glog.Infof("Starting with %s", sink.Name())
    }
    // 創(chuàng)建sinkManager,會(huì)根據(jù)之前的sinkList,創(chuàng)建對(duì)應(yīng)數(shù)量的協(xié)程,用于從sink的數(shù)據(jù)管道中獲取數(shù)據(jù),然后推送到對(duì)應(yīng)的后端
    sinkManager, err := sinks.NewDataSinkManager(sinkList, sinks.DefaultSinkExportDataTimeout, sinks.DefaultSinkStopTimeout)
    if err != nil {
        glog.Fatalf("Failed to created sink manager: %v", err)
    }

    // 創(chuàng)建對(duì)象,用于處理各個(gè)kubelet獲取到的metrics數(shù)據(jù)
    // 最終都會(huì)加入到dataProcessors,在最終的處理函數(shù)中會(huì)進(jìn)行遍歷并調(diào)用其process()
    metricsToAggregate := []string{
        core.MetricCpuUsageRate.Name,
        core.MetricMemoryUsage.Name,
        core.MetricCpuRequest.Name,
        core.MetricCpuLimit.Name,
        core.MetricMemoryRequest.Name,
        core.MetricMemoryLimit.Name,
    }

    metricsToAggregateForNode := []string{
        core.MetricCpuRequest.Name,
        core.MetricCpuLimit.Name,
        core.MetricMemoryRequest.Name,
        core.MetricMemoryLimit.Name,
    }
    // 速率計(jì)算對(duì)象
    dataProcessors := []core.DataProcessor{
        // Convert cumulaties to rate
        processors.NewRateCalculator(core.RateMetricsMapping),
    }

    kubernetesUrl, err := getKubernetesAddress(argSources)
    if err != nil {
        glog.Fatalf("Failed to get kubernetes address: %v", err)
    }

    kubeConfig, err := kube_config.GetKubeClientConfig(kubernetesUrl)
    if err != nil {
        glog.Fatalf("Failed to get client config: %v", err)
    }
    kubeClient := kube_client.NewOrDie(kubeConfig)
    // 會(huì)創(chuàng)建podLister、nodeLister、namespaceLister,用于從k8s watch各個(gè)資源的增刪情況
    // 防止獲取數(shù)據(jù)失敗
    podLister, err := getPodLister(kubeClient)
    if err != nil {
        glog.Fatalf("Failed to create podLister: %v", err)
    }
    nodeLister, err := getNodeLister(kubeClient)
    if err != nil {
        glog.Fatalf("Failed to create nodeLister: %v", err)
    }

    podBasedEnricher, err := processors.NewPodBasedEnricher(podLister)
    if err != nil {
        glog.Fatalf("Failed to create PodBasedEnricher: %v", err)
    }
    dataProcessors = append(dataProcessors, podBasedEnricher)

    namespaceBasedEnricher, err := processors.NewNamespaceBasedEnricher(kubernetesUrl)
    if err != nil {
        glog.Fatalf("Failed to create NamespaceBasedEnricher: %v", err)
    }
    dataProcessors = append(dataProcessors, namespaceBasedEnricher)

    // 這里的對(duì)象append順序會(huì)有一定的要求
    // 比如Pod的有些數(shù)據(jù)需要進(jìn)行containers數(shù)據(jù)的累加得到
    dataProcessors = append(dataProcessors,
        processors.NewPodAggregator(),
        &processors.NamespaceAggregator{
            MetricsToAggregate: metricsToAggregate,
        },
        &processors.NodeAggregator{
            MetricsToAggregate: metricsToAggregateForNode,
        },
        &processors.ClusterAggregator{
            MetricsToAggregate: metricsToAggregate,
        })

    nodeAutoscalingEnricher, err := processors.NewNodeAutoscalingEnricher(kubernetesUrl)
    if err != nil {
        glog.Fatalf("Failed to create NodeAutoscalingEnricher: %v", err)
    }
    dataProcessors = append(dataProcessors, nodeAutoscalingEnricher)

    // 這是整個(gè)Heapster功能的關(guān)鍵處
    // 根據(jù)sourceManger、sinkManager、dataProcessors來(lái)創(chuàng)建manager對(duì)象
    manager, err := manager.NewManager(sourceManager, dataProcessors, sinkManager, *argMetricResolution,
        manager.DefaultScrapeOffset, manager.DefaultMaxParallelism)
    if err != nil {
        glog.Fatalf("Failed to create main manager: %v", err)
    }
    // 開(kāi)始創(chuàng)建協(xié)程,從各個(gè)sources獲取metrics數(shù)據(jù),并經(jīng)過(guò)dataProcessors的處理,然后export到各個(gè)用于后端數(shù)據(jù)存儲(chǔ)的sinks
    manager.Start()

    // 以下的就是創(chuàng)建Heapster server,用于提供各類(lèi)API
    // 通過(guò)http.mux及go-restful進(jìn)行實(shí)現(xiàn)
    // 新版的heapster還支持TLS
    handler := setupHandlers(metricSink, podLister, nodeLister, historicalSource)
    addr := fmt.Sprintf("%s:%d", *argIp, *argPort)
    glog.Infof("Starting heapster on port %d", *argPort)

    mux := http.NewServeMux()
    promHandler := prometheus.Handler()
    if len(*argTLSCertFile) > 0 && len(*argTLSKeyFile) > 0 {
        if len(*argTLSClientCAFile) > 0 {
            authPprofHandler, err := newAuthHandler(handler)
            if err != nil {
                glog.Fatalf("Failed to create authorized pprof handler: %v", err)
            }
            handler = authPprofHandler

            authPromHandler, err := newAuthHandler(promHandler)
            if err != nil {
                glog.Fatalf("Failed to create authorized prometheus handler: %v", err)
            }
            promHandler = authPromHandler
        }
        mux.Handle("/", handler)
        mux.Handle("/metrics", promHandler)
        healthz.InstallHandler(mux, healthzChecker(metricSink))

        // If allowed users is set, then we need to enable Client Authentication
        if len(*argAllowedUsers) > 0 {
            server := &http.Server{
                Addr:      addr,
                Handler:   mux,
                TLSConfig: &tls.Config{ClientAuth: tls.RequestClientCert},
            }
            glog.Fatal(server.ListenAndServeTLS(*argTLSCertFile, *argTLSKeyFile))
        } else {
            glog.Fatal(http.ListenAndServeTLS(addr, *argTLSCertFile, *argTLSKeyFile, mux))
        }

    } else {
        mux.Handle("/", handler)
        mux.Handle("/metrics", promHandler)
        healthz.InstallHandler(mux, healthzChecker(metricSink))

        glog.Fatal(http.ListenAndServe(addr, mux))
    }
}

介紹了Heapster的啟動(dòng)流程后,大致能明白了該啟動(dòng)過(guò)程分為幾個(gè)關(guān)鍵點(diǎn):

創(chuàng)建數(shù)據(jù)源對(duì)象

創(chuàng)建后端存儲(chǔ)對(duì)象list

創(chuàng)建處理metrics數(shù)據(jù)的processors

創(chuàng)建manager,并開(kāi)啟數(shù)據(jù)的獲取及export的協(xié)程

開(kāi)啟Heapster server,并支持各類(lèi)API

下面進(jìn)行一一介紹。

創(chuàng)建數(shù)據(jù)源

先介紹下相關(guān)的結(jié)構(gòu)體,因?yàn)檫@才是作者的核心思想。
創(chuàng)建的sourceProvider是實(shí)現(xiàn)了MetricsSourceProvider接口的對(duì)象。
先看下MetricsSourceProvider:

type MetricsSourceProvider interface {
    GetMetricsSources() []MetricsSource
}

每個(gè)最終返回的對(duì)象,都需要提供GetMetricsSources(),看字面意識(shí)就可以知道就是提供所有的獲取Metrics源頭的接口。
我們的參數(shù)--source=kubernetes,所以其實(shí)我們真實(shí)返回的結(jié)構(gòu)是kubeletProvider.
路徑: heapster/metrics/sources/kubelet/kubelet.go

type kubeletProvider struct {
    // 用于從k8s獲取最新的nodes信息,然后根據(jù)kubeletClient,合成各個(gè)metricSources
    nodeLister    *cache.StoreToNodeLister
    // 反射
    reflector     *cache.Reflector
    // kubeletClient相關(guān)的配置,比如端口:10255
    kubeletClient *KubeletClient
}

結(jié)構(gòu)介紹完了,看下具體的創(chuàng)建過(guò)程,跟kubernetes相關(guān)的關(guān)鍵接口是NewKubeletProvider():

func NewKubeletProvider(uri *url.URL) (MetricsSourceProvider, error) {
    // 創(chuàng)建kubernetes master及kubelet client相關(guān)的配置
    kubeConfig, kubeletConfig, err := GetKubeConfigs(uri)
    if err != nil {
        return nil, err
    }
    // 創(chuàng)建kubeClient及kubeletClient
    kubeClient := kube_client.NewOrDie(kubeConfig)
    kubeletClient, err := NewKubeletClient(kubeletConfig)
    if err != nil {
        return nil, err
    }

    // 獲取下所有的Nodes,測(cè)試下創(chuàng)建的client是否能正常通訊
    if _, err := kubeClient.Nodes().List(kube_api.ListOptions{
        LabelSelector: labels.Everything(),
        FieldSelector: fields.Everything()}); err != nil {
        glog.Errorf("Failed to load nodes: %v", err)
    }

    // 監(jiān)控k8s的nodes變更
    // 這里會(huì)創(chuàng)建協(xié)程進(jìn)行watch,便于后面調(diào)用nodeLister.List()列出所有的nodes。
    // 該Watch的實(shí)現(xiàn),需要看下apiServer中的實(shí)現(xiàn),后面會(huì)進(jìn)行講解
    lw := cache.NewListWatchFromClient(kubeClient, "nodes", kube_api.NamespaceAll, fields.Everything())
    nodeLister := &cache.StoreToNodeLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}
    reflector := cache.NewReflector(lw, &kube_api.Node{}, nodeLister.Store, time.Hour)
    reflector.Run()
    // 結(jié)構(gòu)在前面介紹過(guò)
    return &kubeletProvider{
        nodeLister:    nodeLister,
        reflector:     reflector,
        kubeletClient: kubeletClient,
    }, nil
}

該過(guò)程會(huì)涉及到較多的技術(shù)點(diǎn),比如apiServer中的watch實(shí)現(xiàn),reflector的使用。這里不會(huì)進(jìn)行細(xì)講,該文章主要是針對(duì)heapster的源碼實(shí)現(xiàn),apiServer相關(guān)的實(shí)現(xiàn)后面會(huì)進(jìn)行多帶帶輸出。

這里需要注意的是創(chuàng)建了ListWath,需要關(guān)注后面哪里用到了nodeLister.List()進(jìn)行nodes的獲取。

創(chuàng)建后端服務(wù)

前面已經(jīng)提到后端數(shù)據(jù)存儲(chǔ)會(huì)有兩處,一個(gè)是metricSink,另一個(gè)是influxdbSink。所以這里會(huì)涉及到兩個(gè)結(jié)構(gòu):

type MetricSink struct {
    // 鎖
    lock sync.Mutex

    // 長(zhǎng)時(shí)間存儲(chǔ)metrics數(shù)據(jù),默認(rèn)時(shí)間是15min
    longStoreMetrics   []string
    longStoreDuration  time.Duration
    // 短時(shí)間存儲(chǔ)metrics數(shù)據(jù),默認(rèn)時(shí)間是140s
    shortStoreDuration time.Duration

    // 短時(shí)存儲(chǔ)空間
    shortStore []*core.DataBatch
    // 長(zhǎng)時(shí)存儲(chǔ)空間
    longStore []*multimetricStore
}

該結(jié)構(gòu)就是用于heapster API調(diào)用時(shí)獲取的數(shù)據(jù)源,這里會(huì)分為兩種數(shù)據(jù)存儲(chǔ)方式:長(zhǎng)時(shí)存儲(chǔ)和短時(shí)存儲(chǔ)。所以集群越大時(shí),heapster占用內(nèi)存越多,需要考慮該問(wèn)題如何處理或者優(yōu)化。

type influxdbSink struct {
    // 連接后端influxDB數(shù)據(jù)庫(kù)的client
    client influxdb_common.InfluxdbClient
    // 鎖
    sync.RWMutex
    c        influxdb_common.InfluxdbConfig
    dbExists bool
}

這個(gè)就是我們配置的InfluxDB的結(jié)構(gòu),是我們真正的數(shù)據(jù)存儲(chǔ)后端。

開(kāi)始介紹創(chuàng)建后端服務(wù)流程,從sinksFactory.BuildAll()接口直接入手。
路徑: heapster/metrics/sinks/factory.go

func (this *SinkFactory) BuildAll(uris flags.Uris, historicalUri string) (*metricsink.MetricSink, []core.DataSink, core.HistoricalSource) {
    result := make([]core.DataSink, 0, len(uris))
    var metric *metricsink.MetricSink
    var historical core.HistoricalSource
    // 根據(jù)傳入的"--sink"參數(shù)信息,進(jìn)行build
    // 支持多后端數(shù)據(jù)存儲(chǔ),會(huì)進(jìn)行遍歷并創(chuàng)建
    for _, uri := range uris {
        // 關(guān)鍵接口
        sink, err := this.Build(uri)
        if err != nil {
            glog.Errorf("Failed to create sink: %v", err)
            continue
        }
        if uri.Key == "metric" {
            metric = sink.(*metricsink.MetricSink)
        }
        if uri.String() == historicalUri {
            if asHistSource, ok := sink.(core.AsHistoricalSource); ok {
                historical = asHistSource.Historical()
            } else {
                glog.Errorf("Sink type %q does not support being used for historical access", uri.Key)
            }
        }
        result = append(result, sink)
    }
    // 默認(rèn)metricSink一定會(huì)創(chuàng)建
    if metric == nil {
        uri := flags.Uri{}
        uri.Set("metric")
        sink, err := this.Build(uri)
        if err == nil {
            result = append(result, sink)
            metric = sink.(*metricsink.MetricSink)
        } else {
            glog.Errorf("Error while creating metric sink: %v", err)
        }
    }
    if len(historicalUri) > 0 && historical == nil {
        glog.Errorf("Error while initializing historical access: unable to use sink %q as a historical source", historicalUri)
    }
    return metric, result, historical
}

該接口流程比較簡(jiǎn)單,就是對(duì)傳入?yún)?shù)進(jìn)行判斷,然后調(diào)用this.Build()進(jìn)行創(chuàng)建,這里只需要注意即使沒(méi)有配置metric,也會(huì)進(jìn)行metricSink的創(chuàng)建。

func (this *SinkFactory) Build(uri flags.Uri) (core.DataSink, error) {
    switch uri.Key {
    。。。
    case "influxdb":
        return influxdb.CreateInfluxdbSink(&uri.Val)
    。。。
    case "metric":
        return metricsink.NewMetricSink(140*time.Second, 15*time.Minute, []string{
            core.MetricCpuUsageRate.MetricDescriptor.Name,
            core.MetricMemoryUsage.MetricDescriptor.Name}), nil
    。。。
    default:
        return nil, fmt.Errorf("Sink not recognized: %s", uri.Key)
    }
}

influxdb的創(chuàng)建其實(shí)就是根據(jù)傳入的參數(shù)然后創(chuàng)建一個(gè)config結(jié)構(gòu),用于后面創(chuàng)建連接influxDB的client;
metric的創(chuàng)建其實(shí)就是初始化了一個(gè)MetricSink結(jié)構(gòu),需要注意的是傳入的第三個(gè)參數(shù),因?yàn)檫@是用于指定哪些metrics需要進(jìn)行長(zhǎng)時(shí)間存儲(chǔ),默認(rèn)就是cpu/usage和memory/usage,因?yàn)檫@兩個(gè)參數(shù)用戶最為關(guān)心。
具體的創(chuàng)建接口就不在深入了,較為簡(jiǎn)單。
到這里BuildAll()就結(jié)束了,至于返回值前面已經(jīng)做過(guò)介紹,就不在累贅了。
其實(shí)沒(méi)那么簡(jiǎn)單,還有一步:sinkManager的創(chuàng)建。
進(jìn)入sinks.NewDataSinkManager()接口看下:

func NewDataSinkManager(sinks []core.DataSink, exportDataTimeout, stopTimeout time.Duration) (core.DataSink, error) {
    sinkHolders := []sinkHolder{}
    // 遍歷前面創(chuàng)建的sinkList
    for _, sink := range sinks {
        // 為每個(gè)sink添加一個(gè)dataChannel和stopChannel
        // 用于獲取數(shù)據(jù)和stop信號(hào)
        sh := sinkHolder{
            sink:             sink,
            dataBatchChannel: make(chan *core.DataBatch),
            stopChannel:      make(chan bool),
        }
        sinkHolders = append(sinkHolders, sh)
        // 每個(gè)sink都會(huì)創(chuàng)建一個(gè)協(xié)程
        // 從dataChannel獲取數(shù)據(jù),并調(diào)用sink.export()導(dǎo)出到后端數(shù)據(jù)庫(kù)
        go func(sh sinkHolder) {
            for {
                select {
                case data := <-sh.dataBatchChannel:
                    export(sh.sink, data)
                case isStop := <-sh.stopChannel:
                    glog.V(2).Infof("Stop received: %s", sh.sink.Name())
                    if isStop {
                        sh.sink.Stop()
                        return
                    }
                }
            }
        }(sh)
    }
    return &sinkManager{
        sinkHolders:       sinkHolders,
        exportDataTimeout: exportDataTimeout,
        stopTimeout:       stopTimeout,
    }, nil
}

這里會(huì)為每個(gè)sink創(chuàng)建協(xié)程,等待數(shù)據(jù)的到來(lái)并最終將數(shù)據(jù)導(dǎo)入到對(duì)應(yīng)的后端數(shù)據(jù)庫(kù)。
這里需要帶個(gè)問(wèn)號(hào),既然channel有一端在收,總得有地方會(huì)發(fā)送,這會(huì)在后面才會(huì)揭曉。

go協(xié)程 + channel的方式,是golang最常見(jiàn)的方式,確實(shí)便用。

創(chuàng)建數(shù)據(jù)Processors

因?yàn)閏Advisor返回的原始數(shù)據(jù)就包含了nodes和containers的相關(guān)數(shù)據(jù),所以heapster需要?jiǎng)?chuàng)建各種processor,用于處理成不同類(lèi)型的數(shù)據(jù),比如pod, namespace, cluster,node。
還有些數(shù)據(jù)需要計(jì)算出速率,有些數(shù)據(jù)需要進(jìn)行累加,不同類(lèi)型擁有的metrics還不一樣等等情況。
看下源碼:

func main() {
    ...

    // 計(jì)算namespace和cluster的metrics值時(shí),下列數(shù)據(jù)需要進(jìn)行累加求值
    metricsToAggregate := []string{
        core.MetricCpuUsageRate.Name,
        core.MetricMemoryUsage.Name,
        core.MetricCpuRequest.Name,
        core.MetricCpuLimit.Name,
        core.MetricMemoryRequest.Name,
        core.MetricMemoryLimit.Name,
    }
    // 計(jì)算node的metrics值時(shí),下列數(shù)據(jù)需要進(jìn)行累加求值
    metricsToAggregateForNode := []string{
        core.MetricCpuRequest.Name,
        core.MetricCpuLimit.Name,
        core.MetricMemoryRequest.Name,
        core.MetricMemoryLimit.Name,
    }
    // RateMetricsMapping中的數(shù)據(jù)需要計(jì)算速率,比如cpu/usage_rate,network/rx_rate
    dataProcessors := []core.DataProcessor{
        // Convert cumulaties to rate
        processors.NewRateCalculator(core.RateMetricsMapping),
    }

    kubernetesUrl, err := getKubernetesAddress(argSources)
    if err != nil {
        glog.Fatalf("Failed to get kubernetes address: %v", err)
    }

    kubeConfig, err := kube_config.GetKubeClientConfig(kubernetesUrl)
    if err != nil {
        glog.Fatalf("Failed to get client config: %v", err)
    }
    kubeClient := kube_client.NewOrDie(kubeConfig)
    // 創(chuàng)建pod的ListWatch,用于從k8s server監(jiān)聽(tīng)pod變更
    podLister, err := getPodLister(kubeClient)
    if err != nil {
        glog.Fatalf("Failed to create podLister: %v", err)
    }
    // 創(chuàng)建node的ListWatch,用于從k8s server監(jiān)聽(tīng)node變更
    nodeLister, err := getNodeLister(kubeClient)
    if err != nil {
        glog.Fatalf("Failed to create nodeLister: %v", err)
    }
    // 該podBasedEnricher用于解析從sources獲取到的pod和container的metrics數(shù)據(jù),
    // 然后對(duì)pod和container進(jìn)行數(shù)據(jù)完善,比如添加labels.但這里還不會(huì)處理metricsValue
    podBasedEnricher, err := processors.NewPodBasedEnricher(podLister)
    if err != nil {
        glog.Fatalf("Failed to create PodBasedEnricher: %v", err)
    }
    dataProcessors = append(dataProcessors, podBasedEnricher)
    // 跟上面的podBasedEnricher同理,需要注意的是在append時(shí)有先后順序
    namespaceBasedEnricher, err := processors.NewNamespaceBasedEnricher(kubernetesUrl)
    if err != nil {
        glog.Fatalf("Failed to create NamespaceBasedEnricher: %v", err)
    }
    dataProcessors = append(dataProcessors, namespaceBasedEnricher)

    // 這里的對(duì)象會(huì)對(duì)metricsValue進(jìn)行處理,對(duì)應(yīng)的數(shù)據(jù)進(jìn)行累加求值
    dataProcessors = append(dataProcessors,
        processors.NewPodAggregator(),
        &processors.NamespaceAggregator{
            MetricsToAggregate: metricsToAggregate,
        },
        &processors.NodeAggregator{
            MetricsToAggregate: metricsToAggregateForNode,
        },
        &processors.ClusterAggregator{
            MetricsToAggregate: metricsToAggregate,
        })

    dataProcessors = append(dataProcessors, processors.NewRcAggregator())

    nodeAutoscalingEnricher, err := processors.NewNodeAutoscalingEnricher(kubernetesUrl)
    if err != nil {
        glog.Fatalf("Failed to create NodeAutoscalingEnricher: %v", err)
    }
    dataProcessors = append(dataProcessors, nodeAutoscalingEnricher)

Processors的功能基本就是這樣了,相對(duì)有點(diǎn)復(fù)雜,數(shù)據(jù)處理的樣式和類(lèi)別較多。
各個(gè)對(duì)象的Process()方法就不進(jìn)行一一介紹了,就是按照順序一個(gè)一個(gè)的填充core.DataBatch數(shù)據(jù)。有興趣的可以逐個(gè)看下,可以借鑒下實(shí)現(xiàn)的方式。

獲取源數(shù)據(jù)并存儲(chǔ)

前面的都是鋪墊,開(kāi)始介紹heapster的關(guān)鍵實(shí)現(xiàn),進(jìn)行源數(shù)據(jù)的獲取,并導(dǎo)出到后端存儲(chǔ)。
先介紹相關(guān)結(jié)構(gòu):

type Manager interface {
    Start()
    Stop()
}

Manager是需要實(shí)現(xiàn)Start和stop方法的接口。而真實(shí)創(chuàng)建的對(duì)象其實(shí)是realManager:

type realManager struct {
    // 數(shù)據(jù)源
    source                 core.MetricsSource
    // 數(shù)據(jù)處理對(duì)象
    processors             []core.DataProcessor
    // 后端存儲(chǔ)對(duì)象
    sink                   core.DataSink
    // 每次scrape數(shù)據(jù)的時(shí)間間隔
    resolution             time.Duration
    // 創(chuàng)建多個(gè)scrape協(xié)程時(shí),需要sleep這點(diǎn)時(shí)間,防止異常
    scrapeOffset           time.Duration
    // scrape 停止的管道
    stopChan               chan struct{}
    // 
    housekeepSemaphoreChan chan struct{}
    // 超時(shí)
    housekeepTimeout       time.Duration
}

關(guān)鍵的代碼如下:

    manager, err := manager.NewManager(sourceManager, dataProcessors, sinkManager, *argMetricResolution,
        manager.DefaultScrapeOffset, manager.DefaultMaxParallelism)
    if err != nil {
        glog.Fatalf("Failed to create main manager: %v", err)
    }
    manager.Start()

首先會(huì)根據(jù)前面創(chuàng)建的sourceManager, dataProcessors, sinkManager對(duì)象,再創(chuàng)建manager。
路徑: heapster/metrics/manager/manager.go

func NewManager(source core.MetricsSource, processors []core.DataProcessor, sink core.DataSink, resolution time.Duration,
    scrapeOffset time.Duration, maxParallelism int) (Manager, error) {
    manager := realManager{
        source:                 source,
        processors:             processors,
        sink:                   sink,
        resolution:             resolution,
        scrapeOffset:           scrapeOffset,
        stopChan:               make(chan struct{}),
        housekeepSemaphoreChan: make(chan struct{}, maxParallelism),
        housekeepTimeout:       resolution / 2,
    }

    for i := 0; i < maxParallelism; i++ {
        manager.housekeepSemaphoreChan <- struct{}{}
    }

    return &manager, nil
}

前面介紹了該關(guān)鍵結(jié)構(gòu)readlManager,繼續(xù)進(jìn)入manager.Start():

func (rm *realManager) Start() {
    go rm.Housekeep()
}

func (rm *realManager) Housekeep() {
    for {
        // Always try to get the newest metrics
        now := time.Now()
        // 獲取數(shù)據(jù)的時(shí)間段,默認(rèn)是1min
        start := now.Truncate(rm.resolution)
        end := start.Add(rm.resolution)
        // 真正同步一次的時(shí)間間隔,默認(rèn)是1min + 5s
        timeToNextSync := end.Add(rm.scrapeOffset).Sub(now)

        select {
        case <-time.After(timeToNextSync):
            rm.housekeep(start, end)
        case <-rm.stopChan:
            rm.sink.Stop()
            return
        }
    }
}

繼續(xù)看rm.housekeep(start, end), 該接口就傳入了時(shí)間區(qū)間,其實(shí)cAdvisor就是支持時(shí)間區(qū)間來(lái)獲取metrics值。

func (rm *realManager) housekeep(start, end time.Time) {
    if !start.Before(end) {
        glog.Warningf("Wrong time provided to housekeep start:%s end: %s", start, end)
        return
    }

    select {
    case <-rm.housekeepSemaphoreChan:
        // ok, good to go

    case <-time.After(rm.housekeepTimeout):
        glog.Warningf("Spent too long waiting for housekeeping to start")
        return
    }

    go func(rm *realManager) {
        defer func() { rm.housekeepSemaphoreChan <- struct{}{} }()
        // 從sources獲取數(shù)據(jù)
        data := rm.source.ScrapeMetrics(start, end)
        // 遍歷processors,然后進(jìn)行數(shù)據(jù)處理
        for _, p := range rm.processors {
            newData, err := process(p, data)
            if err == nil {
                data = newData
            } else {
                glog.Errorf("Error in processor: %v", err)
                return
            }
        }

        // 最終將數(shù)據(jù)導(dǎo)出到后端存儲(chǔ)
        rm.sink.ExportData(data)

    }(rm)
}

邏輯比較簡(jiǎn)單,會(huì)有三個(gè)關(guān)鍵:

源數(shù)據(jù)獲取

數(shù)據(jù)處理

導(dǎo)出到后端

先看下rm.source.ScrapeMetrics()接口實(shí)現(xiàn).
路徑: heapster/metrics/sources/manager.go

func (this *sourceManager) ScrapeMetrics(start, end time.Time) *DataBatch {
    // 調(diào)用了nodeLister.List()獲取最新的k8s nodes列表,再根據(jù)之前配置的kubelet端口等信息,返回sources
    // 在創(chuàng)建sourceProvider時(shí),會(huì)創(chuàng)建node的ListWatch,所以這里nodeLister可使用list()
    sources := this.metricsSourceProvider.GetMetricsSources()
    
    responseChannel := make(chan *DataBatch)
    。。。

    // 遍歷各個(gè)source,然后創(chuàng)建協(xié)程獲取數(shù)據(jù)
    for _, source := range sources {

        go func(source MetricsSource, channel chan *DataBatch, start, end, timeoutTime time.Time, delayInMs int) {
            // scrape()接口其實(shí)就是調(diào)用了kubeletMetricsSource.ScrapeMetrics()
            // 每個(gè)node都會(huì)組成對(duì)應(yīng)的kubeletMetricsSource
            // ScrapeMetrics()就是從cAdvisor中獲取監(jiān)控信息,并進(jìn)行了decode
            metrics := scrape(source, start, end)
            ...
            select {
            // 將獲取到的數(shù)據(jù)丟入responseChannel
            // 下面會(huì)用到
            case channel <- metrics:
                // passed the response correctly.
                return
            case <-time.After(timeForResponse):
                glog.Warningf("Failed to send the response back %s", source)
                return
            }
        }(source, responseChannel, start, end, timeoutTime, delayMs)
    }
    response := DataBatch{
        Timestamp:  end,
        MetricSets: map[string]*MetricSet{},
    }

    latencies := make([]int, 11)

responseloop:
    for i := range sources {
        ...
        select {
        // 獲取前面創(chuàng)建的協(xié)程得到的數(shù)據(jù)
        case dataBatch := <-responseChannel:
            if dataBatch != nil {
                for key, value := range dataBatch.MetricSets {
                    response.MetricSets[key] = value
                }
            }
            。。。

        case <-time.After(timeoutTime.Sub(now)):
            glog.Warningf("Failed to get all responses in time (got %d/%d)", i, len(sources))
            break responseloop
        }
    }

    ...
    return &response
}

該接口的邏輯就是先通過(guò)nodeLister獲取k8s所有的nodes,這樣便能知道所有的kubelet信息,然后創(chuàng)建對(duì)應(yīng)數(shù)量的協(xié)程從各個(gè)kubelet中獲取對(duì)應(yīng)的cAdvisor監(jiān)控信息,進(jìn)行處理后再返回。

獲取到數(shù)據(jù)后,就需要調(diào)用各個(gè)processors的Process()接口進(jìn)行數(shù)據(jù)處理,接口太多就不一一介紹了,挑個(gè)node_aggregator.go進(jìn)行介紹:

func (this *NodeAggregator) Process(batch *core.DataBatch) (*core.DataBatch, error) {
    for key, metricSet := range batch.MetricSets {
        // 判斷下該metric是否是pod的
        // metricSet.Labels都是前面就進(jìn)行了填充,所以前面說(shuō)需要注意每個(gè)processor的append順序
        if metricSetType, found := metricSet.Labels[core.LabelMetricSetType.Key]; found && metricSetType == core.MetricSetTypePod {
            // Aggregating pods
            nodeName, found := metricSet.Labels[core.LabelNodename.Key]
            if nodeName == "" {
                glog.V(8).Infof("Skipping pod %s: no node info", key)
                continue
            }
            if found {
                // 獲取nodeKey,比如: node:172.25.5.111
                nodeKey := core.NodeKey(nodeName)
                // 前面都是判斷該pod在哪個(gè)node上,然后該node的數(shù)據(jù)是需要通過(guò)這些pod進(jìn)行累加得到
                node, found := batch.MetricSets[nodeKey]
                if !found {
                    glog.V(1).Info("No metric for node %s, cannot perform node level aggregation.")
                } else if err := aggregate(metricSet, node, this.MetricsToAggregate); err != nil {
                    return nil, err
                }
            } else {
                glog.Errorf("No node info in pod %s: %v", key, metricSet.Labels)
            }
        }
    }
    return batch, nil
}

基本流程就是這樣了,有需要的可以各個(gè)深入查看。

最后就是數(shù)據(jù)的后端存儲(chǔ)。
這里會(huì)涉及到兩部分:metricSink和influxdbSink。

從rm.sink.ExportData(data)接口入手:
路徑: heapster/metrics/sinks/manager.go

func (this *sinkManager) ExportData(data *core.DataBatch) {
    var wg sync.WaitGroup
    // 遍歷所有的sink,這里其實(shí)就兩個(gè)
    for _, sh := range this.sinkHolders {
        wg.Add(1)
        // 創(chuàng)建協(xié)程,然后將之前獲取的data丟入dataBatchChannel
        go func(sh sinkHolder, wg *sync.WaitGroup) {
            defer wg.Done()
            glog.V(2).Infof("Pushing data to: %s", sh.sink.Name())
            select {
            case sh.dataBatchChannel <- data:
                glog.V(2).Infof("Data push completed: %s", sh.sink.Name())
                // everything ok
            case <-time.After(this.exportDataTimeout):
                glog.Warningf("Failed to push data to sink: %s", sh.sink.Name())
            }
        }(sh, &wg)
    }
    // Wait for all pushes to complete or timeout.
    wg.Wait()
}

千辛萬(wàn)苦,你把數(shù)據(jù)丟入sh.dataBatchChannel完事了?
dataBatchChannel有點(diǎn)眼熟,因?yàn)橹皠?chuàng)建sinkManager的時(shí)候,也創(chuàng)建了協(xié)程并監(jiān)聽(tīng)了該管道,所以真正export數(shù)據(jù)是在之前就完成了,這里只需要把數(shù)據(jù)丟入管道即可。
所以golang中協(xié)程與協(xié)程之間的通信,channel才是王道啊!
ExportData有兩個(gè),一個(gè)一個(gè)講吧。
先來(lái)關(guān)鍵的influxDB.
路徑: heapster/metrics/sinks/influxdb/influxdb.go

func (sink *influxdbSink) ExportData(dataBatch *core.DataBatch) {
    ...
    dataPoints := make([]influxdb.Point, 0, 0)
    for _, metricSet := range dataBatch.MetricSets {
        // 遍歷MetricValues
        for metricName, metricValue := range metricSet.MetricValues {
            var value interface{}
            if core.ValueInt64 == metricValue.ValueType {
                value = metricValue.IntValue
            } else if core.ValueFloat == metricValue.ValueType {
                value = float64(metricValue.FloatValue)
            } else {
                continue
            }

            // Prepare measurement without fields
            fieldName := "value"
            measurementName := metricName
            if sink.c.WithFields {
                // Prepare measurement and field names
                serieName := strings.SplitN(metricName, "/", 2)
                measurementName = serieName[0]
                if len(serieName) > 1 {
                    fieldName = serieName[1]
                }
            }
            // influxdb單條數(shù)據(jù)結(jié)構(gòu)
            point := influxdb.Point{
                // 度量值名稱,比如cpu/usage
                Measurement: measurementName,
                // 該tags就是在processors中進(jìn)行添加,主要是pod_name,node_name,namespace_name等
                Tags:        metricSet.Labels,
                // 該字段就是具體的值了
                Fields: map[string]interface{}{
                    fieldName: value,
                },
                // 時(shí)間戳
                Time: dataBatch.Timestamp.UTC(),
            }
            // append到dataPoints,超過(guò)maxSendBatchSize數(shù)量后直接sendData到influxdb
            dataPoints = append(dataPoints, point)
            if len(dataPoints) >= maxSendBatchSize {
                sink.sendData(dataPoints)
                dataPoints = make([]influxdb.Point, 0, 0)
            }
        }
        // 遍歷LabeledMetrics,主要就是filesystem的數(shù)據(jù)
        // 不太明白為何要將filesystem的數(shù)據(jù)進(jìn)行區(qū)分,要放到Labeled中?什么意圖?望高手指點(diǎn),謝謝
        // 接下來(lái)的操作就跟上面MetricValues的操作差不多了
        for _, labeledMetric := range metricSet.LabeledMetrics {
            。。。
            point := influxdb.Point{
                Measurement: measurementName,
                Tags:        make(map[string]string),
                Fields: map[string]interface{}{
                    fieldName: value,
                },
                Time: dataBatch.Timestamp.UTC(),
            }
            for key, value := range metricSet.Labels {
                point.Tags[key] = value
            }
            for key, value := range labeledMetric.Labels {
                point.Tags[key] = value
            }
            dataPoints = append(dataPoints, point)
            if len(dataPoints) >= maxSendBatchSize {
                sink.sendData(dataPoints)
                dataPoints = make([]influxdb.Point, 0, 0)
            }
        }
    }
    if len(dataPoints) >= 0 {
        sink.sendData(dataPoints)
    }
}

該接口中有一處不太明白,metricSet中的LabeledMetrics和MetricsValue有何差別,為何要將filesystem的數(shù)據(jù)進(jìn)行區(qū)分對(duì)待,放入LabeldMetrics?
看代碼的過(guò)程中沒(méi)有得到答案,望大神指點(diǎn)迷津,多謝多謝!

有問(wèn)題,但也不影響繼續(xù)往下學(xué)習(xí),接著看下MetricSink:

func (this *MetricSink) ExportData(batch *core.DataBatch) {
    this.lock.Lock()
    defer this.lock.Unlock()

    now := time.Now()
    // 將數(shù)據(jù)丟入longStore和shortStore
    // 需要根據(jù)保存的時(shí)間將老數(shù)據(jù)丟棄
    this.longStore = append(popOldStore(this.longStore, now.Add(-this.longStoreDuration)),
        buildMultimetricStore(this.longStoreMetrics, batch))
    this.shortStore = append(popOld(this.shortStore, now.Add(-this.shortStoreDuration)), batch)
}

該邏輯比較簡(jiǎn)單,就是將數(shù)據(jù)丟入兩個(gè)Store中,然后把過(guò)期數(shù)據(jù)丟棄。
這里提醒一點(diǎn),heapster API調(diào)用時(shí)先會(huì)從longStore中匹配數(shù)據(jù),沒(méi)匹配上的話再?gòu)膕hortStore獲取,而longStore中存儲(chǔ)的數(shù)據(jù)類(lèi)型前面已經(jīng)做過(guò)介紹。

終于結(jié)束了。。

Heapster API創(chuàng)建

前面的主流業(yè)務(wù)都介紹完了,Heapster本身也提供了API用于開(kāi)發(fā)者進(jìn)行使用與測(cè)試。
繼續(xù)分析代碼吧:

    // 關(guān)鍵接口,后面分析
    handler := setupHandlers(metricSink, podLister, nodeLister, historicalSource)
    。。。
    // 創(chuàng)建http的mux多分器,用于http.Server的路由
    mux := http.NewServeMux()
    // prometheus:最新出現(xiàn)的人氣很高的監(jiān)控系統(tǒng),值得了解學(xué)習(xí)下,后續(xù)安排!
    promHandler := prometheus.Handler()
    // 支持TLS,我們用了http
    if len(*argTLSCertFile) > 0 && len(*argTLSKeyFile) > 0 {
        。。。
    } else {
        // 多分器分了"/"和"/metrics"
        // 進(jìn)入"/",還會(huì)進(jìn)行細(xì)分,里面使用到了go-restful
        mux.Handle("/", handler)
        mux.Handle("/metrics", promHandler)
        // 注冊(cè)健康檢測(cè)接口
        healthz.InstallHandler(mux, healthzChecker(metricSink))
        // 啟動(dòng)Server
        glog.Fatal(http.ListenAndServe(addr, mux))
    }

這里的關(guān)鍵是setupHandlers()接口,需要學(xué)習(xí)下里面如何使用go-restful進(jìn)行請(qǐng)求路由的。

k8s apiServer中也大量使用了go-restful,在學(xué)習(xí)該源碼時(shí)有進(jìn)行過(guò)分析

路徑: heapster/metrics/handlers.go

func setupHandlers(metricSink *metricsink.MetricSink, podLister *cache.StoreToPodLister, nodeLister *cache.StoreToNodeLister, historicalSource core.HistoricalSource) http.Handler {

    runningInKubernetes := true

    // 創(chuàng)建container,指定route類(lèi)型為CurlyRouter
    // 這些都跟go-restful基礎(chǔ)有關(guān),有興趣的可以看下原理
    wsContainer := restful.NewContainer()
    wsContainer.EnableContentEncoding(true)
    wsContainer.Router(restful.CurlyRouter{})
    // 注冊(cè)v1版本相關(guān)的api,包括官方介紹的"/api/v1/model"
    a := v1.NewApi(runningInKubernetes, metricSink, historicalSource)
    a.Register(wsContainer)
    // 這個(gè)metricsApi注冊(cè)了"/apis/metrics/v1alpha1"的各類(lèi)命令
    // 暫不關(guān)心
    m := metricsApi.NewApi(metricSink, podLister, nodeLister)
    m.Register(wsContainer)

    handlePprofEndpoint := func(req *restful.Request, resp *restful.Response) {
        name := strings.TrimPrefix(req.Request.URL.Path, pprofBasePath)
        switch name {
        case "profile":
            pprof.Profile(resp, req.Request)
        case "symbol":
            pprof.Symbol(resp, req.Request)
        case "cmdline":
            pprof.Cmdline(resp, req.Request)
        default:
            pprof.Index(resp, req.Request)
        }
    }

    // Setup pporf handlers.
    ws = new(restful.WebService).Path(pprofBasePath)
    ws.Route(ws.GET("/{subpath:*}").To(metrics.InstrumentRouteFunc("pprof", handlePprofEndpoint))).Doc("pprof endpoint")
    wsContainer.Add(ws)

    return wsContainer
}

關(guān)鍵在于v1版本的API注冊(cè),繼續(xù)深入a.Register(wsContainer):

func (a *Api) Register(container *restful.Container) {
    // 注冊(cè)"/api/v1/metric-export" API
    // 用于從shortStore中獲取所有的metrics信息
    ws := new(restful.WebService)
    ws.Path("/api/v1/metric-export").
        Doc("Exports the latest point for all Heapster metrics").
        Produces(restful.MIME_JSON)
    ws.Route(ws.GET("").
        To(a.exportMetrics).
        Doc("export the latest data point for all metrics").
        Operation("exportMetrics").
        Writes([]*types.Timeseries{}))
    // ws必須要add到container中才能生效
    container.Add(ws)
    // 注冊(cè)"/api/v1/metric-export-schema" API
    // 用于導(dǎo)出所有的metrics name,比如network-rx
    // 還會(huì)導(dǎo)出還有的labels,比如pod-name
    ws = new(restful.WebService)
    ws.Path("/api/v1/metric-export-schema").
        Doc("Schema for metrics exported by heapster").
        Produces(restful.MIME_JSON)
    ws.Route(ws.GET("").
        To(a.exportMetricsSchema).
        Doc("export the schema for all metrics").
        Operation("exportmetricsSchema").
        Writes(types.TimeseriesSchema{}))
    container.Add(ws)

    // 注冊(cè)metircSink相關(guān)的API,即"/api/v1/model/"
    if a.metricSink != nil {
        glog.Infof("Starting to Register Model.")
        a.RegisterModel(container)
    }

    if a.historicalSource != nil {
        a.RegisterHistorical(container)
    }
}

官方資料中介紹heapster metric model,我們使用到這些API也會(huì)比較多。
進(jìn)入a.RegisterModel(container)看下:

func (a *Api) RegisterModel(container *restful.Container) {
    ws := new(restful.WebService)
    // 指定所有命令的prefix: "/api/v1/model"
    ws.Path("/api/v1/model").
        Doc("Root endpoint of the stats model").
        Consumes("*/*").
        Produces(restful.MIME_JSON)
    // 在這里增加各類(lèi)命令,比如"/metrics/,/nodes/"等等
    addClusterMetricsRoutes(a, ws)

    // 列出所有的keys
    ws.Route(ws.GET("/debug/allkeys").
        To(metrics.InstrumentRouteFunc("debugAllKeys", a.allKeys)).
        Doc("Get keys of all metric sets available").
        Operation("debugAllKeys"))
    container.Add(ws)
}

繼續(xù)看addClusterMetricsRoutes():

func addClusterMetricsRoutes(a clusterMetricsFetcher, ws *restful.WebService) {
    。。。
    if a.isRunningInKubernetes() {
        // 列出所有namespaces的API
        ws.Route(ws.GET("/namespaces/").
            To(metrics.InstrumentRouteFunc("namespaceList", a.namespaceList)).
            Doc("Get a list of all namespaces that have some current metrics").
            Operation("namespaceList"))

        // 獲取指定namespaces的metrics
        ws.Route(ws.GET("/namespaces/{namespace-name}/metrics").
            To(metrics.InstrumentRouteFunc("availableNamespaceMetrics", a.availableNamespaceMetrics)).
            Doc("Get a list of all available metrics for a Namespace entity").
            Operation("availableNamespaceMetrics").
            Param(ws.PathParameter("namespace-name", "The name of the namespace to lookup").DataType("string")))

        // 獲取namespace指定的metrics值
        ws.Route(ws.GET("/namespaces/{namespace-name}/metrics/{metric-name:*}").
            To(metrics.InstrumentRouteFunc("namespaceMetrics", a.namespaceMetrics)).
            Doc("Export an aggregated namespace-level metric").
            Operation("namespaceMetrics").
            Param(ws.PathParameter("namespace-name", "The name of the namespace to lookup").DataType("string")).
            Param(ws.PathParameter("metric-name", "The name of the requested metric").DataType("string")).
            Param(ws.QueryParameter("start", "Start time for requested metrics").DataType("string")).
            Param(ws.QueryParameter("end", "End time for requested metric").DataType("string")).
            Param(ws.QueryParameter("labels", "A comma-separated list of key:values pairs to use to search for a labeled metric").DataType("string")).
            Writes(types.MetricResult{}))
        。。。
    }
    。。。
}

Heapster API的注冊(cè)基本就這樣了,在花點(diǎn)時(shí)間看下API的實(shí)現(xiàn)吧。
我們挑一個(gè)例子做下分析,獲取某個(gè)pod的指定的metrics值.
對(duì)應(yīng)的接口:heapster/metrics/api/v1/model_handler.go

func (a *Api) podMetrics(request *restful.Request, response *restful.Response) {
    a.processMetricRequest(
        // 根據(jù)URI傳入的ns和pod名字,拼裝成key,如:"namespace:default/pod:123"
        core.PodKey(request.PathParameter("namespace-name"),
            request.PathParameter("pod-name")),
        request, response)
}

根據(jù)URI的輸入?yún)?shù)并調(diào)用processMetricRequest()接口,獲取對(duì)應(yīng)的metric value:

func (a *Api) processMetricRequest(key string, request *restful.Request, response *restful.Response) {
    // 時(shí)間區(qū)間
    start, end, err := getStartEndTime(request)
    if err != nil {
        response.WriteError(http.StatusBadRequest, err)
        return
    }
    // 獲取metric Name,比如"/cpu/usage"
    metricName := request.PathParameter("metric-name")
    // 根據(jù)metricName進(jìn)行轉(zhuǎn)換,比如將cpu-usage轉(zhuǎn)換成cpu/usage_rate
    // 所以這里需要注意cpu-usage不等于/cpu/usage,一個(gè)表示cpu使用率,一個(gè)表示cpu使用量
    convertedMetricName := convertMetricName(metricName)
    // 獲取請(qǐng)求中的labels,根據(jù)是否有指定labels來(lái)調(diào)用不同的接口
    labels, err := getLabels(request)
    if err != nil {
        response.WriteError(http.StatusBadRequest, err)
        return
    }

    var metrics map[string][]core.TimestampedMetricValue
    if labels != nil {
        // 該接口從metricSet.LabeledMetrics中獲取對(duì)應(yīng)的value
        metrics = a.metricSink.GetLabeledMetric(convertedMetricName, labels, []string{key}, start, end)
    } else {
        // 該接口先從longStoreMetrics中進(jìn)行匹配,匹配不到的話再?gòu)膕hortStore中獲取對(duì)應(yīng)的metricValue
        metrics = a.metricSink.GetMetric(convertedMetricName, []string{key}, start, end)
    }
    // 將獲取到的metricValue轉(zhuǎn)換成MetricPoint格式的值,會(huì)有多組"時(shí)間戳+value"
    converted := exportTimestampedMetricValue(metrics[key])
    // 將結(jié)果進(jìn)行response
    response.WriteEntity(converted)
}

OK,大功告成!API的實(shí)現(xiàn)也講完了,很多API都是相通的,最終都會(huì)調(diào)用相同的接口,所以不一一介紹了。
這里需要注意heapster的API的URI還有多種寫(xiě)法,比如/api/v1/model/cpu-usage,等價(jià)于/api/v1/model/cpu/usage_rate/,別誤理解成/cpu/usage了,這兩個(gè)概念不一樣,一個(gè)是cpu使用率,一個(gè)是cpu使用量。

上面的提醒告訴我們,沒(méi)事多看源碼,很多誤解自然而然就解除了!

筆者能力有限,看源碼也在于學(xué)習(xí)提升能力,當(dāng)然也會(huì)有較多不理解或者理解不當(dāng)?shù)牡胤剑M魑荒苡枰猿C正,多謝多謝!

擴(kuò)展

上面的介紹完了Heapster的實(shí)現(xiàn),我們可以思考下是否可以動(dòng)手修改源碼,比如增加一些對(duì)象的metrics信息。
筆者考慮是否可以直接支持RC/RS/Deployment的metrics信息,讓業(yè)務(wù)層可以直接拿到服務(wù)的整體信息。

參考資料

Heapster官方資料:https://github.com/kubernetes...

InfluxDB github: https://github.com/influxdata...

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/32552.html

相關(guān)文章

  • Kubernetes監(jiān)控Heapster介紹

    摘要:在每個(gè)上都會(huì)運(yùn)行,它會(huì)收集本機(jī)以及容器的監(jiān)控?cái)?shù)據(jù)。使用這里主要介紹的使用,及可獲取的。參考資料文檔文檔及可用在官方文檔中都介紹的比較齊全。我們沒(méi)有采用該方式,是考慮到如果和監(jiān)控系統(tǒng)相互依賴,會(huì)導(dǎo)致異常之后,存在監(jiān)控系統(tǒng)無(wú)法使用的隱患。 什么是Heapster? Heapster是容器集群監(jiān)控和性能分析工具,天然的支持Kubernetes和CoreOS。Kubernetes有個(gè)出名的監(jiān)控...

    LeviDing 評(píng)論0 收藏0
  • 容器監(jiān)控實(shí)踐—cAdvisor

    摘要:在中包含大量的了相關(guān)的信息參考原生監(jiān)控文章的收集器更多源碼參考文章總結(jié)優(yōu)缺點(diǎn)優(yōu)點(diǎn)谷歌開(kāi)源產(chǎn)品,監(jiān)控指標(biāo)齊全,部署方便,而且有官方的鏡像。 概述 為了解決docker stats的問(wèn)題(存儲(chǔ)、展示),谷歌開(kāi)源的cadvisor誕生了,cadvisor不僅可以搜集一臺(tái)機(jī)器上所有運(yùn)行的容器信息,還提供基礎(chǔ)查詢界面和http接口,方便其他組件如Prometheus進(jìn)行數(shù)據(jù)抓取,或者cadvis...

    andycall 評(píng)論0 收藏0
  • 容器監(jiān)控實(shí)踐—cAdvisor

    摘要:在中包含大量的了相關(guān)的信息參考原生監(jiān)控文章的收集器更多源碼參考文章總結(jié)優(yōu)缺點(diǎn)優(yōu)點(diǎn)谷歌開(kāi)源產(chǎn)品,監(jiān)控指標(biāo)齊全,部署方便,而且有官方的鏡像。 概述 為了解決docker stats的問(wèn)題(存儲(chǔ)、展示),谷歌開(kāi)源的cadvisor誕生了,cadvisor不僅可以搜集一臺(tái)機(jī)器上所有運(yùn)行的容器信息,還提供基礎(chǔ)查詢界面和http接口,方便其他組件如Prometheus進(jìn)行數(shù)據(jù)抓取,或者cadvis...

    kbyyd24 評(píng)論0 收藏0
  • Kubernetes v1.0特性解析

    摘要:?jiǎn)栴}是不是定義的一個(gè)的容器集群是只部署在同一個(gè)主機(jī)上楊樂(lè)到目前是,同一個(gè)里的是部署在同一臺(tái)主機(jī)的。問(wèn)題這個(gè)圖里的是安裝在哪里的所有的客戶端以及會(huì)連接這個(gè)嘛楊樂(lè)可以任意地方,只要能訪問(wèn)到集群,會(huì)作為的出口。 kubernetes1.0剛剛發(fā)布,開(kāi)源社區(qū)400多位貢獻(xiàn)者一年的努力,多達(dá)14000多次的代碼提交,最終達(dá)到了之前預(yù)計(jì)的milestone, 并意味著這個(gè)開(kāi)源容器編排系統(tǒng)可以正式在...

    HackerShell 評(píng)論0 收藏0
  • kubernetesheapster的部署案例

    摘要:舉個(gè)例子,我們?cè)谶@種狀態(tài)下創(chuàng)建一個(gè),然后執(zhí)行在中會(huì)發(fā)現(xiàn)有了字段,并且裝載了一個(gè)是的,這個(gè)就是我們這個(gè)下的。 注:本案例在我的部署環(huán)境下是可行的,但不保證在所有環(huán)境下都可行。我盡可能講得直白而詳細(xì),因?yàn)槲易约阂膊艅傞_(kāi)始接觸,已經(jīng)做過(guò)深入研究的可以瀏覽,若有什么錯(cuò)誤,煩請(qǐng)指正,感激不盡! 我的環(huán)境: K8S1.0.0+flannel+docker1.6的分布式集群。 這里先不贅述fla...

    Ali_ 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<