博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
k8s与日志--采用golang实现Fluent Bit的output插件
阅读量:5844 次
发布时间:2019-06-18

本文共 7889 字,大约阅读时间需要 26 分钟。

采用golang实现Fluent Bit的output插件

前言

目前社区日志采集和处理的组件不少,之前elk方案中的logstash,cncf社区中的fluentd,efk方案中的filebeat,以及大数据用到比较多的flume。而Fluent Bit是一款用c语言编写的高性能的日志收集组件,整个架构源于fluentd。官方比较数据如下:

Fluentd Fluent Bit
Scope Containers / Servers Containers / Servers
Language C & Ruby C
Memory ~40MB ~450KB
Performance High Performance High Performance
Dependencies Built as a Ruby Gem, it requires a certain number of gems. Zero dependencies, unless some special plugin requires them.
Plugins More than 650 plugins available Around 35 plugins available
License

通过数据可以看出,fluent bit 占用资源更少。适合采用fluent bit + fluentd 的方案,实现日志中心化收集的方案。fluent bit主要负责采集,fluentd负责处理和传送。

扩展output插件

fluent bit 本身是C语言编写,扩展插件有一定的难度。可能官方考虑到这一点,实现了,可以实现采用go语言来编写插件,目前只支持output的编写。

fluent-bit-go其实就是利用cgo,封装了c接口。代码比较简单,主要分析其中一个关键文件

package output/*#include 
#include "flb_plugin.h"#include "flb_output.h"*/import "C"import "fmt"import "unsafe"// Define constants matching Fluent Bit coreconst FLB_ERROR = C.FLB_ERRORconst FLB_OK = C.FLB_OKconst FLB_RETRY = C.FLB_RETRYconst FLB_PROXY_OUTPUT_PLUGIN = C.FLB_PROXY_OUTPUT_PLUGINconst FLB_PROXY_GOLANG = C.FLB_PROXY_GOLANG// Local type to define a plugin definitiontype FLBPlugin C.struct_flb_plugin_proxytype FLBOutPlugin C.struct_flbgo_output_plugin// When the FLBPluginInit is triggered by Fluent Bit, a plugin context// is passed and the next step is to invoke this FLBPluginRegister() function// to fill the required information: type, proxy type, flags name and// description.func FLBPluginRegister(ctx unsafe.Pointer, name string, desc string) int { p := (*FLBPlugin) (unsafe.Pointer(ctx)) p._type = FLB_PROXY_OUTPUT_PLUGIN p.proxy = FLB_PROXY_GOLANG p.flags = 0 p.name = C.CString(name) p.description = C.CString(desc) return 0}// Release resources allocated by the plugin initializationfunc FLBPluginUnregister(ctx unsafe.Pointer) { p := (*FLBPlugin) (unsafe.Pointer(ctx)) fmt.Printf("[flbgo] unregistering %v\n", p) C.free(unsafe.Pointer(p.name)) C.free(unsafe.Pointer(p.description))}func FLBPluginConfigKey(ctx unsafe.Pointer, key string) string { _key := C.CString(key) return C.GoString(C.output_get_property(_key, unsafe.Pointer(ctx)))}

主要是定义了一些编写插件需要用到的变量和方法,例如FLBPluginRegister注册组件,FLBPluginConfigKey获取配置文件设定参数等。

PS
实际上用golang调用fluent-bit-go,再加一些实际的业务逻辑实现,最终编译成一个c-share的.so动态链接库。

定制fluent-bit-kafka-ouput插件

实际上,fluent-bit v0.13版本以后就提供了kafka output的插件,但是实际项目中,并不满足我们的需求,必须定制化。

当然接下来的代码主要是作为一个demo,讲清楚如何编写一个output插件。

代码编写和分析

先上代码:

package mainimport (    "C"    "fmt"    "io"    "log"    "reflect"    "strconv"    "strings"    "time"    "unsafe"    "github.com/Shopify/sarama"    "github.com/fluent/fluent-bit-go/output"    "github.com/ugorji/go/codec")var (    brokers    []string    producer   sarama.SyncProducer    timeout    = 0 * time.Minute    topic      string    module     string    messageKey string)//export FLBPluginRegisterfunc FLBPluginRegister(ctx unsafe.Pointer) int {    return output.FLBPluginRegister(ctx, "out_kafka", "Kafka Output Plugin.!")}//export FLBPluginInit// ctx (context) pointer to fluentbit context (state/ c code)func FLBPluginInit(ctx unsafe.Pointer) int {    if bs := output.FLBPluginConfigKey(ctx, "brokers"); bs != "" {        brokers = strings.Split(bs, ",")    } else {        log.Printf("you must set brokers")        return output.FLB_ERROR    }    if tp := output.FLBPluginConfigKey(ctx, "topics"); tp != "" {        topic = tp    } else {        log.Printf("you must set topics")        return output.FLB_ERROR    }    if mo := output.FLBPluginConfigKey(ctx, "module"); mo != "" {        module = mo    } else {        log.Printf("you must set module")        return output.FLB_ERROR    }    if key := output.FLBPluginConfigKey(ctx, "message_key"); key != "" {        messageKey = key    } else {        log.Printf("you must set message_key")        return output.FLB_ERROR    }    config := sarama.NewConfig()    config.Producer.Return.Successes = true    if required_acks := output.FLBPluginConfigKey(ctx, "required_acks"); required_acks != "" {        if acks, err := strconv.Atoi(required_acks); err == nil {            config.Producer.RequiredAcks = sarama.RequiredAcks(acks)        }    }    if compression_codec := output.FLBPluginConfigKey(ctx, "compression_codec"); compression_codec != "" {        if codec, err := strconv.Atoi(compression_codec); err == nil {            config.Producer.Compression = sarama.CompressionCodec(codec)        }    }    if max_retry := output.FLBPluginConfigKey(ctx, "max_retry"); max_retry != "" {        if max_retry, err := strconv.Atoi(max_retry); err == nil {            config.Producer.Retry.Max = max_retry        }    }    if timeout == 0 {        timeout = 5 * time.Minute    }    // If Kafka is not running on init, wait to connect    deadline := time.Now().Add(timeout)    for tries := 0; time.Now().Before(deadline); tries++ {        var err error        if producer == nil {            producer, err = sarama.NewSyncProducer(brokers, config)        }        if err == nil {            return output.FLB_OK        }        log.Printf("Cannot connect to Kafka: (%s) retrying...", err)        time.Sleep(time.Second * 30)    }    log.Printf("Kafka failed to respond after %s", timeout)    return output.FLB_ERROR}//export FLBPluginFlush// FLBPluginFlush is called from fluent-bit when data need to be sent. is called from fluent-bit when data need to be sent.func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {    var h codec.MsgpackHandle    var b []byte    var m interface{}    var err error    b = C.GoBytes(data, length)    dec := codec.NewDecoderBytes(b, &h)    // Iterate the original MessagePack array    var msgs []*sarama.ProducerMessage    for {        // decode the msgpack data        err = dec.Decode(&m)        if err != nil {            if err == io.EOF {                break            }            log.Printf("Failed to decode msgpack data: %v\n", err)            return output.FLB_ERROR        }        // Get a slice and their two entries: timestamp and map        slice := reflect.ValueOf(m)        data := slice.Index(1)        // Convert slice data to a real map and iterate        mapData := data.Interface().(map[interface{}]interface{})        flattenData, err := Flatten(mapData, "", UnderscoreStyle)        if err != nil {            break        }        message := ""        host := ""        for k, v := range flattenData {            value := ""            switch t := v.(type) {            case string:                value = t            case []byte:                value = string(t)            default:                value = fmt.Sprintf("%v", v)            }            if k == "pod_name" {                host = value            }            if k == messageKey {                message = value            }        }        if message == "" || host == "" {            break        }        m := &sarama.ProducerMessage{            Topic: topic,            Key:   sarama.StringEncoder(fmt.Sprintf("host=%s|module=%s", host, module)),            Value: sarama.ByteEncoder(message),        }        msgs = append(msgs, m)    }    err = producer.SendMessages(msgs)    if err != nil {        log.Printf("FAILED to send kafka message: %s\n", err)        return output.FLB_ERROR    }    return output.FLB_OK}//export FLBPluginExitfunc FLBPluginExit() int {    producer.Close()    return output.FLB_OK}func main() {}
  • FLBPluginExit 插件退出的时候需要执行的一些方法,比如关闭连接。
  • FLBPluginRegister 注册插件
  • FLBPluginInit 插件初始化
  • FLBPluginFlush flush到数据到output
  • FLBPluginConfigKey 获取配置文件中参数

PS

当然除了FLBPluginConfigKey之外,也可以通过获取环境变量来获得设置参数。
ctx相当于一个上下文,负责之间的数据的传递。

编译和执行

编译的时候

go build -buildmode=c-shared -o out_kafka.so .

生成out_kafka.so

执行的时候

/fluent-bit/bin/fluent-bit" -c /fluent-bit/etc/fluent-bit.conf -e /fluent-bit/out_kafka.so

总结

采用类似的编写结构,就可以定制化自己的输出插件了。

转载地址:http://opqcx.baihongyu.com/

你可能感兴趣的文章
【C语言入门教程】4.7 指针的地址分配 - mallocl(), free()
查看>>
ArcGIS Portal 10.4 本地坐标系的web 3d地形展示制作说明
查看>>
北京长途汽车站一览表
查看>>
001_chrome工具详解
查看>>
SQL Script for read information from a csv file in FTP Server
查看>>
从S3中导入数据到Dynamodb
查看>>
Redis学习-String
查看>>
存储过程中SELECT INTO的使用
查看>>
c数据库读写分离和负载均衡策略
查看>>
[LeetCode] Maximum Product of Three Numbers 三个数字的最大乘积
查看>>
【Energy Forecasting】能源预測的发展和展望
查看>>
2015阿里秋招当中一个算法题(经典)
查看>>
android 利用cmdline,将參数从preloader传递到kernel
查看>>
一、K3 Wise 实施指导《K3 Wise实施手册》
查看>>
redis.conf 配置项说明
查看>>
快速幂总结
查看>>
pip安装报错处理+PyPi源切换教程
查看>>
linux下yum安装maven
查看>>
expect的安装与使用
查看>>
https遇到自签名证书/信任证书
查看>>