23 commits
5e26c86
mongodb started refactor
dyl10s Jul 22, 2025
f620ecb
updated input log and tests
dyl10s Jul 22, 2025
9e26aae
wip
dyl10s Jul 22, 2025
cbe4dbb
mongodb first refactor pass
dyl10s Jul 22, 2025
2aafb05
only export receiver
dyl10s Jul 22, 2025
f0ae87d
cleaned up code
dyl10s Jul 23, 2025
1daa2bb
Merge remote-tracking branch 'origin/master' into feat/mongodb-refactor
dyl10s Aug 19, 2025
087c68e
added new line to input.log
dyl10s Aug 19, 2025
0b2f024
Merge remote-tracking branch 'origin/master' into feat/mongodb-refactor
dyl10s Aug 20, 2025
f6a140b
Merge remote-tracking branch 'origin/master' into feat/mongodb-refactor
dyl10s Aug 25, 2025
ba3d258
updated mixin files
dyl10s Aug 25, 2025
38391cf
refactor to remove nest commands
dyl10s Aug 25, 2025
87897ed
added timestamp parser
dyl10s Aug 25, 2025
75b5492
remove LoggingProcessorNestLift
dyl10s Aug 25, 2025
1bba7a3
updated golden files
dyl10s Aug 25, 2025
7b4c005
refactor: rename HardRename to RenameIfExists with otel support
dyl10s Aug 25, 2025
866ee08
Merge branch 'master' into feat/mongodb-refactor
dyl10s Aug 25, 2025
266d929
refactor to use existing functions
dyl10s Aug 26, 2025
d864b85
Merge branch 'master' into feat/mongodb-refactor
dyl10s Aug 26, 2025
7796e98
Merge branch 'master' into feat/mongodb-refactor
dyl10s Aug 26, 2025
34beec6
Merge branch 'master' into feat/mongodb-refactor
franciscovalentecastro Nov 3, 2025
dc8cdcd
Merge branch 'master' into feat/mongodb-refactor
franciscovalentecastro Nov 11, 2025
37e85b3
Merge branch 'master' into feat/mongodb-refactor
franciscovalentecastro Dec 23, 2025
240 changes: 84 additions & 156 deletions apps/mongodb.go
@@ -16,12 +16,9 @@ package apps

import (
"context"
"fmt"
"strings"

"github.com/GoogleCloudPlatform/ops-agent/confgenerator"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/fluentbit"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/fluentbit/modify"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/otel"
"github.com/GoogleCloudPlatform/ops-agent/internal/secret"
)
@@ -95,120 +92,81 @@ func init() {
confgenerator.MetricsReceiverTypes.RegisterType(func() confgenerator.MetricsReceiver { return &MetricsReceiverMongoDB{} })
}

type LoggingProcessorMongodb struct {
confgenerator.ConfigComponent `yaml:",inline"`
type LoggingProcessorMacroMongodb struct {
}

func (*LoggingProcessorMongodb) Type() string {
func (LoggingProcessorMacroMongodb) Type() string {
return "mongodb"
}

func (p *LoggingProcessorMongodb) Components(ctx context.Context, tag, uid string) []fluentbit.Component {
c := []fluentbit.Component{}
func (p LoggingProcessorMacroMongodb) Expand(ctx context.Context) []confgenerator.InternalLoggingProcessor {
c := []confgenerator.InternalLoggingProcessor{}

c = append(c, p.JsonLogComponents(ctx, tag, uid)...)
c = append(c, p.RegexLogComponents(tag, uid)...)
c = append(c, p.severityParser(ctx, tag, uid)...)
c = append(c, p.JsonLogComponents(ctx)...)
c = append(c, p.RegexLogComponents()...)
c = append(c, p.severityParser()...)

return c
}

// JsonLogComponents are the fluentbit components for parsing log messages that are json formatted.
// These are generally messages from MongoDB versions 4.4 and later.
// documentation: https://docs.mongodb.com/v4.4/reference/log-messages/#log-message-format
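// For reference, a structured entry from a 4.4+ server looks roughly like this (illustrative values):
//   {"t":{"$date":"2025-07-22T10:15:00.123+00:00"},"s":"I","c":"NETWORK","id":23016,"ctx":"listener","msg":"Waiting for connections","attr":{"port":27017}}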
func (p *LoggingProcessorMongodb) JsonLogComponents(ctx context.Context, tag, uid string) []fluentbit.Component {
c := p.jsonParserWithTimeKey(ctx, tag, uid)
func (p LoggingProcessorMacroMongodb) JsonLogComponents(ctx context.Context) []confgenerator.InternalLoggingProcessor {
c := p.jsonParserWithTimeKey()

c = append(c, p.promoteWiredTiger(tag, uid)...)
c = append(c, p.renames(tag, uid)...)
c = append(c, p.promoteWiredTiger()...)
c = append(c, p.renames()...)

return c
}

// jsonParserWithTimeKey requires promotion of the nested timekey for the json parser so we must
// first promote the $date field from the "t" field before declaring the parser
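// For example (illustrative), a record carrying {"t":{"$date":"2025-07-22T10:15:00.123+00:00"}, ...}
// ends up with a top-level "time" field holding that value, which the timestamp parser at the end of
// this function then consumes.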
func (p *LoggingProcessorMongodb) jsonParserWithTimeKey(ctx context.Context, tag, uid string) []fluentbit.Component {
c := []fluentbit.Component{}
func (p LoggingProcessorMacroMongodb) jsonParserWithTimeKey() []confgenerator.InternalLoggingProcessor {
c := []confgenerator.InternalLoggingProcessor{}

jsonParser := &confgenerator.LoggingProcessorParseJson{
c = append(c, &confgenerator.LoggingProcessorParseJson{
ParserShared: confgenerator.ParserShared{
TimeKey: "time",
TimeFormat: "%Y-%m-%dT%H:%M:%S.%L%z",
Types: map[string]string{
"id": "integer",
"message": "string",
},
},
}
jpComponents := jsonParser.Components(ctx, tag, uid)

// The parserFilterComponent is the actual filter component that configures and defines
// which parser to use. We need the component to determine which parser to use when
// re-parsing below. Each time a parser filter is used, there are 2 filter components right
// before it to account for the nest lua script (see confgenerator/fluentbit/parse_deduplication.go).
// Therefore, the parse filter component is actually the third component in the list.
parserFilterComponent := jpComponents[2]
c = append(c, jpComponents...)

tempPrefix := "temp_ts_"
timeKey := "time"
})

// have to bring $date to top level in order for it to be parsed as timeKey
// see https://github.com/fluent/fluent-bit/issues/1013
liftTs := fluentbit.Component{
Kind: "FILTER",
Config: map[string]string{
"Name": "nest",
"Match": tag,
"Operation": "lift",
"Nested_under": "t",
"Add_prefix": tempPrefix,
c = append(c, &confgenerator.LoggingProcessorModifyFields{
Fields: map[string]*confgenerator.ModifyField{
"jsonPayload.time": {
MoveFrom: "jsonPayload.t.$date",
},
},
}

renameTsOption := modify.NewHardRenameOptions(fmt.Sprintf("%s$date", tempPrefix), timeKey)
renameTs := renameTsOption.Component(tag)
})

c = append(c, liftTs, renameTs)
c = append(c, &confgenerator.LoggingProcessorRemoveField{
Field: "t",
})

// IMPORTANT: now that we have lifted the json to top level
// we need to re-parse in order to properly set time at the
// parser level
nestFilters := fluentbit.LuaFilterComponents(tag, fluentbit.ParserNestLuaFunction, fmt.Sprintf(fluentbit.ParserNestLuaScriptContents, "message"))
parserFilter := fluentbit.Component{
Kind: "FILTER",
Config: map[string]string{
"Name": "parser",
"Match": tag,
"Key_Name": "message",
"Reserve_Data": "True",
"Parser": parserFilterComponent.OrderedConfig[0][1],
},
}
mergeFilters := fluentbit.LuaFilterComponents(tag, fluentbit.ParserMergeLuaFunction, fluentbit.ParserMergeLuaScriptContents)
c = append(c, nestFilters...)
c = append(c, parserFilter)
c = append(c, mergeFilters...)

removeTimestamp := fluentbit.Component{
Kind: "FILTER",
Config: map[string]string{
"Name": "modify",
"Match": tag,
"Remove": timeKey,
c = append(c, &confgenerator.LoggingProcessorParseTimestamp{
ParserShared: confgenerator.ParserShared{
TimeKey: "time",
TimeFormat: "%Y-%m-%dT%H:%M:%S.%L%z",
},
}
c = append(c, removeTimestamp)
})

return c
}

// severityParser is used by both the regex and json parsers to ensure an "s" field on the entry gets translated
// to a valid logging.googleapis.com/severity field
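// (In MongoDB's log format the "s" field is a single-letter severity such as F, E, W, I, or D/D1-D5 for debug.)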
func (p *LoggingProcessorMongodb) severityParser(ctx context.Context, tag, uid string) []fluentbit.Component {
severityComponents := []fluentbit.Component{}

severityComponents = append(severityComponents,
confgenerator.LoggingProcessorModifyFields{
func (p LoggingProcessorMacroMongodb) severityParser() []confgenerator.InternalLoggingProcessor {
return []confgenerator.InternalLoggingProcessor{
&confgenerator.LoggingProcessorModifyFields{
Fields: map[string]*confgenerator.ModifyField{
"jsonPayload.severity": {
MoveFrom: "jsonPayload.s",
@@ -231,115 +189,85 @@ func (p *LoggingProcessorMongodb) severityParser(ctx context.Context, tag, uid s
},
InstrumentationSourceLabel: instrumentationSourceValue(p.Type()),
},
}.Components(ctx, tag, uid)...,
)

return severityComponents
},
}
}

func (p *LoggingProcessorMongodb) renames(tag, uid string) []fluentbit.Component {
r := []fluentbit.Component{}
func (p LoggingProcessorMacroMongodb) renames() []confgenerator.InternalLoggingProcessor {
r := []confgenerator.InternalLoggingProcessor{}
renames := []struct {
src string
dest string
}{
{"c", "component"},
{"ctx", "context"},
{"msg", "message"},
{"jsonPayload.c", "jsonPayload.component"},
{"jsonPayload.ctx", "jsonPayload.context"},
{"jsonPayload.msg", "jsonPayload.message"},
{"jsonPayload.attr", "jsonPayload.attributes"},
}

for _, rename := range renames {
rename := modify.NewRenameOptions(rename.src, rename.dest)
r = append(r, rename.Component(tag))
r = append(r, &confgenerator.LoggingProcessorModifyFields{
Fields: map[string]*confgenerator.ModifyField{
rename.dest: {
MoveFrom: rename.src,
},
},
})
}

return r
}

func (p *LoggingProcessorMongodb) promoteWiredTiger(tag, uid string) []fluentbit.Component {
func (p LoggingProcessorMacroMongodb) promoteWiredTiger() []confgenerator.InternalLoggingProcessor {
// promote messages that are WiredTiger messages and are nested in attr.message
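// e.g. (illustrative): {"msg":"WiredTiger message","attr":{"message":"[1656593258:266241], WT_VERB_CHECKPOINT: ..."}};
// the nested attr.message value is promoted to the top-level msg field via the rename below.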
c := []confgenerator.InternalLoggingProcessor{}

addPrefix := "temp_attributes_"
upNest := fluentbit.Component{
Kind: "FILTER",
Config: map[string]string{
"Name": "nest",
"Match": tag,
"Operation": "lift",
"Nested_under": "attr",
"Add_prefix": addPrefix,
},
}

hardRenameMessage := modify.NewHardRenameOptions(fmt.Sprintf("%smessage", addPrefix), "msg")
wiredTigerRename := hardRenameMessage.Component(tag)

renameRemainingAttributes := fluentbit.Component{
Kind: "FILTER",
Config: map[string]string{
"Name": "nest",
"Wildcard": fmt.Sprintf("%s*", addPrefix),
"Match": tag,
"Operation": "nest",
"Nest_under": "attributes",
"Remove_prefix": addPrefix,
c = append(c, &confgenerator.LoggingProcessorModifyFields{
Fields: map[string]*confgenerator.ModifyField{
"jsonPayload.temp_attributes_message": {
MoveFrom: "jsonPayload.attr.message",
},
},
}
})

c = append(c, &confgenerator.LoggingProcessorRenameIfExists{
Field: addPrefix + "message",
NewName: "msg",
})

return []fluentbit.Component{upNest, wiredTigerRename, renameRemainingAttributes}
return c
}

func (p *LoggingProcessorMongodb) RegexLogComponents(tag, uid string) []fluentbit.Component {
c := []fluentbit.Component{}
parseKey := "message"
parser, parserName := fluentbit.ParserComponentBase("%Y-%m-%dT%H:%M:%S.%L%z", "timestamp", map[string]string{
"message": "string",
"id": "integer",
"s": "string",
"component": "string",
"context": "string",
}, fmt.Sprintf("%s_regex", tag), uid)
parser.Config["Format"] = "regex"
parser.Config["Regex"] = `^(?<timestamp>[^ ]*)\s+(?<s>\w)\s+(?<component>[^ ]+)\s+\[(?<context>[^\]]+)]\s+(?<message>.*?) *(?<ms>(\d+))?(:?ms)?$`
parser.Config["Key_Name"] = parseKey

nestFilters := fluentbit.LuaFilterComponents(tag, fluentbit.ParserNestLuaFunction, fmt.Sprintf(fluentbit.ParserNestLuaScriptContents, parseKey))
parserFilter := fluentbit.Component{
Kind: "FILTER",
Config: map[string]string{
"Match": tag,
"Name": "parser",
"Parser": parserName,
"Reserve_Data": "True",
"Key_Name": parseKey,
},
}
mergeFilters := fluentbit.LuaFilterComponents(tag, fluentbit.ParserMergeLuaFunction, fluentbit.ParserMergeLuaScriptContents)
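// RegexLogComponents handles the plain-text format emitted by MongoDB versions before 4.4, e.g. (illustrative):
//   2019-09-13T14:21:30.789+0000 I NETWORK  [listener] connection accepted from 127.0.0.1:52901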
func (p LoggingProcessorMacroMongodb) RegexLogComponents() []confgenerator.InternalLoggingProcessor {
c := []confgenerator.InternalLoggingProcessor{}

c = append(c, parser)
c = append(c, nestFilters...)
c = append(c, parserFilter)
c = append(c, mergeFilters...)
c = append(c, &confgenerator.LoggingProcessorParseRegex{
ParserShared: confgenerator.ParserShared{
TimeKey: "timestamp",
TimeFormat: "%Y-%m-%dT%H:%M:%S.%L%z",
Types: map[string]string{
"message": "string",
"id": "integer",
"s": "string",
"component": "string",
"context": "string",
},
},
Regex: `^(?<timestamp>[^ ]*)\s+(?<s>\w)\s+(?<component>[^ ]+)\s+\[(?<context>[^\]]+)]\s+(?<message>.*?) *(?<ms>(\d+))?(:?ms)?$`,
Field: "message",
})

return c
}

type LoggingReceiverMongodb struct {
LoggingProcessorMongodb `yaml:",inline"`
ReceiverMixin confgenerator.LoggingReceiverFilesMixin `yaml:",inline" validate:"structonly"`
}

func (r *LoggingReceiverMongodb) Components(ctx context.Context, tag string) []fluentbit.Component {
if len(r.ReceiverMixin.IncludePaths) == 0 {
r.ReceiverMixin.IncludePaths = []string{
// default logging location
"/var/log/mongodb/mongod.log*",
}
func loggingReceiverFilesMixinMongodb() confgenerator.LoggingReceiverFilesMixin {
return confgenerator.LoggingReceiverFilesMixin{
IncludePaths: []string{"/var/log/mongodb/mongod.log*"},
}
c := r.ReceiverMixin.Components(ctx, tag)
c = append(c, r.LoggingProcessorMongodb.Components(ctx, tag, "mongodb")...)
return c
}

func init() {
confgenerator.LoggingReceiverTypes.RegisterType(func() confgenerator.LoggingReceiver { return &LoggingReceiverMongodb{} })
confgenerator.RegisterLoggingFilesProcessorMacro[LoggingProcessorMacroMongodb](loggingReceiverFilesMixinMongodb)
}