Skip to content

Commit 48652dc

Browse files
[8.19](backport #47299) [beatreceiver] only add default processors if running under agent (#47412)
* [beatreceiver] only add default processors if running under agent (#47299) * chore: only add default processors if running unde ragent * fix test * fix test * tab * fix race condition * iterate over keys instead of map * fix test case (cherry picked from commit 78f115e) # Conflicts: # x-pack/metricbeat/tests/integration/otel_test.go * conflitcs --------- Co-authored-by: Vihas Makwana <[email protected]> Co-authored-by: Vihas <[email protected]>
1 parent 5e8fe2b commit 48652dc

File tree

11 files changed

+46
-24
lines changed

11 files changed

+46
-24
lines changed

libbeat/beat/info.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,9 @@ type Info struct {
4242
UserAgent string // A string of the user-agent that can be passed to any outputs or network connections
4343
FIPSDistribution bool // If the beat was compiled as a FIPS distribution.
4444

45-
LogConsumer consumer.Logs // otel log consumer
46-
ComponentID string // otel component id from the collector config e.g. "filebeatreceiver/logs"
47-
UseDefaultProcessors bool // Whether to use the default processors
48-
Logger *logp.Logger
45+
LogConsumer consumer.Logs // otel log consumer
46+
ComponentID string // otel component id from the collector config e.g. "filebeatreceiver/logs"
47+
Logger *logp.Logger
4948
}
5049

5150
func (i Info) FQDNAwareHostname(useFQDN bool) string {

libbeat/publisher/processing/default.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,10 +113,9 @@ func MakeDefaultSupport(
113113
// don't try to "merge" the two lists somehow, if the supportFactory caller requests its own processors, use those
114114
// also makes it easier to disable global processors if needed, since they're otherwise hardcoded
115115
var rawProcessors processors.PluginConfig
116-
shouldLoadDefaultProcessors := info.UseDefaultProcessors || management.UnderAgent()
117116

118117
// don't check the array directly, use HasField, that way processors can easily be bypassed with -E processors=[]
119-
if shouldLoadDefaultProcessors && !beatCfg.HasField("processors") {
118+
if management.UnderAgent() && !beatCfg.HasField("processors") {
120119
log.Debugf("In fleet/otel mode with no processors specified, defaulting to global processors")
121120
rawProcessors = fleetDefaultProcessors
122121

x-pack/filebeat/fbreceiver/factory.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func createReceiver(ctx context.Context, set receiver.Settings, baseCfg componen
4545
settings.ElasticLicensed = true
4646
settings.Initialize = append(settings.Initialize, include.InitializeModule)
4747

48-
b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, true, consumer, set.ID.String(), set.Logger.Core())
48+
b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, consumer, set.ID.String(), set.Logger.Core())
4949
if err != nil {
5050
return nil, fmt.Errorf("error creating %s: %w", Name, err)
5151
}

x-pack/filebeat/fbreceiver/receiver_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,10 @@ func TestNewReceiver(t *testing.T) {
6969
"*",
7070
},
7171
},
72-
"path.home": t.TempDir(),
73-
"http.enabled": true,
74-
"http.host": monitorHost,
72+
"path.home": t.TempDir(),
73+
"http.enabled": true,
74+
"http.host": monitorHost,
75+
"management.otel.enabled": true,
7576
},
7677
}
7778

x-pack/filebeat/tests/integration/otel_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,7 @@ func TestFilebeatOTelReceiverE2E(t *testing.T) {
359359
http.enabled: true
360360
http.host: localhost
361361
http.port: {{.MonitoringPort}}
362+
management.otel.enabled: true
362363
exporters:
363364
debug:
364365
use_internal_logger: false

x-pack/libbeat/cmd/instance/beat.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ import (
3939
const receiverPublisherCloseTimeout = 5 * time.Second
4040

4141
// NewBeatForReceiver creates a Beat that will be used in the context of an otel receiver
42-
func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]any, useDefaultProcessors bool, consumer consumer.Logs, componentID string, core zapcore.Core) (*instance.Beat, error) {
42+
func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]any, consumer consumer.Logs, componentID string, core zapcore.Core) (*instance.Beat, error) {
4343
b, err := instance.NewBeat(settings.Name,
4444
settings.IndexPrefix,
4545
settings.Version,
@@ -235,7 +235,6 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
235235
return nil, fmt.Errorf("error setting index supporter: %w", err)
236236
}
237237

238-
b.Info.UseDefaultProcessors = useDefaultProcessors
239238
processingFactory := settings.Processing
240239
if processingFactory == nil {
241240
processingFactory = processing.MakeDefaultBeatSupport(true)

x-pack/libbeat/cmd/instance/beat_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func TestManager(t *testing.T) {
3636
"path.home": tmpDir,
3737
}
3838
t.Run("otel management disabled - key missing", func(t *testing.T) {
39-
beat, err := NewBeatForReceiver(cmd.FilebeatSettings("filebeat"), cfg, false, consumertest.NewNop(), "testcomponent", zapcore.NewNopCore())
39+
beat, err := NewBeatForReceiver(cmd.FilebeatSettings("filebeat"), cfg, consumertest.NewNop(), "testcomponent", zapcore.NewNopCore())
4040
assert.NoError(t, err)
4141
assert.NotNil(t, beat.Manager)
4242
// it should fallback to FallbackManager if key is missing
@@ -50,7 +50,7 @@ func TestManager(t *testing.T) {
5050
defer func() {
5151
management.SetUnderAgent(false) // reset to false
5252
}()
53-
beat, err := NewBeatForReceiver(cmd.FilebeatSettings("filebeat"), tmpCfg, false, consumertest.NewNop(), "testcomponent", zapcore.NewNopCore())
53+
beat, err := NewBeatForReceiver(cmd.FilebeatSettings("filebeat"), tmpCfg, consumertest.NewNop(), "testcomponent", zapcore.NewNopCore())
5454
assert.NoError(t, err)
5555
assert.NotNil(t, beat.Manager)
5656
assert.IsType(t, beat.Manager, &otelmanager.OtelManager{})
@@ -63,7 +63,7 @@ func TestManager(t *testing.T) {
6363
defer func() {
6464
management.SetUnderAgent(false) // reset to false
6565
}()
66-
beat, err := NewBeatForReceiver(cmd.FilebeatSettings("filebeat"), tmpCfg, false, consumertest.NewNop(), "testcomponent", zapcore.NewNopCore())
66+
beat, err := NewBeatForReceiver(cmd.FilebeatSettings("filebeat"), tmpCfg, consumertest.NewNop(), "testcomponent", zapcore.NewNopCore())
6767
assert.NoError(t, err)
6868
assert.NotNil(t, beat.Manager)
6969
assert.IsType(t, beat.Manager, &management.FallbackManager{})

x-pack/libbeat/outputs/otelconsumer/otelconsumer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ func (out *otelConsumer) logsPublish(ctx context.Context, batch publisher.Batch)
115115
}
116116
}
117117

118-
beatEvent := event.Content.Fields
118+
beatEvent := event.Content.Fields.Clone()
119119
if beatEvent == nil {
120120
beatEvent = mapstr.M{}
121121
}

x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -274,10 +274,12 @@ func TestPublish(t *testing.T) {
274274
"_id": "abc123",
275275
},
276276
}
277+
ch := make(chan plog.Logs, 1)
277278
batch := outest.NewBatch(event)
278279
var countLogs int
279280
otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error {
280281
countLogs = countLogs + ld.LogRecordCount()
282+
ch <- ld
281283
return nil
282284
})
283285
otelConsumer.beatInfo.ComponentID = tc.componentID
@@ -286,11 +288,31 @@ func TestPublish(t *testing.T) {
286288
assert.Len(t, batch.Signals, 1)
287289
assert.Equal(t, outest.BatchACK, batch.Signals[0].Tag)
288290
assert.Equal(t, len(batch.Events()), countLogs, "all events should be consumed")
289-
for _, event := range batch.Events() {
290-
beatEvent := event.Content.Fields.Flatten()
291-
assert.Equal(t, tc.expectedComponentID, beatEvent["agent."+otelComponentIDKey], "expected agent.otelcol.component.id field in log record")
292-
assert.Equal(t, tc.expectedComponentKind, beatEvent["agent."+otelComponentKindKey], "expected agent.otelcol.component.kind field in log record")
291+
log := <-ch
292+
for i := 0; i < log.ResourceLogs().Len(); i++ {
293+
resourceLog := log.ResourceLogs().At(i)
294+
for j := 0; j < resourceLog.ScopeLogs().Len(); j++ {
295+
scopeLog := resourceLog.ScopeLogs().At(j)
296+
for k := 0; k < scopeLog.LogRecords().Len(); k++ {
297+
logRecord := scopeLog.LogRecords().At(k)
298+
body := logRecord.Body().Map()
299+
300+
// Traverse nested "agent.otelcol.component" structure
301+
agentVal, ok := body.Get("agent")
302+
require.True(t, ok, "expected 'agent' in log body")
303+
304+
agentMap := agentVal.Map()
305+
idVal, ok := agentMap.Get("otelcol.component.id")
306+
require.True(t, ok, "expected 'agent.otelcol.component.id' in log body")
307+
assert.Equal(t, tc.expectedComponentID, idVal.AsString())
308+
309+
kindVal, ok := agentMap.Get("otelcol.component.kind")
310+
require.True(t, ok, "expected 'agent.otelcol.component.kind' in log body")
311+
assert.Equal(t, tc.expectedComponentKind, kindVal.AsString())
312+
}
313+
}
293314
}
315+
294316
})
295317
}
296318
})

x-pack/metricbeat/mbreceiver/factory.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func createReceiver(ctx context.Context, set receiver.Settings, baseCfg componen
5151
settings.ElasticLicensed = true
5252
settings.Initialize = append(settings.Initialize, include.InitializeModule)
5353

54-
b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, true, consumer, set.ID.String(), set.Logger.Core())
54+
b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, consumer, set.ID.String(), set.Logger.Core())
5555
if err != nil {
5656
return nil, fmt.Errorf("error creating %s: %w", Name, err)
5757
}

0 commit comments

Comments
 (0)