// configData is the JSON-serializable mirror of Config: each field carries the
// JSON key under which the corresponding Config value is read and written.
//
// NOTE: This data must be maintained with what is in Config.
type configData struct {
	Cloud               *Cloud                        `json:"cloud,omitempty"`
	Modules             []Module                      `json:"modules,omitempty"`
	Remotes             []Remote                      `json:"remotes,omitempty"`
	Components          []resource.Config             `json:"components,omitempty"`
	Processes           []pexec.ProcessConfig         `json:"processes,omitempty"`
	Services            []resource.Config             `json:"services,omitempty"`
	Packages            []PackageConfig               `json:"packages,omitempty"`
	Network             NetworkConfig                 `json:"network"`
	Auth                AuthConfig                    `json:"auth"`
	Debug               bool                          `json:"debug,omitempty"`
	DisablePartialStart bool                          `json:"disable_partial_start"`
	EnableWebProfile    bool                          `json:"enable_web_profile"`
	LogConfig           []logging.LoggerPatternConfig `json:"log,omitempty"`
	Revision            string                        `json:"revision,omitempty"`
	MaintenanceConfig   *MaintenanceConfig            `json:"maintenance,omitempty"`
	// DisableLogDeduplication mirrors Config.DisableLogDeduplication: whether
	// deduplication of noisy logs should be turned off. The tag has no
	// `omitempty`, so the key is always present in marshalled output.
	DisableLogDeduplication bool `json:"disable_log_deduplication"`
}
v1.2.4/go.mod h1:zI2xzosHdqXAJ/kFqcN+OIF78kQuTV2nIhGZ8EzvaJI= go.viam.com/utils v0.1.118 h1:Kp6ebrCBiYReeSC1XnWPTjtBJoTUsQ6YWAomQkQF/mE= diff --git a/logging/impl.go b/logging/impl.go index 5492386699f..14c0cc5f946 100644 --- a/logging/impl.go +++ b/logging/impl.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "runtime" + "sync" "testing" "time" @@ -15,6 +16,14 @@ import ( "go.uber.org/zap/zaptest" ) +var ( + // Window duration over which to consider log messages "noisy.". + noisyMessageWindowDuration = 10 * time.Second + // Count threshold within `noisyMessageWindowDuration` after which to + // consider log messages "noisy.". + noisyMessageCountThreshold = 3 +) + type ( impl struct { name string @@ -26,6 +35,15 @@ type ( // avoid that. This function is a no-op for non-test loggers. See `NewTestAppender` // documentation for more details. testHelper func() + + // recentMessageMu guards the recentMessage fields below. + recentMessageMu sync.Mutex + // Map of messages to counts of that message being `Write`ten within window. + recentMessageCounts map[string]int + // Map of messages to last `LogEntry` with that message within window. + recentMessageEntries map[string]LogEntry + // Start of current window. + recentMessageWindowStart time.Time } // LogEntry embeds a zapcore Entry and slice of Fields. @@ -41,6 +59,32 @@ type ( } ) +// String stringifies a `LogEntry`. Should be used to emplace a log entry in +// `recentMessageEntries`, i.e. `LogEntry`s that `String`ify identically should +// be treated as identical with respect to noisiness and deduplication. Should +// not be used for actual writing of the entry to an appender. +func (le *LogEntry) String() string { + ret := le.Message + for _, field := range le.Fields { + ret += " " + field.Key + " " + + // Assume field's value is held in one of `Integer`, `Interface`, or + // `String`. Otherwise (field has no value or is equivalent to 0 or "") use + // the string "undefined". 
+ switch { + case field.Integer != 0: + ret += fmt.Sprintf("%d", field.Integer) + case field.Interface != nil: + ret += fmt.Sprintf("%v", field.Interface) + case field.String != "": + ret += field.String + default: + ret += "undefined" + } + } + return ret +} + func (imp *impl) NewLogEntry() *LogEntry { ret := &LogEntry{} ret.Time = time.Now() @@ -84,6 +128,10 @@ func (imp *impl) Sublogger(subname string) Logger { imp.appenders, imp.registry, imp.testHelper, + sync.Mutex{}, + make(map[string]int), + make(map[string]LogEntry), + time.Now(), } // If there are multiple callers racing to create the same logger name (e.g: `viam.networking`), @@ -198,6 +246,46 @@ func (imp *impl) shouldLog(logLevel Level) bool { } func (imp *impl) Write(entry *LogEntry) { + if !DisableLogDeduplication.Load() { + // If we have entered a new recentMessage window, output noisy logs from + // the last window. + imp.recentMessageMu.Lock() + if time.Since(imp.recentMessageWindowStart) > noisyMessageWindowDuration { + for stringifiedEntry, count := range imp.recentMessageCounts { + if count > noisyMessageCountThreshold { + collapsedEntry := imp.recentMessageEntries[stringifiedEntry] + collapsedEntry.Message = fmt.Sprintf("Message logged %d times in past %v: %s", + count, noisyMessageWindowDuration, collapsedEntry.Message) + + imp.testHelper() + for _, appender := range imp.appenders { + err := appender.Write(collapsedEntry.Entry, collapsedEntry.Fields) + if err != nil { + fmt.Fprint(os.Stderr, err) + } + } + } + } + + // Clear maps and reset window. + clear(imp.recentMessageCounts) + clear(imp.recentMessageEntries) + imp.recentMessageWindowStart = time.Now() + } + + // Track entry in recentMessage maps. + stringifiedEntry := entry.String() + imp.recentMessageCounts[stringifiedEntry]++ + imp.recentMessageEntries[stringifiedEntry] = *entry + + if imp.recentMessageCounts[stringifiedEntry] > noisyMessageCountThreshold { + // If entry's message is reportedly "noisy," return early. 
+ imp.recentMessageMu.Unlock() + return + } + imp.recentMessageMu.Unlock() + } + imp.testHelper() for _, appender := range imp.appenders { err := appender.Write(entry.Entry, entry.Fields) diff --git a/logging/impl_test.go b/logging/impl_test.go index 45a190a9cb7..267b668cb34 100644 --- a/logging/impl_test.go +++ b/logging/impl_test.go @@ -9,7 +9,9 @@ import ( "strconv" "strings" "testing" + "time" + "go.uber.org/zap/zapcore" "go.viam.com/test" ) @@ -146,11 +148,14 @@ func TestConsoleOutputFormat(t *testing.T) { // A logger object that will write to the `notStdout` buffer. notStdout := &bytes.Buffer{} impl := &impl{ - name: "impl", - level: NewAtomicLevelAt(DEBUG), - appenders: []Appender{NewWriterAppender(notStdout)}, - registry: newRegistry(), - testHelper: func() {}, + name: "impl", + level: NewAtomicLevelAt(DEBUG), + appenders: []Appender{NewWriterAppender(notStdout)}, + registry: newRegistry(), + testHelper: func() {}, + recentMessageCounts: make(map[string]int), + recentMessageEntries: make(map[string]LogEntry), + recentMessageWindowStart: time.Now(), } impl.Info("impl Info log") @@ -211,11 +216,14 @@ func TestContextLogging(t *testing.T) { notStdout := &bytes.Buffer{} // The default log level is error. logger := &impl{ - name: "impl", - level: NewAtomicLevelAt(ERROR), - appenders: []Appender{NewWriterAppender(notStdout)}, - registry: newRegistry(), - testHelper: func() {}, + name: "impl", + level: NewAtomicLevelAt(ERROR), + appenders: []Appender{NewWriterAppender(notStdout)}, + registry: newRegistry(), + testHelper: func() {}, + recentMessageCounts: make(map[string]int), + recentMessageEntries: make(map[string]LogEntry), + recentMessageWindowStart: time.Now(), } logger.CDebug(ctxNoDebug, "Debug log") @@ -286,11 +294,14 @@ func TestSublogger(t *testing.T) { // A logger object that will write to the `notStdout` buffer. 
notStdout := &bytes.Buffer{} logger := &impl{ - name: "impl", - level: NewAtomicLevelAt(DEBUG), - appenders: []Appender{NewWriterAppender(notStdout)}, - registry: newRegistry(), - testHelper: func() {}, + name: "impl", + level: NewAtomicLevelAt(DEBUG), + appenders: []Appender{NewWriterAppender(notStdout)}, + registry: newRegistry(), + testHelper: func() {}, + recentMessageCounts: make(map[string]int), + recentMessageEntries: make(map[string]LogEntry), + recentMessageWindowStart: time.Now(), } logger.Info("info log") @@ -304,6 +315,13 @@ func TestSublogger(t *testing.T) { } func TestLoggingWithFields(t *testing.T) { + // Disable log deduplication for this test, as it logs "noisily" and makes + // assertions on those logs. + DisableLogDeduplication.Store(true) + defer func() { + DisableLogDeduplication.Store(false) + }() + // A logger object that will write to the `notStdout` buffer. notStdout := &bytes.Buffer{} var logger Logger @@ -420,3 +438,124 @@ func TestLoggingWithFields(t *testing.T) { assertLogMatches(t, notStdout, `2023-10-30T09:12:09.459Z DEBUG impl logging/impl_test.go:200 Debugw log {"traceKey":"foobar","k":"v","key":"value"}`) } + +func TestLogEntryStringify(t *testing.T) { + testCases := []struct { + name string + logEntry *LogEntry + expectedStringification string + }{ + { + "no fields", + &LogEntry{ + Entry: zapcore.Entry{ + Message: "these are not the droids you are looking for", + }, + }, + "these are not the droids you are looking for", + }, + { + "fields", + &LogEntry{ + Entry: zapcore.Entry{ + Message: "these are not the droids you are looking for", + }, + Fields: []zapcore.Field{ + { + Key: "obi", + String: "wan", + }, + { + Key: "r2d", + Integer: 2, + }, + { + Key: "c3", + Interface: "po", + }, + }, + }, + "these are not the droids you are looking for obi wan r2d 2 c3 po", + }, + { + "undefined field", + &LogEntry{ + Entry: zapcore.Entry{ + Message: "these are not the droids you are looking for", + }, + Fields: []zapcore.Field{ + { + Key: 
"obi", + }, + }, + }, + "these are not the droids you are looking for obi undefined", + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actualStringification := tc.logEntry.String() + test.That(t, actualStringification, test.ShouldEqual, tc.expectedStringification) + }) + } +} + +func TestLoggingDeduplication(t *testing.T) { + // Create a logger object that will write to the `notStdout` buffer. + notStdout := &bytes.Buffer{} + logger := &impl{ + name: "impl", + level: NewAtomicLevelAt(DEBUG), + appenders: []Appender{NewWriterAppender(notStdout)}, + registry: newRegistry(), + testHelper: func() {}, + recentMessageCounts: make(map[string]int), + recentMessageEntries: make(map[string]LogEntry), + recentMessageWindowStart: time.Now(), + } + + // Artificially lower noisy message window for testing. + originalNoisyMessageWindowDuration := noisyMessageWindowDuration + noisyMessageWindowDuration = 500 * time.Millisecond + defer func() { + noisyMessageWindowDuration = originalNoisyMessageWindowDuration + }() + + // Log 4 identical messages (same sublogger, messages, and fields) in quick + // succession. Sleep for noisy message window duration. Assert that a final, + // separate log is an aggregation log. 
+ identicalMsg := "identical message" + loggerWith := logger.WithFields("key", "value") + for range 3 { + loggerWith.Info(identicalMsg) + assertLogMatches(t, notStdout, + `2023-10-30T13:19:45.806Z INFO impl logging/impl_test.go:132 identical message {"key":"value"}`) + } + loggerWith.Info(identicalMsg) // not output due to being noisy + time.Sleep(noisyMessageWindowDuration) + loggerWith.Info("foo") // log arbitrary message to force output of aggregated message + assertLogMatches(t, notStdout, + `2023-10-30T13:19:45.806Z INFO impl logging/impl_test.go:132 Message logged 4 times in past 500ms: identical message {"key":"value"}`) + assertLogMatches(t, notStdout, + `2023-10-30T13:19:45.806Z INFO impl logging/impl_test.go:132 foo {"key":"value"}`) + + // Assert aggregation resets after sleep (same aggregation occurs again.) + for range 3 { + loggerWith.Info(identicalMsg) + assertLogMatches(t, notStdout, + `2023-10-30T13:19:45.806Z INFO impl logging/impl_test.go:132 identical message {"key":"value"}`) + } + loggerWith.Info(identicalMsg) // not output due to being noisy + time.Sleep(noisyMessageWindowDuration) + loggerWith.Info("foo") // log arbitrary message to force output of aggregated message + assertLogMatches(t, notStdout, + `2023-10-30T13:19:45.806Z INFO impl logging/impl_test.go:132 Message logged 4 times in past 500ms: identical message {"key":"value"}`) + assertLogMatches(t, notStdout, + `2023-10-30T13:19:45.806Z INFO impl logging/impl_test.go:132 foo {"key":"value"}`) + + // TODO(benji): Add the following assertions to test more deduplication logic. + // + // Assert that using a different sublogger uses separate aggregation. + // Assert that using different fields uses separate aggregation. + // Assert that using different levels does _not_ use separate aggregation. 
+} diff --git a/logging/logging.go b/logging/logging.go index 40060c14933..424dfdf89f5 100644 --- a/logging/logging.go +++ b/logging/logging.go @@ -4,7 +4,9 @@ package logging import ( "sync" "testing" + "time" + "go.uber.org/atomic" "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" @@ -18,6 +20,10 @@ var ( // GlobalLogLevel should be used whenever a zap logger is created that wants to obey the debug // flag from the CLI or robot config. GlobalLogLevel = zap.NewAtomicLevelAt(zap.InfoLevel) + + // DisableLogDeduplication controls whether to disable de-duplicating noisy + // logs; defaults to false and can be specified in robot config. + DisableLogDeduplication = atomic.Bool{} ) // ReplaceGlobal replaces the global loggers. @@ -62,11 +68,14 @@ func NewZapLoggerConfig() zap.Config { // NewLogger returns a new logger that outputs Info+ logs to stdout in UTC. func NewLogger(name string) Logger { logger := &impl{ - name: name, - level: NewAtomicLevelAt(INFO), - appenders: []Appender{NewStdoutAppender()}, - registry: newRegistry(), - testHelper: func() {}, + name: name, + level: NewAtomicLevelAt(INFO), + appenders: []Appender{NewStdoutAppender()}, + registry: newRegistry(), + testHelper: func() {}, + recentMessageCounts: make(map[string]int), + recentMessageEntries: make(map[string]LogEntry), + recentMessageWindowStart: time.Now(), } logger.registry.registerLogger(name, logger) @@ -78,11 +87,14 @@ func NewLogger(name string) Logger { func NewLoggerWithRegistry(name string) (Logger, *Registry) { reg := newRegistry() logger := &impl{ - name: name, - level: NewAtomicLevelAt(INFO), - appenders: []Appender{NewStdoutAppender()}, - registry: reg, - testHelper: func() {}, + name: name, + level: NewAtomicLevelAt(INFO), + appenders: []Appender{NewStdoutAppender()}, + registry: reg, + testHelper: func() {}, + recentMessageCounts: make(map[string]int), + recentMessageEntries: make(map[string]LogEntry), + recentMessageWindowStart: time.Now(), } 
logger.registry.registerLogger(name, logger) @@ -92,11 +104,14 @@ func NewLoggerWithRegistry(name string) (Logger, *Registry) { // NewDebugLogger returns a new logger that outputs Debug+ logs to stdout in UTC. func NewDebugLogger(name string) Logger { logger := &impl{ - name: name, - level: NewAtomicLevelAt(DEBUG), - appenders: []Appender{NewStdoutAppender()}, - registry: newRegistry(), - testHelper: func() {}, + name: name, + level: NewAtomicLevelAt(DEBUG), + appenders: []Appender{NewStdoutAppender()}, + registry: newRegistry(), + testHelper: func() {}, + recentMessageCounts: make(map[string]int), + recentMessageEntries: make(map[string]LogEntry), + recentMessageWindowStart: time.Now(), } logger.registry.registerLogger(name, logger) @@ -107,11 +122,14 @@ func NewDebugLogger(name string) Logger { // pre-existing appenders/outputs. func NewBlankLogger(name string) Logger { logger := &impl{ - name: name, - level: NewAtomicLevelAt(DEBUG), - appenders: []Appender{}, - registry: newRegistry(), - testHelper: func() {}, + name: name, + level: NewAtomicLevelAt(DEBUG), + appenders: []Appender{}, + registry: newRegistry(), + testHelper: func() {}, + recentMessageCounts: make(map[string]int), + recentMessageEntries: make(map[string]LogEntry), + recentMessageWindowStart: time.Now(), } logger.registry.registerLogger(name, logger) @@ -134,8 +152,11 @@ func NewObservedTestLogger(tb testing.TB) (Logger, *observer.ObservedLogs) { NewTestAppender(tb), observerCore, }, - registry: newRegistry(), - testHelper: tb.Helper, + registry: newRegistry(), + testHelper: tb.Helper, + recentMessageCounts: make(map[string]int), + recentMessageEntries: make(map[string]LogEntry), + recentMessageWindowStart: time.Now(), } return logger, observedLogs @@ -153,8 +174,11 @@ func NewObservedTestLoggerWithRegistry(tb testing.TB, name string) (Logger, *obs NewTestAppender(tb), observerCore, }, - registry: registry, - testHelper: tb.Helper, + registry: registry, + testHelper: tb.Helper, + 
recentMessageCounts: make(map[string]int), + recentMessageEntries: make(map[string]LogEntry), + recentMessageWindowStart: time.Now(), } return logger, observedLogs, registry @@ -187,8 +211,11 @@ func NewInMemoryLogger(tb testing.TB) *MemLogger { appenders: []Appender{ observerCore, }, - registry: newRegistry(), - testHelper: tb.Helper, + registry: newRegistry(), + testHelper: tb.Helper, + recentMessageCounts: make(map[string]int), + recentMessageEntries: make(map[string]LogEntry), + recentMessageWindowStart: time.Now(), } memLogger := &MemLogger{logger, tb, observedLogs} diff --git a/robot/impl/local_robot.go b/robot/impl/local_robot.go index 0822cfcdef7..2b8aa7fc75c 100644 --- a/robot/impl/local_robot.go +++ b/robot/impl/local_robot.go @@ -1134,6 +1134,15 @@ func (r *localRobot) reconfigure(ctx context.Context, newConfig *config.Config, var allErrs error + // Check incoming disable log deduplication value for any diff. + if newConfig.DisableLogDeduplication != logging.DisableLogDeduplication.Load() { + state := "enabled" + if newConfig.DisableLogDeduplication { + state = "disabled" + } + r.Logger().CInfof(ctx, "noisy log deduplication now %s", state) + } + // Sync Packages before reconfiguring rest of robot and resolving references to any packages // in the config. // TODO(RSDK-1849): Make this non-blocking so other resources that do not require packages can run before package sync finishes.