国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

k8s與日志--journalbeat源碼解讀

jemygraw / 2207人閱讀

摘要:但是也存在諸多的問題,隨著新設備的出現以及對安全的重視,這些缺點越發顯得突出,例如日志消息內容無法驗證數據格式松散日志檢索低效有限的元數據保存無法記錄二進制數據等。該服務可以為項目增加一定數量的元數據。

前言

對于日志系統的重要性不言而喻,參照滬江的一篇關于日志系統的介紹,基本上日志數據在以下幾方面具有非常重要的作用:

數據查找:通過檢索日志信息,定位相應的 bug ,找出解決方案

服務診斷:通過對日志信息進行統計、分析,了解服務器的負荷和服務運行狀態

數據分析:可以做進一步的數據分析,比如根據請求中的課程 id ,找出 TOP10 用戶感興趣課程

日志+大數據+AI的確有很多想象空間。
而對于收集系統,流行的技術stack有之前的elk,到現在的efk。logstash換成了filebeat。當然日志收集agent,也有flume和fluentd,尤其fluentd屬于cncf組織的產品,在k8s中有著廣泛的應用。但是fluentd是ruby寫的,不利于深入源碼了解。當然今天我們重點講的是另外一個agent--journalbeat。望文生義,隸屬于efk stack 中beats系列中的一員,專門用于收集journald日志。

journalbeat源碼解讀 journald日志簡介

長久以來 syslog 是每一個 Unix 系統中的重要部件。在漫長的歷史中在各種 Linux 發行版中都有不同的實現去完成類似的工作,它們采取的是邏輯相近,并使用基本相同的文件格式。但是 syslog 也存在諸多的問題,隨著新設備的出現以及對安全的重視,這些缺點越發顯得突出,例如日志消息內容無法驗證、數據格式松散、日志檢索低效、有限的元數據保存、無法記錄二進制數據等。
Journald是針對以上需求的解決方案。受udev事件啟發,Journal 條目與環境組塊相似。一個鍵值域,按照換行符分開,使用大寫的變量名。除了支持ASCII 格式的字符串外,還能夠支持二進制數據,如 ATA SMART 健康信息、SCSI 數據。應用程序和服務可以通過將項目域傳遞給systemd journald服務來生成項目。該服務可以為項目增加一定數量的元數據。這些受信任域的值由 Journal 服務來決定且無法由客戶端來偽造。在Journald中,可以把日志數據導出,在異地讀取,并不受處理器架構的影響。這對嵌入式設備是很有用的功能,方便維護人員分析設備運行狀況。
大致總結就是

journald日志是新的linux系統的具備的

journald區別于傳統的文件存儲方式,是二進制存儲。需要用journalctl查看。

docker對于journald的支持

The journald logging driver sends container logs to the systemd journal. Log entries can be retrieved using the journalctl command, through use of the journal API, or using the docker logs command.
即docker除了json等日志格式,已經增加了journald驅動。

目前本司使用場景

我們的k8s集群,所有的docker輸出的日志格式都采用journald,這樣主機centos系統日志和docker的日志都用journalbeat來收集。

journalbeat實現關鍵

journalbeat整個實現過程,基本上兩點:

與其他社區貢獻的beats系列,比如packetbeat,mysqlbeat類似,遵循了beats的框架和約定,journalbeat實現了run和stop等方法即可,然后作為一個客戶端,將收集到的數據,publish到beats中。

讀取journald日志,采用了coreos開源的go-systemd庫中sdjournal部分。其實sdjournal是一個利用cgo 對于journald日志c接口的封裝。

源碼解讀

程序入口:

package main

import (
    "log"

    "github.com/elastic/beats/libbeat/beat"
    "github.com/mheese/journalbeat/beater"
)

func main() {
    err := beat.Run("journalbeat", "", beater.New)
    if err != nil {
        log.Fatal(err)
    }
}

整個journalbeat共實現了3個方法即可。run,stop,和new。
run和stop顧名思義,就是beats控制journalbeat的運行和停止。
而new:
需要按照

// Creator initializes and configures a new Beater instance used to execute
// the beat its run-loop.
type Creator func(*Beat, *common.Config) (Beater, error)

實現Creator方法,返回的Beater實例,交由beats控制。
具體實現:

// New creates beater
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
    config := config.DefaultConfig
    var err error
    if err = cfg.Unpack(&config); err != nil {
        return nil, fmt.Errorf("Error reading config file: %v", err)
    }

    jb := &Journalbeat{
        config:     config,
        done:       make(chan struct{}),
        cursorChan: make(chan string),
        pending:    make(chan *eventReference),
        completed:  make(chan *eventReference, config.PendingQueue.CompletedQueueSize),
    }

    if err = jb.initJournal(); err != nil {
        logp.Err("Failed to connect to the Systemd Journal: %v", err)
        return nil, err
    }

    jb.client = b.Publisher.Connect()
    return jb, nil
}

一般的beats中,都會有一些共同屬性。例如下面的done和client屬性。

// Journalbeat is the main Journalbeat struct
type Journalbeat struct {
    done   chan struct{}
    config config.Config
    client publisher.Client

    journal *sdjournal.Journal

    cursorChan         chan string
    pending, completed chan *eventReference
    wg                 sync.WaitGroup
}

done是一個控制整個beater啟停的信號量。
而client 是與beats平臺通信的client。注意在初始化的時候,

jb.client = b.Publisher.Connect()

建立鏈接。
然后在收集到數據,發送的時候,也是通過該client

select {
        case <-jb.done:
            return nil
        default:
            // we need to clone to avoid races since map is a pointer...
            jb.client.PublishEvent(ref.body.Clone(), publisher.Signal(&eventSignal{ref, jb.completed}), publisher.Guaranteed)
        }

注意上邊的發送姿勢和對于剛才提到的done信號量使用。
其他方法都是業務相關不再詳細解讀了。

journalbeat如何保證發送失敗的日志重新發送

關于這點,個人感覺是最優雅的部分

所有發送失敗的日志是會在程序結束之前以json格式保存到文件,完成持久化。
    // on exit fully consume both queues and flush to disk the pending queue
    defer func() {
        var wg sync.WaitGroup
        wg.Add(2)

        go func() {
            defer wg.Done()
            for evRef := range jb.pending {
                pending[evRef.cursor] = evRef.body
            }
        }()

        go func() {
            defer wg.Done()
            for evRef := range jb.completed {
                completed[evRef.cursor] = evRef.body
            }
        }()
        wg.Wait()

        logp.Info("Saving the pending queue, consists of %d messages", len(diff(pending, completed)))
        if err := flush(diff(pending, completed), jb.config.PendingQueue.File); err != nil {
            logp.Err("error writing pending queue %s: %s", jb.config.PendingQueue.File, err)
        }
    }()
程序啟動以后首先會讀取之前持久化的發送失敗的日志,重新發送
// load the previously saved queue of unsent events and try to publish them if any
    if err := jb.publishPending(); err != nil {
        logp.Warn("could not read the pending queue: %s", err)
    }
client publish收集到的日志到beats,設置了publisher.Guaranteed模式,成功和失敗都有反饋
jb.client.PublishEvent(ref.body.Clone(), publisher.Signal(&eventSignal{ref, jb.completed}), publisher.Guaranteed)

其中publisher.Signal(&eventSignal{ref, jb.completed})類似于一個回調,凡是成功的都會寫成功的ref到jb.completed中。方便客戶端控制。

維護了兩個chan,一個存放客戶端發送的日志,一個存放服務端接受成功的日志,精確對比,可獲取發送失敗的日志,進入重發動作

journalbeat struct中有下面兩個屬性

    pending, completed chan *eventReference

每次客戶端發送一條日志,都會寫到pending。

case publishedChan <- jb.client.PublishEvent(event, publisher.Signal(&eventSignal{ref, jb.completed}), publisher.Guaranteed):
            if published := <-publishedChan; published {
                jb.pending <- ref

                // save cursor
                if jb.config.WriteCursorState {
                    jb.cursorChan <- rawEvent.Cursor
                }
            }
        }

publisher.Signal(&eventSignal{ref, jb.completed}),回調會將成功的寫到completed。
整個程序同時會啟動一個
go jb.managePendingQueueLoop()
協程,專門用來定時重發失敗日志。

// managePendingQueueLoop runs the loop which manages the set of events waiting to be acked
func (jb *Journalbeat) managePendingQueueLoop() {
    jb.wg.Add(1)
    defer jb.wg.Done()
    pending := map[string]common.MapStr{}
    completed := map[string]common.MapStr{}

    // diff returns the difference between this map and the other.
    diff := func(this, other map[string]common.MapStr) map[string]common.MapStr {
        result := map[string]common.MapStr{}
        for k, v := range this {
            if _, ok := other[k]; !ok {
                result[k] = v
            }
        }
        return result
    }

    // flush saves the map[string]common.MapStr to the JSON file on disk
    flush := func(source map[string]common.MapStr, dest string) error {
        tempFile, err := ioutil.TempFile(filepath.Dir(dest), fmt.Sprintf(".%s", filepath.Base(dest)))
        if err != nil {
            return err
        }

        if err = json.NewEncoder(tempFile).Encode(source); err != nil {
            _ = tempFile.Close()
            return err
        }

        _ = tempFile.Close()
        return os.Rename(tempFile.Name(), dest)
    }

    // on exit fully consume both queues and flush to disk the pending queue
    defer func() {
        var wg sync.WaitGroup
        wg.Add(2)

        go func() {
            defer wg.Done()
            for evRef := range jb.pending {
                pending[evRef.cursor] = evRef.body
            }
        }()

        go func() {
            defer wg.Done()
            for evRef := range jb.completed {
                completed[evRef.cursor] = evRef.body
            }
        }()
        wg.Wait()

        logp.Info("Saving the pending queue, consists of %d messages", len(diff(pending, completed)))
        if err := flush(diff(pending, completed), jb.config.PendingQueue.File); err != nil {
            logp.Err("error writing pending queue %s: %s", jb.config.PendingQueue.File, err)
        }
    }()

    // flush the pending queue to disk periodically
    tick := time.Tick(jb.config.PendingQueue.FlushPeriod)
    for {
        select {
        case <-jb.done:
            return
        case p, ok := <-jb.pending:
            if ok {
                pending[p.cursor] = p.body
            }
        case c, ok := <-jb.completed:
            if ok {
                completed[c.cursor] = c.body
            }
        case <-tick:
            result := diff(pending, completed)
            if err := flush(result, jb.config.PendingQueue.File); err != nil {
                logp.Err("error writing %s: %s", jb.config.PendingQueue.File, err)
            }
            pending = result
            completed = map[string]common.MapStr{}
        }
    }
}
總結

當然還有一些其他的細節,不再一一講述了。比如定時寫Cursor的功能和日志格式轉換等。具體的大家可以看源碼。主要是講了我認為其優雅的部分和為beats編寫beater的要點。

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/32627.html

相關文章

  • k8s日志--journalbeat源碼解讀

    摘要:但是也存在諸多的問題,隨著新設備的出現以及對安全的重視,這些缺點越發顯得突出,例如日志消息內容無法驗證數據格式松散日志檢索低效有限的元數據保存無法記錄二進制數據等。該服務可以為項目增加一定數量的元數據。 前言 對于日志系統的重要性不言而喻,參照滬江的一篇關于日志系統的介紹,基本上日志數據在以下幾方面具有非常重要的作用: 數據查找:通過檢索日志信息,定位相應的 bug ,找出解決方案 ...

    Amio 評論0 收藏0
  • 快收藏!52篇25萬字,微服務、云原生、容器、K8S、Serverless精華文章集錦

    摘要:正在走遠,新年之初,小數精選過去一年閱讀量居高的技術干貨,從容器到微服務云原生,匯集成篇精華集錦,充分反映了這一年的技術熱點走向。此文值得收藏,方便隨時搜索和查看。,小數將繼續陪伴大家,為朋友們奉獻更有逼格的技術內容。 2017正在走遠,新年之初,小數精選過去一年閱讀量居高的技術干貨,從容器、K8S 到微服務、云原生、Service Mesh,匯集成52篇精華集錦,充分反映了這一年的技...

    AaronYuan 評論0 收藏0
  • k8s網絡--Flannel源碼分析

    摘要:今天主要針對版本進行源碼分析。外部接口的定義如下創建子網管理器負責子網的創建更新添加刪除監聽等,主要和打交道定義續約。在到期之前,子網管理器調用該方法進行續約。 前言 之前在k8s與網絡--Flannel解讀一文中,我們主要講了Flannel整體的工作原理。今天主要針對Flannel v0.10.0版本進行源碼分析。首先需要理解三個比較重要的概念: 網絡(Network):整個集群中...

    wpw 評論0 收藏0
  • k8s網絡--Flannel源碼分析

    摘要:今天主要針對版本進行源碼分析。外部接口的定義如下創建子網管理器負責子網的創建更新添加刪除監聽等,主要和打交道定義續約。在到期之前,子網管理器調用該方法進行續約。 前言 之前在k8s與網絡--Flannel解讀一文中,我們主要講了Flannel整體的工作原理。今天主要針對Flannel v0.10.0版本進行源碼分析。首先需要理解三個比較重要的概念: 網絡(Network):整個集群中...

    hoohack 評論0 收藏0
  • k8s網絡--Flannel源碼分析

    摘要:今天主要針對版本進行源碼分析。外部接口的定義如下創建子網管理器負責子網的創建更新添加刪除監聽等,主要和打交道定義續約。在到期之前,子網管理器調用該方法進行續約。 前言 之前在k8s與網絡--Flannel解讀一文中,我們主要講了Flannel整體的工作原理。今天主要針對Flannel v0.10.0版本進行源碼分析。首先需要理解三個比較重要的概念: 網絡(Network):整個集群中...

    Jeffrrey 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<