feat: Update Mongodb receiver to use LoggingReceiverMacro
#2003
base: master
Changes from 9 commits
Commits: 5e26c86, f620ecb, 9e26aae, cbe4dbb, 2aafb05, f0ae87d, 1daa2bb, 087c68e, 0b2f024, f6a140b, ba3d258, 38391cf, 87897ed, 75b5492, 1bba7a3, 7b4c005, 866ee08, 266d929, d864b85, 7796e98, 34beec6, dc8cdcd, 37e85b3
@@ -20,8 +20,6 @@ import (
 	"strings"
 
 	"github.com/GoogleCloudPlatform/ops-agent/confgenerator"
-	"github.com/GoogleCloudPlatform/ops-agent/confgenerator/fluentbit"
-	"github.com/GoogleCloudPlatform/ops-agent/confgenerator/fluentbit/modify"
 	"github.com/GoogleCloudPlatform/ops-agent/confgenerator/otel"
 	"github.com/GoogleCloudPlatform/ops-agent/internal/secret"
 )

@@ -94,40 +92,39 @@ func init() {
 	confgenerator.MetricsReceiverTypes.RegisterType(func() confgenerator.MetricsReceiver { return &MetricsReceiverMongoDB{} })
 }
 
-type LoggingProcessorMongodb struct {
-	confgenerator.ConfigComponent `yaml:",inline"`
+type LoggingProcessorMacroMongodb struct {
 }
 
-func (*LoggingProcessorMongodb) Type() string {
+func (LoggingProcessorMacroMongodb) Type() string {
 	return "mongodb"
 }
 
-func (p *LoggingProcessorMongodb) Components(ctx context.Context, tag, uid string) []fluentbit.Component {
-	c := []fluentbit.Component{}
+func (p LoggingProcessorMacroMongodb) Expand(ctx context.Context) []confgenerator.InternalLoggingProcessor {
+	c := []confgenerator.InternalLoggingProcessor{}
 
-	c = append(c, p.JsonLogComponents(ctx, tag, uid)...)
-	c = append(c, p.RegexLogComponents(tag, uid)...)
-	c = append(c, p.severityParser(ctx, tag, uid)...)
+	c = append(c, p.JsonLogComponents(ctx)...)
+	c = append(c, p.RegexLogComponents()...)
+	c = append(c, p.severityParser()...)
 
 	return c
 }
 
 // JsonLogComponents are the fluentbit components for parsing log messages that are json formatted.
 // these are generally messages from mongo with versions greater than or equal to 4.4
 // documentation: https://docs.mongodb.com/v4.4/reference/log-messages/#log-message-format
-func (p *LoggingProcessorMongodb) JsonLogComponents(ctx context.Context, tag, uid string) []fluentbit.Component {
-	c := p.jsonParserWithTimeKey(ctx, tag, uid)
+func (p LoggingProcessorMacroMongodb) JsonLogComponents(ctx context.Context) []confgenerator.InternalLoggingProcessor {
+	c := p.jsonParserWithTimeKey()
 
-	c = append(c, p.promoteWiredTiger(tag, uid)...)
-	c = append(c, p.renames(tag, uid)...)
+	c = append(c, p.promoteWiredTiger()...)
+	c = append(c, p.renames()...)
 
 	return c
 }
 
 // jsonParserWithTimeKey requires promotion of the nested timekey for the json parser so we must
 // first promote the $date field from the "t" field before declaring the parser
-func (p *LoggingProcessorMongodb) jsonParserWithTimeKey(ctx context.Context, tag, uid string) []fluentbit.Component {
-	c := []fluentbit.Component{}
+func (p LoggingProcessorMacroMongodb) jsonParserWithTimeKey() []confgenerator.InternalLoggingProcessor {
+	c := []confgenerator.InternalLoggingProcessor{}
 
 	jsonParser := &confgenerator.LoggingProcessorParseJson{
 		ParserShared: confgenerator.ParserShared{

@@ -139,75 +136,40 @@ func (p *LoggingProcessorMongodb) jsonParserWithTimeKey(ctx context.Context, tag
 			},
 		},
 	}
-	jpComponents := jsonParser.Components(ctx, tag, uid)
-
-	// The parserFilterComponent is the actual filter component that configures and defines
-	// which parser to use. We need the component to determine which parser to use when
-	// re-parsing below. Each time a parser filter is used, there are 2 filter components right
-	// before it to account for the nest lua script (see confgenerator/fluentbit/parse_deduplication.go).
-	// Therefore, the parse filter component is actually the third component in the list.
-	parserFilterComponent := jpComponents[2]
-	c = append(c, jpComponents...)
+	c = append(c, jsonParser)
 
 	tempPrefix := "temp_ts_"
 	timeKey := "time"
 	// have to bring $date to top level in order for it to be parsed as timeKey
 	// see https://github.com/fluent/fluent-bit/issues/1013
-	liftTs := fluentbit.Component{
-		Kind: "FILTER",
-		Config: map[string]string{
-			"Name":         "nest",
-			"Match":        tag,
-			"Operation":    "lift",
-			"Nested_under": "t",
-			"Add_prefix":   tempPrefix,
-		},
-	}
+	c = append(c, &confgenerator.LoggingProcessorNestLift{
+		NestedUnder: "t",
+		AddPrefix:   tempPrefix,
+	})
 
-	renameTsOption := modify.NewHardRenameOptions(fmt.Sprintf("%s$date", tempPrefix), timeKey)
-	renameTs := renameTsOption.Component(tag)
-
-	c = append(c, liftTs, renameTs)
+	c = append(c, &confgenerator.LoggingProcessorHardRename{
+		Field:   fmt.Sprintf("%s$date", tempPrefix),
+		NewName: timeKey,
+	})
 
-	// IMPORTANT: now that we have lifted the json to top level
-	// we need to re-parse in order to properly set time at the
-	// parser level
-	nestFilters := fluentbit.LuaFilterComponents(tag, fluentbit.ParserNestLuaFunction, fmt.Sprintf(fluentbit.ParserNestLuaScriptContents, "message"))
-	parserFilter := fluentbit.Component{
-		Kind: "FILTER",
-		Config: map[string]string{
-			"Name":         "parser",
-			"Match":        tag,
-			"Key_Name":     "message",
-			"Reserve_Data": "True",
-			"Parser":       parserFilterComponent.OrderedConfig[0][1],
-		},
-	}
-	mergeFilters := fluentbit.LuaFilterComponents(tag, fluentbit.ParserMergeLuaFunction, fluentbit.ParserMergeLuaScriptContents)
-	c = append(c, nestFilters...)
-	c = append(c, parserFilter)
-	c = append(c, mergeFilters...)
-
-	removeTimestamp := fluentbit.Component{
-		Kind: "FILTER",
-		Config: map[string]string{
-			"Name":   "modify",
-			"Match":  tag,
-			"Remove": timeKey,
-		},
-	}
-	c = append(c, removeTimestamp)
+	c = append(c, jsonParser)
+
+	c = append(c, &confgenerator.LoggingProcessorRemoveField{
+		Field: timeKey,
+	})
 
 	return c
 }
 
 // severityParser is used by both regex and json parser to ensure an "s" field on the entry gets translated
 // to a valid logging.googleapis.com/seveirty field
-func (p *LoggingProcessorMongodb) severityParser(ctx context.Context, tag, uid string) []fluentbit.Component {
-	severityComponents := []fluentbit.Component{}
-
-	severityComponents = append(severityComponents,
-		confgenerator.LoggingProcessorModifyFields{
+func (p LoggingProcessorMacroMongodb) severityParser() []confgenerator.InternalLoggingProcessor {
+	return []confgenerator.InternalLoggingProcessor{
+		&confgenerator.LoggingProcessorModifyFields{
 			Fields: map[string]*confgenerator.ModifyField{
 				"jsonPayload.severity": {
 					MoveFrom: "jsonPayload.s",

@@ -230,115 +192,95 @@ func (p *LoggingProcessorMongodb) severityParser(ctx context.Context, tag, uid s
 				},
 				InstrumentationSourceLabel: instrumentationSourceValue(p.Type()),
 			},
-		}.Components(ctx, tag, uid)...,
-	)
-
-	return severityComponents
+		},
+	}
 }
 
-func (p *LoggingProcessorMongodb) renames(tag, uid string) []fluentbit.Component {
-	r := []fluentbit.Component{}
+func (p LoggingProcessorMacroMongodb) renames() []confgenerator.InternalLoggingProcessor {
+	r := []confgenerator.InternalLoggingProcessor{}
 	renames := []struct {
 		src  string
 		dest string
 	}{
-		{"c", "component"},
-		{"ctx", "context"},
-		{"msg", "message"},
+		{"jsonPayload.c", "jsonPayload.component"},
+		{"jsonPayload.ctx", "jsonPayload.context"},
+		{"jsonPayload.msg", "jsonPayload.message"},
 	}
 
 	for _, rename := range renames {
-		rename := modify.NewRenameOptions(rename.src, rename.dest)
-		r = append(r, rename.Component(tag))
+		r = append(r, &confgenerator.LoggingProcessorModifyFields{
+			Fields: map[string]*confgenerator.ModifyField{
+				rename.dest: {
+					MoveFrom: rename.src,
+				},
			},
+		})
 	}
 
 	return r
 }
 
-func (p *LoggingProcessorMongodb) promoteWiredTiger(tag, uid string) []fluentbit.Component {
-	// promote messages that are WiredTiger messages and are nested in attr.message
-	addPrefix := "temp_attributes_"
-	upNest := fluentbit.Component{
-		Kind: "FILTER",
-		Config: map[string]string{
-			"Name":         "nest",
-			"Match":        tag,
-			"Operation":    "lift",
-			"Nested_under": "attr",
-			"Add_prefix":   addPrefix,
-		},
-	}
+func (p LoggingProcessorMacroMongodb) promoteWiredTiger() []confgenerator.InternalLoggingProcessor {
+	// promote messages that are WiredTiger messages and are nested in attr.messagey
+	c := []confgenerator.InternalLoggingProcessor{}
 
-	hardRenameMessage := modify.NewHardRenameOptions(fmt.Sprintf("%smessage", addPrefix), "msg")
-	wiredTigerRename := hardRenameMessage.Component(tag)
-
-	renameRemainingAttributes := fluentbit.Component{
-		Kind: "FILTER",
-		Config: map[string]string{
-			"Name":          "nest",
-			"Wildcard":      fmt.Sprintf("%s*", addPrefix),
-			"Match":         tag,
-			"Operation":     "nest",
-			"Nest_under":    "attributes",
-			"Remove_prefix": addPrefix,
-		},
-	}
+	addPrefix := "temp_attributes_"
+	c = append(c, &confgenerator.LoggingProcessorNestLift{
+		NestedUnder: "attr",
+		AddPrefix:   addPrefix,
+	})
 
+	c = append(c, &confgenerator.LoggingProcessorHardRename{
+		Field:   addPrefix + "message",
+		NewName: "msg",
+	})
 
+	c = append(c, &confgenerator.LoggingProcessorNestWildcard{
+		Wildcard:     addPrefix + "*",
+		NestUnder:    "attributes",
+		RemovePrefix: addPrefix,
+	})
 
-	return []fluentbit.Component{upNest, wiredTigerRename, renameRemainingAttributes}
+	return c
 }
 
-func (p *LoggingProcessorMongodb) RegexLogComponents(tag, uid string) []fluentbit.Component {
-	c := []fluentbit.Component{}
-	parseKey := "message"
-	parser, parserName := fluentbit.ParserComponentBase("%Y-%m-%dT%H:%M:%S.%L%z", "timestamp", map[string]string{
-		"message":   "string",
-		"id":        "integer",
-		"s":         "string",
-		"component": "string",
-		"context":   "string",
-	}, fmt.Sprintf("%s_regex", tag), uid)
-	parser.Config["Format"] = "regex"
-	parser.Config["Regex"] = `^(?<timestamp>[^ ]*)\s+(?<s>\w)\s+(?<component>[^ ]+)\s+\[(?<context>[^\]]+)]\s+(?<message>.*?) *(?<ms>(\d+))?(:?ms)?$`
-	parser.Config["Key_Name"] = parseKey
-
-	nestFilters := fluentbit.LuaFilterComponents(tag, fluentbit.ParserNestLuaFunction, fmt.Sprintf(fluentbit.ParserNestLuaScriptContents, parseKey))
-	parserFilter := fluentbit.Component{
-		Kind: "FILTER",
-		Config: map[string]string{
-			"Match":        tag,
-			"Name":         "parser",
-			"Parser":       parserName,
-			"Reserve_Data": "True",
-			"Key_Name":     parseKey,
-		},
-	}
-	mergeFilters := fluentbit.LuaFilterComponents(tag, fluentbit.ParserMergeLuaFunction, fluentbit.ParserMergeLuaScriptContents)
+func (p LoggingProcessorMacroMongodb) RegexLogComponents() []confgenerator.InternalLoggingProcessor {
+	c := []confgenerator.InternalLoggingProcessor{}
 
-	c = append(c, parser)
-	c = append(c, nestFilters...)
-	c = append(c, parserFilter)
-	c = append(c, mergeFilters...)
+	c = append(c, &confgenerator.LoggingProcessorParseRegex{
+		ParserShared: confgenerator.ParserShared{
+			TimeKey:    "timestamp",
+			TimeFormat: "%Y-%m-%dT%H:%M:%S.%L%z",
+			Types: map[string]string{
+				"message":   "string",
+				"id":        "integer",
+				"s":         "string",
+				"component": "string",
+				"context":   "string",
+			},
+		},
+		Regex: `^(?<timestamp>[^ ]*)\s+(?<s>\w)\s+(?<component>[^ ]+)\s+\[(?<context>[^\]]+)]\s+(?<message>.*?) *(?<ms>(\d+))?(:?ms)?$`,
+		Field: "message",
+	})
 
 	return c
 }
 
-type LoggingReceiverMongodb struct {
-	LoggingProcessorMongodb `yaml:",inline"`
-	ReceiverMixin           confgenerator.LoggingReceiverFilesMixin `yaml:",inline" validate:"structonly"`
+type LoggingReceiverMacroMongodb struct {
+	ReceiverMixin                confgenerator.LoggingReceiverFilesMixin `yaml:",inline"`
+	LoggingProcessorMacroMongodb `yaml:",inline"`
 }
 
-func (r *LoggingReceiverMongodb) Components(ctx context.Context, tag string) []fluentbit.Component {
+func (r LoggingReceiverMacroMongodb) Expand(ctx context.Context) (confgenerator.InternalLoggingReceiver, []confgenerator.InternalLoggingProcessor) {
 	if len(r.ReceiverMixin.IncludePaths) == 0 {
-		r.ReceiverMixin.IncludePaths = []string{
-			// default logging location
-			"/var/log/mongodb/mongod.log*",
-		}
+		r.ReceiverMixin.IncludePaths = []string{"/var/log/mongodb/mongod.log*"}
 	}
-	c := r.ReceiverMixin.Components(ctx, tag)
-	c = append(c, r.LoggingProcessorMongodb.Components(ctx, tag, "mongodb")...)
-	return c
+
+	return &r.ReceiverMixin, r.LoggingProcessorMacroMongodb.Expand(ctx)
 }
 
 func init() {
-	confgenerator.LoggingReceiverTypes.RegisterType(func() confgenerator.LoggingReceiver { return &LoggingReceiverMongodb{} })
+	confgenerator.RegisterLoggingReceiverMacro(func() LoggingReceiverMacroMongodb {
+		return LoggingReceiverMacroMongodb{}
+	})
 }
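For orientation, here is a minimal sketch of how the new macro-style type is exercised. It relies only on the Expand signature shown in the diff above; the test scaffolding, and the assumption that the type lives in the repo's apps package, are illustrative rather than part of this PR.

```go
// Sketch only, assuming this sits in the same package as LoggingReceiverMacroMongodb.
package apps

import (
	"context"
	"testing"
)

func TestLoggingReceiverMacroMongodbExpand(t *testing.T) {
	r := LoggingReceiverMacroMongodb{}

	// Expand returns the files mixin as the internal receiver plus the ordered
	// list of internal logging processors for the mongodb pipeline; rendering
	// them into fluent-bit components is left to the confgenerator framework.
	receiver, processors := r.Expand(context.Background())

	if receiver == nil {
		t.Fatal("expected a non-nil internal receiver")
	}
	if len(processors) == 0 {
		t.Fatal("expected at least one internal logging processor")
	}
}
```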
@@ -461,6 +461,58 @@ func (p LoggingProcessorNestWildcard) Components(ctx context.Context, tag, uid s
 	}
 }
 
+type LoggingProcessorRemoveField struct {
+	Field string
+}
+
+func (p LoggingProcessorRemoveField) Components(ctx context.Context, tag, uid string) []fluentbit.Component {
Contributor:
I've thought about this a bit and realized that the operations done by […] Here is a list of these operations: […]

Using […]

Collaborator (Author):
Most of the solutions are valid here, except there is one condition where […]. When dealing with a log from […]. From my understanding, I can't think of a solution that allows us to preserve the old […]

Contributor:
I think we can use the […]

Note: I'm partially guiding myself with the […]

Collaborator (Author):
I tried that and the […]. From the docs you linked […], so we end up deleting the […]

Contributor:
Ohhh! Ok! 🤔 Please try further with other variations of the […]

Alternative: if this doesn't work, we can try a solution that uses a custom […]. I outlined a similar idea here: #2014 (comment). In summary, you would create a struct […]. You can probably also reuse […]

Collaborator (Author):
I was not able to come up with anything that worked with […], so I will wait for @quentinmit to see if he has a better solution.
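For readers following the thread: the behavior being debated hinges on the semantics of fluent-bit's modify filter, which the new helper processors wrap. As a point of reference (illustration only, not code from this PR, and assuming the same fluentbit package used elsewhere in this file): Rename leaves an existing destination key untouched, while Hard_rename overwrites it.

```go
// renameIllustration contrasts the two "modify" filter variants discussed above.
// The tag parameter and function name are illustrative.
func renameIllustration(tag string) []fluentbit.Component {
	return []fluentbit.Component{
		{
			Kind: "FILTER",
			Config: map[string]string{
				"Name":   "modify",
				"Match":  tag,
				"Rename": "msg message", // no-op when "message" already exists
			},
		},
		{
			Kind: "FILTER",
			Config: map[string]string{
				"Name":        "modify",
				"Match":       tag,
				"Hard_rename": "msg message", // replaces any existing "message"
			},
		},
	}
}
```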
+	filter := fluentbit.Component{
+		Kind: "FILTER",
+		Config: map[string]string{
+			"Name":   "modify",
+			"Match":  tag,
+			"Remove": p.Field,
+		},
+	}
+	return []fluentbit.Component{filter}
+}
+
+type LoggingProcessorHardRename struct {
+	Field   string
+	NewName string
+}
+
+func (p LoggingProcessorHardRename) Components(ctx context.Context, tag, uid string) []fluentbit.Component {
+	c := fluentbit.Component{
+		Kind: "FILTER",
+		Config: map[string]string{
+			"Name":        "modify",
+			"Match":       tag,
+			"Hard_rename": fmt.Sprintf("%s %s", p.Field, p.NewName),
+		},
+	}
+	return []fluentbit.Component{c}
+}
+
+type LoggingProcessorNestLift struct {
+	NestedUnder string
+	AddPrefix   string
+}
+
+func (p LoggingProcessorNestLift) Components(ctx context.Context, tag, uid string) []fluentbit.Component {
+	filter := fluentbit.Component{
+		Kind: "FILTER",
+		Config: map[string]string{
+			"Name":         "nest",
+			"Match":        tag,
+			"Operation":    "lift",
+			"Nested_under": p.NestedUnder,
+			"Add_prefix":   p.AddPrefix,
+		},
+	}
+	return []fluentbit.Component{filter}
+}
+
 var LegacyBuiltinProcessors = map[string]LoggingProcessor{
 	"lib:default_message_parser": &LoggingProcessorParseRegex{
 		Regex: `^(?<message>.*)$`,
Reviewer:
After registering with RegisterLoggingFilesProcessorMacro there is no need to have a LoggingReceiverMacroMongodb.
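A rough sketch of what that suggestion could look like. The signature of RegisterLoggingFilesProcessorMacro and the way default include paths are supplied are assumptions rather than something defined in this PR; only LoggingProcessorMacroMongodb comes from the diff above.

```go
// Hypothetical sketch of the reviewer's suggestion: register the processor macro
// directly, letting the framework supply the files-based receiver, instead of
// declaring a LoggingReceiverMacroMongodb wrapper type.
func init() {
	confgenerator.RegisterLoggingFilesProcessorMacro(
		func() LoggingProcessorMacroMongodb { return LoggingProcessorMacroMongodb{} },
		// default logging location, used when the user configures no include_paths
		[]string{"/var/log/mongodb/mongod.log*"},
	)
}
```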