diff --git a/libbeat/beat/info.go b/libbeat/beat/info.go index aa1338091a79..0561156d97f9 100644 --- a/libbeat/beat/info.go +++ b/libbeat/beat/info.go @@ -42,10 +42,9 @@ type Info struct { UserAgent string // A string of the user-agent that can be passed to any outputs or network connections FIPSDistribution bool // If the beat was compiled as a FIPS distribution. - LogConsumer consumer.Logs // otel log consumer - ComponentID string // otel component id from the collector config e.g. "filebeatreceiver/logs" - UseDefaultProcessors bool // Whether to use the default processors - Logger *logp.Logger + LogConsumer consumer.Logs // otel log consumer + ComponentID string // otel component id from the collector config e.g. "filebeatreceiver/logs" + Logger *logp.Logger } func (i Info) FQDNAwareHostname(useFQDN bool) string { diff --git a/libbeat/publisher/processing/default.go b/libbeat/publisher/processing/default.go index d59f4c6aa88a..70f406e575ab 100644 --- a/libbeat/publisher/processing/default.go +++ b/libbeat/publisher/processing/default.go @@ -113,10 +113,9 @@ func MakeDefaultSupport( // don't try to "merge" the two lists somehow, if the supportFactory caller requests its own processors, use those // also makes it easier to disable global processors if needed, since they're otherwise hardcoded var rawProcessors processors.PluginConfig - shouldLoadDefaultProcessors := info.UseDefaultProcessors || management.UnderAgent() // don't check the array directly, use HasField, that way processors can easily be bypassed with -E processors=[] - if shouldLoadDefaultProcessors && !beatCfg.HasField("processors") { + if management.UnderAgent() && !beatCfg.HasField("processors") { log.Debugf("In fleet/otel mode with no processors specified, defaulting to global processors") rawProcessors = fleetDefaultProcessors diff --git a/x-pack/filebeat/fbreceiver/factory.go b/x-pack/filebeat/fbreceiver/factory.go index 519720b66cec..d49f27319461 100644 --- a/x-pack/filebeat/fbreceiver/factory.go +++ b/x-pack/filebeat/fbreceiver/factory.go @@ -45,7 +45,7 @@ func createReceiver(ctx context.Context, set receiver.Settings, baseCfg componen settings.ElasticLicensed = true settings.Initialize = append(settings.Initialize, include.InitializeModule) - b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, true, consumer, set.ID.String(), set.Logger.Core()) + b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, consumer, set.ID.String(), set.Logger.Core()) if err != nil { return nil, fmt.Errorf("error creating %s: %w", Name, err) } diff --git a/x-pack/filebeat/fbreceiver/receiver_test.go b/x-pack/filebeat/fbreceiver/receiver_test.go index 0763b13ac3b4..73099129a51c 100644 --- a/x-pack/filebeat/fbreceiver/receiver_test.go +++ b/x-pack/filebeat/fbreceiver/receiver_test.go @@ -69,9 +69,10 @@ func TestNewReceiver(t *testing.T) { "*", }, }, - "path.home": t.TempDir(), - "http.enabled": true, - "http.host": monitorHost, + "path.home": t.TempDir(), + "http.enabled": true, + "http.host": monitorHost, + "management.otel.enabled": true, }, } diff --git a/x-pack/filebeat/tests/integration/otel_test.go b/x-pack/filebeat/tests/integration/otel_test.go index c613f1bd2f29..80f4d2225443 100644 --- a/x-pack/filebeat/tests/integration/otel_test.go +++ b/x-pack/filebeat/tests/integration/otel_test.go @@ -359,6 +359,7 @@ func TestFilebeatOTelReceiverE2E(t *testing.T) { http.enabled: true http.host: localhost http.port: {{.MonitoringPort}} + management.otel.enabled: true exporters: debug: use_internal_logger: false diff --git a/x-pack/libbeat/cmd/instance/beat.go b/x-pack/libbeat/cmd/instance/beat.go index 0602e63cfb0c..a771608080b5 100644 --- a/x-pack/libbeat/cmd/instance/beat.go +++ b/x-pack/libbeat/cmd/instance/beat.go @@ -39,7 +39,7 @@ import ( const receiverPublisherCloseTimeout = 5 * time.Second // NewBeatForReceiver creates a Beat that will be used in the context of an otel receiver -func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]any, useDefaultProcessors bool, consumer consumer.Logs, componentID string, core zapcore.Core) (*instance.Beat, error) { +func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]any, consumer consumer.Logs, componentID string, core zapcore.Core) (*instance.Beat, error) { b, err := instance.NewBeat(settings.Name, settings.IndexPrefix, settings.Version, @@ -235,7 +235,6 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an return nil, fmt.Errorf("error setting index supporter: %w", err) } - b.Info.UseDefaultProcessors = useDefaultProcessors processingFactory := settings.Processing if processingFactory == nil { processingFactory = processing.MakeDefaultBeatSupport(true) diff --git a/x-pack/libbeat/cmd/instance/beat_test.go b/x-pack/libbeat/cmd/instance/beat_test.go index 6caa713ff7a8..7dda2da1a872 100644 --- a/x-pack/libbeat/cmd/instance/beat_test.go +++ b/x-pack/libbeat/cmd/instance/beat_test.go @@ -36,7 +36,7 @@ func TestManager(t *testing.T) { "path.home": tmpDir, } t.Run("otel management disabled - key missing", func(t *testing.T) { - beat, err := NewBeatForReceiver(cmd.FilebeatSettings("filebeat"), cfg, false, consumertest.NewNop(), "testcomponent", zapcore.NewNopCore()) + beat, err := NewBeatForReceiver(cmd.FilebeatSettings("filebeat"), cfg, consumertest.NewNop(), "testcomponent", zapcore.NewNopCore()) assert.NoError(t, err) assert.NotNil(t, beat.Manager) // it should fallback to FallbackManager if key is missing @@ -50,7 +50,7 @@ func TestManager(t *testing.T) { defer func() { management.SetUnderAgent(false) // reset to false }() - beat, err := NewBeatForReceiver(cmd.FilebeatSettings("filebeat"), tmpCfg, false, consumertest.NewNop(), "testcomponent", zapcore.NewNopCore()) + beat, err := NewBeatForReceiver(cmd.FilebeatSettings("filebeat"), tmpCfg, consumertest.NewNop(), "testcomponent", zapcore.NewNopCore()) assert.NoError(t, err) assert.NotNil(t, beat.Manager) assert.IsType(t, beat.Manager, &otelmanager.OtelManager{}) @@ -63,7 +63,7 @@ func TestManager(t *testing.T) { defer func() { management.SetUnderAgent(false) // reset to false }() - beat, err := NewBeatForReceiver(cmd.FilebeatSettings("filebeat"), tmpCfg, false, consumertest.NewNop(), "testcomponent", zapcore.NewNopCore()) + beat, err := NewBeatForReceiver(cmd.FilebeatSettings("filebeat"), tmpCfg, consumertest.NewNop(), "testcomponent", zapcore.NewNopCore()) assert.NoError(t, err) assert.NotNil(t, beat.Manager) assert.IsType(t, beat.Manager, &management.FallbackManager{}) diff --git a/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go b/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go index c629e264c015..825f474e3fc7 100644 --- a/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go +++ b/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go @@ -115,7 +115,7 @@ func (out *otelConsumer) logsPublish(ctx context.Context, batch publisher.Batch) } } - beatEvent := event.Content.Fields + beatEvent := event.Content.Fields.Clone() if beatEvent == nil { beatEvent = mapstr.M{} } diff --git a/x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go b/x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go index df1ee483aea4..053b0c5907ee 100644 --- a/x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go +++ b/x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go @@ -274,10 +274,12 @@ func TestPublish(t *testing.T) { "_id": "abc123", }, } + ch := make(chan plog.Logs, 1) batch := outest.NewBatch(event) var countLogs int otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error { countLogs = countLogs + ld.LogRecordCount() + ch <- ld return nil }) otelConsumer.beatInfo.ComponentID = tc.componentID @@ -286,11 +288,31 @@ func TestPublish(t *testing.T) { assert.Len(t, batch.Signals, 1) assert.Equal(t, outest.BatchACK, batch.Signals[0].Tag) assert.Equal(t, len(batch.Events()), countLogs, "all events should be consumed") - for _, event := range batch.Events() { - beatEvent := event.Content.Fields.Flatten() - assert.Equal(t, tc.expectedComponentID, beatEvent["agent."+otelComponentIDKey], "expected agent.otelcol.component.id field in log record") - assert.Equal(t, tc.expectedComponentKind, beatEvent["agent."+otelComponentKindKey], "expected agent.otelcol.component.kind field in log record") + log := <-ch + for i := 0; i < log.ResourceLogs().Len(); i++ { + resourceLog := log.ResourceLogs().At(i) + for j := 0; j < resourceLog.ScopeLogs().Len(); j++ { + scopeLog := resourceLog.ScopeLogs().At(j) + for k := 0; k < scopeLog.LogRecords().Len(); k++ { + logRecord := scopeLog.LogRecords().At(k) + body := logRecord.Body().Map() + + // Traverse nested "agent.otelcol.component" structure + agentVal, ok := body.Get("agent") + require.True(t, ok, "expected 'agent' in log body") + + agentMap := agentVal.Map() + idVal, ok := agentMap.Get("otelcol.component.id") + require.True(t, ok, "expected 'agent.otelcol.component.id' in log body") + assert.Equal(t, tc.expectedComponentID, idVal.AsString()) + + kindVal, ok := agentMap.Get("otelcol.component.kind") + require.True(t, ok, "expected 'agent.otelcol.component.kind' in log body") + assert.Equal(t, tc.expectedComponentKind, kindVal.AsString()) + } + } } + }) } }) diff --git a/x-pack/metricbeat/mbreceiver/factory.go b/x-pack/metricbeat/mbreceiver/factory.go index 11a20361e701..cdac1fdc2047 100644 --- a/x-pack/metricbeat/mbreceiver/factory.go +++ b/x-pack/metricbeat/mbreceiver/factory.go @@ -51,7 +51,7 @@ func createReceiver(ctx context.Context, set receiver.Settings, baseCfg componen settings.ElasticLicensed = true settings.Initialize = append(settings.Initialize, include.InitializeModule) - b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, true, consumer, set.ID.String(), set.Logger.Core()) + b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, consumer, set.ID.String(), set.Logger.Core()) if err != nil { return nil, fmt.Errorf("error creating %s: %w", Name, err) } diff --git a/x-pack/metricbeat/mbreceiver/receiver_test.go b/x-pack/metricbeat/mbreceiver/receiver_test.go index 5be46b0294c1..cd8a3ab0cd15 100644 --- a/x-pack/metricbeat/mbreceiver/receiver_test.go +++ b/x-pack/metricbeat/mbreceiver/receiver_test.go @@ -65,9 +65,10 @@ func TestNewReceiver(t *testing.T) { "*", }, }, - "path.home": t.TempDir(), - "http.enabled": true, - "http.host": monitorHost, + "path.home": t.TempDir(), + "http.enabled": true, + "http.host": monitorHost, + "management.otel.enabled": true, }, }