diff --git a/README.md b/README.md index f74bd5f4..42dc6525 100644 --- a/README.md +++ b/README.md @@ -522,3 +522,22 @@ receivers: foo: bar url: http://127.0.0.1:3100/loki/api/v1/push ``` + +### Fluentd + +[Fluentd](https://www.fluentd.org/) or [Fluentbit](https://fluentbit.io/) are open source data collectors, which lets you unify the data collection and consumption for a better use and understanding of data. You might choose to send all events to Fluentd or Fluentbit to centralize logging and further route or process logs as needed. This can be especially useful for monitoring, alerting, and analyzing Kubernetes events. +Note: Fluentd/Fleuntbit should be configured with in_tcp or in_forwarding input plugins. + +```yaml +# ... +receivers: + - name: "dump" + fluent: + host: "localhost" + port: 5500 + tag: "kube-event" + bufferSize: 1048576 + # If set to true, all dots in labels and annotation keys are replaced by underscores. Defaults false + deDot: true|false + layout: # Optional +``` \ No newline at end of file diff --git a/go.mod b/go.mod index 28b541b9..d53eb343 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,9 @@ require ( ) require ( + github.com/fluent/fluent-logger-golang v1.9.0 // indirect + github.com/philhofer/fwd v1.1.3-0.20240916144458-20a13a1f6b7c // indirect + github.com/tinylib/msgp v1.2.5 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect ) diff --git a/go.sum b/go.sum index 79562f97..770a6e83 100644 --- a/go.sum +++ b/go.sum @@ -70,6 +70,8 @@ github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs= github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw= github.com/flowstack/go-jsonschema v0.1.1/go.mod h1:yL7fNggx1o8rm9RlgXv7hTBWxdBM0rVwpMwimd3F3N0= +github.com/fluent/fluent-logger-golang v1.9.0 h1:zUdY44CHX2oIUc7VTNZc+4m+ORuO/mldQDA7czhWXEg= +github.com/fluent/fluent-logger-golang v1.9.0/go.mod h1:2/HCT/jTy78yGyeNGQLGQsjF3zzzAuy6Xlk6FCMV5eU= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU= @@ -246,6 +248,8 @@ github.com/opensearch-project/opensearch-go v1.1.0 h1:eG5sh3843bbU1itPRjA9QXbxcg github.com/opensearch-project/opensearch-go v1.1.0/go.mod h1:+6/XHCuTH+fwsMJikZEWsucZ4eZMma3zNSeLrTtVGbo= github.com/opsgenie/opsgenie-go-sdk-v2 v1.2.14 h1:ni+M5q9QIZQq5xQiYzbttDZpPQogPWx8MdHpvZtWcTE= github.com/opsgenie/opsgenie-go-sdk-v2 v1.2.14/go.mod h1:4OjcxgwdXzezqytxN534MooNmrxRD50geWZxTD7845s= +github.com/philhofer/fwd v1.1.3-0.20240916144458-20a13a1f6b7c h1:dAMKvw0MlJT1GshSTtih8C2gDs04w8dReiOGXrGLNoY= +github.com/philhofer/fwd v1.1.3-0.20240916144458-20a13a1f6b7c/go.mod h1:RqIHx9QI14HlwKwm98g9Re5prTQ6LdeRQn+gXJFxsJM= github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc= github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -298,6 +302,8 @@ github.com/stretchr/testify v1.7.5/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/tinylib/msgp v1.2.5 h1:WeQg1whrXRFiZusidTQqzETkRpGjFjcIhW6uqWH09po= +github.com/tinylib/msgp v1.2.5/go.mod h1:ykjzy2wzgrlvpDCRc4LA8UXy6D8bzMSuAF3WD57Gok0= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= diff --git a/pkg/sinks/fluentd.go b/pkg/sinks/fluentd.go new file mode 100644 index 00000000..f5a312c5 --- /dev/null +++ b/pkg/sinks/fluentd.go @@ -0,0 +1,82 @@ +package sinks + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/fluent/fluent-logger-golang/fluent" + "github.com/resmoio/kubernetes-event-exporter/pkg/kube" + "github.com/rs/zerolog/log" +) + +// FluentConfig holds the configuration for sending events to Fluent Bit over TCP +type FluentConfig struct { + Host string `yaml:"host"` + Port int `yaml:"port"` + Tag string `yaml:"tag"` + // DeDot all labels and annotations in the event. For both the event and the involvedObject + DeDot bool `yaml:"deDot"` + Layout map[string]interface{} `yaml:"layout"` + BufferSize int `yaml:"bufferSize"` +} + +func NewFluent(cfg *FluentConfig) (*Fluent, error) { + // Connection specific + logger, err := fluent.New(fluent.Config{ + FluentHost: cfg.Host, + FluentPort: cfg.Port, + BufferLimit: cfg.BufferSize, + }) + if err != nil { + return nil, fmt.Errorf("failed to create Fluent Bit logger: %w", err) + } + + return &Fluent{ + logger: logger, + tag: cfg.Tag, + cfg: cfg, + }, nil +} + +type Fluent struct { + logger *fluent.Fluent + tag string + cfg *FluentConfig +} + +func (f *Fluent) Send(ctx context.Context, ev *kube.EnhancedEvent) error { + // Sends events to Fluentd/Fluent Bit with INPUT type tcp + var toSend map[string]interface{} + + if f.cfg.DeDot { + de := ev.DeDot() + ev = &de + } + if f.cfg.Layout != nil { + res, err := convertLayoutTemplate(f.cfg.Layout, ev) + if err != nil { + return fmt.Errorf("failed to convert layout template: %w", err) + } + toSend = res + } else { + jsonData := ev.ToJSON() + if err := json.Unmarshal(jsonData, &toSend); err != nil { + return fmt.Errorf("failed to unmarshal JSON data: %w", err) + } + } + + err := f.logger.Post(f.tag, toSend) + if err != nil { + log.Error().Msgf("Failed to send event to Fluent Bit: %v", err) + return fmt.Errorf("failed to send event: %w", err) + } + + return nil +} + +func (f *Fluent) Close() { + if err := f.logger.Close(); err != nil { + log.Error().Msgf("Failed to close Fluent Bit logger: %v", err) + } +} diff --git a/pkg/sinks/receiver.go b/pkg/sinks/receiver.go index 21fc35a5..40117e31 100644 --- a/pkg/sinks/receiver.go +++ b/pkg/sinks/receiver.go @@ -11,6 +11,7 @@ type ReceiverConfig struct { Syslog *SyslogConfig `yaml:"syslog"` Stdout *StdoutConfig `yaml:"stdout"` Elasticsearch *ElasticsearchConfig `yaml:"elasticsearch"` + Fluent *FluentConfig `yaml:"fluent"` Kinesis *KinesisConfig `yaml:"kinesis"` Firehose *FirehoseConfig `yaml:"firehose"` OpenSearch *OpenSearchConfig `yaml:"opensearch"` @@ -66,6 +67,10 @@ func (r *ReceiverConfig) GetSink() (Sink, error) { return NewElasticsearch(r.Elasticsearch) } + if r.Fluent != nil { + return NewFluent(r.Fluent) + } + if r.Kinesis != nil { return NewKinesisSink(r.Kinesis) }