国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

k8s與日志--采用golang實現Fluent Bit的output插件

binta / 1469人閱讀

摘要:采用實現的插件前言目前社區日志采集和處理的組件不少,之前方案中的,社區中的,方案中的以及大數據用到比較多的。適合采用的方案,實現日志中心化收集的方案。主要負責采集,負責處理和傳送。

采用golang實現Fluent Bit的output插件 前言

目前社區日志采集和處理的組件不少,之前elk方案中的logstash,cncf社區中的fluentd,efk方案中的filebeat,以及大數據用到比較多的flume。而Fluent Bit是一款用c語言編寫的高性能的日志收集組件,整個架構源于fluentd。官方比較數據如下:

Fluentd Fluent Bit
Scope Containers / Servers Containers / Servers
Language C & Ruby C
Memory ~40MB ~450KB
Performance High Performance High Performance
Dependencies Built as a Ruby Gem, it requires a certain number of gems. Zero dependencies, unless some special plugin requires them.
Plugins More than 650 plugins available Around 35 plugins available
License Apache License v2.0 Apache License v2.0

通過數據可以看出,fluent bit 占用資源更少。適合采用fluent bit + fluentd 的方案,實現日志中心化收集的方案。fluent bit主要負責采集,fluentd負責處理和傳送。

擴展output插件

fluent bit 本身是C語言編寫,擴展插件有一定的難度。可能官方考慮到這一點,實現了fluent-bit-go,可以實現采用go語言來編寫插件,目前只支持output的編寫。
fluent-bit-go其實就是利用cgo,封裝了c接口。代碼比較簡單,主要分析其中一個關鍵文件

package output

/*
#include 
#include "flb_plugin.h"
#include "flb_output.h"
*/
import "C"
import "fmt"
import "unsafe"

// Define constants matching Fluent Bit core
const FLB_ERROR               =  C.FLB_ERROR
const FLB_OK                  =  C.FLB_OK
const FLB_RETRY               =  C.FLB_RETRY

const FLB_PROXY_OUTPUT_PLUGIN =  C.FLB_PROXY_OUTPUT_PLUGIN
const FLB_PROXY_GOLANG        =  C.FLB_PROXY_GOLANG

// Local type to define a plugin definition
type FLBPlugin C.struct_flb_plugin_proxy
type FLBOutPlugin C.struct_flbgo_output_plugin

// When the FLBPluginInit is triggered by Fluent Bit, a plugin context
// is passed and the next step is to invoke this FLBPluginRegister() function
// to fill the required information: type, proxy type, flags name and
// description.
func FLBPluginRegister(ctx unsafe.Pointer, name string, desc string) int {
    p := (*FLBPlugin) (unsafe.Pointer(ctx))
    p._type = FLB_PROXY_OUTPUT_PLUGIN
    p.proxy = FLB_PROXY_GOLANG
    p.flags = 0
    p.name  = C.CString(name)
    p.description = C.CString(desc)
    return 0
}

// Release resources allocated by the plugin initialization
func FLBPluginUnregister(ctx unsafe.Pointer) {
    p := (*FLBPlugin) (unsafe.Pointer(ctx))
    fmt.Printf("[flbgo] unregistering %v
", p)
    C.free(unsafe.Pointer(p.name))
    C.free(unsafe.Pointer(p.description))
}

func FLBPluginConfigKey(ctx unsafe.Pointer, key string) string {
    _key := C.CString(key)
    return C.GoString(C.output_get_property(_key, unsafe.Pointer(ctx)))
}

主要是定義了一些編寫插件需要用到的變量和方法,例如FLBPluginRegister注冊組件,FLBPluginConfigKey獲取配置文件設定參數等。
PS
實際上用golang調用fluent-bit-go,再加一些實際的業務邏輯實現,最終編譯成一個c-share的.so動態鏈接庫。

定制fluent-bit-kafka-ouput插件

實際上,fluent-bit v0.13版本以后就提供了kafka output的插件,但是實際項目中,并不滿足我們的需求,必須定制化。
當然接下來的代碼主要是作為一個demo,講清楚如何編寫一個output插件。

代碼編寫和分析

先上代碼:

package main

import (
    "C"
    "fmt"
    "io"
    "log"
    "reflect"
    "strconv"
    "strings"
    "time"
    "unsafe"

    "github.com/Shopify/sarama"
    "github.com/fluent/fluent-bit-go/output"
    "github.com/ugorji/go/codec"
)

var (
    brokers    []string
    producer   sarama.SyncProducer
    timeout    = 0 * time.Minute
    topic      string
    module     string
    messageKey string
)

//export FLBPluginRegister
func FLBPluginRegister(ctx unsafe.Pointer) int {
    return output.FLBPluginRegister(ctx, "out_kafka", "Kafka Output Plugin.!")
}

//export FLBPluginInit
// ctx (context) pointer to fluentbit context (state/ c code)
func FLBPluginInit(ctx unsafe.Pointer) int {

    if bs := output.FLBPluginConfigKey(ctx, "brokers"); bs != "" {
        brokers = strings.Split(bs, ",")
    } else {
        log.Printf("you must set brokers")
        return output.FLB_ERROR
    }

    if tp := output.FLBPluginConfigKey(ctx, "topics"); tp != "" {
        topic = tp
    } else {
        log.Printf("you must set topics")
        return output.FLB_ERROR
    }

    if mo := output.FLBPluginConfigKey(ctx, "module"); mo != "" {
        module = mo
    } else {
        log.Printf("you must set module")
        return output.FLB_ERROR
    }

    if key := output.FLBPluginConfigKey(ctx, "message_key"); key != "" {
        messageKey = key
    } else {
        log.Printf("you must set message_key")
        return output.FLB_ERROR
    }

    config := sarama.NewConfig()
    config.Producer.Return.Successes = true

    if required_acks := output.FLBPluginConfigKey(ctx, "required_acks"); required_acks != "" {
        if acks, err := strconv.Atoi(required_acks); err == nil {
            config.Producer.RequiredAcks = sarama.RequiredAcks(acks)
        }
    }

    if compression_codec := output.FLBPluginConfigKey(ctx, "compression_codec"); compression_codec != "" {
        if codec, err := strconv.Atoi(compression_codec); err == nil {
            config.Producer.Compression = sarama.CompressionCodec(codec)
        }
    }

    if max_retry := output.FLBPluginConfigKey(ctx, "max_retry"); max_retry != "" {
        if max_retry, err := strconv.Atoi(max_retry); err == nil {
            config.Producer.Retry.Max = max_retry
        }
    }

    if timeout == 0 {
        timeout = 5 * time.Minute
    }
    // If Kafka is not running on init, wait to connect
    deadline := time.Now().Add(timeout)
    for tries := 0; time.Now().Before(deadline); tries++ {
        var err error
        if producer == nil {
            producer, err = sarama.NewSyncProducer(brokers, config)
        }
        if err == nil {
            return output.FLB_OK
        }
        log.Printf("Cannot connect to Kafka: (%s) retrying...", err)
        time.Sleep(time.Second * 30)
    }

    log.Printf("Kafka failed to respond after %s", timeout)
    return output.FLB_ERROR

}

//export FLBPluginFlush
// FLBPluginFlush is called from fluent-bit when data need to be sent. is called from fluent-bit when data need to be sent.
func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
    var h codec.MsgpackHandle

    var b []byte
    var m interface{}
    var err error

    b = C.GoBytes(data, length)
    dec := codec.NewDecoderBytes(b, &h)

    // Iterate the original MessagePack array
    var msgs []*sarama.ProducerMessage
    for {
        // decode the msgpack data
        err = dec.Decode(&m)
        if err != nil {
            if err == io.EOF {
                break
            }
            log.Printf("Failed to decode msgpack data: %v
", err)
            return output.FLB_ERROR
        }

        // Get a slice and their two entries: timestamp and map
        slice := reflect.ValueOf(m)
        data := slice.Index(1)

        // Convert slice data to a real map and iterate
        mapData := data.Interface().(map[interface{}]interface{})
        flattenData, err := Flatten(mapData, "", UnderscoreStyle)
        if err != nil {
            break
        }

        message := ""
        host := ""

        for k, v := range flattenData {
            value := ""
            switch t := v.(type) {
            case string:
                value = t
            case []byte:
                value = string(t)
            default:
                value = fmt.Sprintf("%v", v)
            }

            if k == "pod_name" {
                host = value
            }

            if k == messageKey {
                message = value
            }

        }
        if message == "" || host == "" {
            break
        }

        m := &sarama.ProducerMessage{
            Topic: topic,
            Key:   sarama.StringEncoder(fmt.Sprintf("host=%s|module=%s", host, module)),
            Value: sarama.ByteEncoder(message),
        }
        msgs = append(msgs, m)

    }

    err = producer.SendMessages(msgs)
    if err != nil {
        log.Printf("FAILED to send kafka message: %s
", err)
        return output.FLB_ERROR
    }
    return output.FLB_OK
}

//export FLBPluginExit
func FLBPluginExit() int {
    producer.Close()
    return output.FLB_OK
}

func main() {
}

FLBPluginExit 插件退出的時候需要執行的一些方法,比如關閉連接。

FLBPluginRegister 注冊插件

FLBPluginInit 插件初始化

FLBPluginFlush flush到數據到output

FLBPluginConfigKey 獲取配置文件中參數

PS
當然除了FLBPluginConfigKey之外,也可以通過獲取環境變量來獲得設置參數。
ctx相當于一個上下文,負責之間的數據的傳遞。

編譯和執行

編譯的時候

go build -buildmode=c-shared -o out_kafka.so .

生成out_kafka.so

執行的時候

/fluent-bit/bin/fluent-bit" -c /fluent-bit/etc/fluent-bit.conf -e /fluent-bit/out_kafka.so
總結

采用類似的編寫結構,就可以定制化自己的輸出插件了。

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/32693.html

相關文章

  • k8s日志--采用golang實現Fluent Bitoutput插件

    摘要:采用實現的插件前言目前社區日志采集和處理的組件不少,之前方案中的,社區中的,方案中的以及大數據用到比較多的。適合采用的方案,實現日志中心化收集的方案。主要負責采集,負責處理和傳送。 采用golang實現Fluent Bit的output插件 前言 目前社區日志采集和處理的組件不少,之前elk方案中的logstash,cncf社區中的fluentd,efk方案中的filebeat,以及大...

    岳光 評論0 收藏0
  • k8slog--利用fluent bit收集k8s日志

    摘要:是一個開源和多平臺的,它允許您從不同的來源收集數據日志,統一并將它們發送到多個目的地。例如日志收集日志分析主要講部署的集群。日志主要有和的日志,一般采用部署,自然而然就是要支持格式日志的采集。業務落盤的日志。部署方案采取部署。 前言 收集日志的組件多不勝數,有ELK久負盛名組合中的logstash, 也有EFK組合中的filebeat,更有cncf新貴fluentd,另外還有大數據領域...

    betacat 評論0 收藏0
  • k8slog--利用fluent bit收集k8s日志

    摘要:是一個開源和多平臺的,它允許您從不同的來源收集數據日志,統一并將它們發送到多個目的地。例如日志收集日志分析主要講部署的集群。日志主要有和的日志,一般采用部署,自然而然就是要支持格式日志的采集。業務落盤的日志。部署方案采取部署。 前言 收集日志的組件多不勝數,有ELK久負盛名組合中的logstash, 也有EFK組合中的filebeat,更有cncf新貴fluentd,另外還有大數據領域...

    CoffeX 評論0 收藏0
  • 關于k8s集群容器日志收集總結

    摘要:我推薦你使用進行日志收集,將作為的出口。集群目前暫時沒有提供日志查看機制。以如下的形式啟動容器,容器日志將發往配置的。 【作者barnett】本文介紹了k8s官方提供的日志收集方法,并介紹了Fluentd日志收集器并與其他產品做了比較。最后介紹了好雨云幫如何對k8s進行改造并使用ZeroMQ以消息的形式將日志傳輸到統一的日志處理中心。 容器日志存在形式 目前容器日志有兩種輸出形式: ...

    jeffrey_up 評論0 收藏0
  • 關于k8s集群容器日志收集總結

    摘要:我推薦你使用進行日志收集,將作為的出口。集群目前暫時沒有提供日志查看機制。以如下的形式啟動容器,容器日志將發往配置的。 【作者barnett】本文介紹了k8s官方提供的日志收集方法,并介紹了Fluentd日志收集器并與其他產品做了比較。最后介紹了好雨云幫如何對k8s進行改造并使用ZeroMQ以消息的形式將日志傳輸到統一的日志處理中心。 容器日志存在形式 目前容器日志有兩種輸出形式: ...

    or0fun 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<