Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions libbeat/beat/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ type Info struct {
UserAgent string // A string of the user-agent that can be passed to any outputs or network connections
FIPSDistribution bool // If the beat was compiled as a FIPS distribution.

LogConsumer consumer.Logs // otel log consumer
ComponentID string // otel component id from the collector config e.g. "filebeatreceiver/logs"
UseDefaultProcessors bool // Whether to use the default processors
Logger *logp.Logger
LogConsumer consumer.Logs // otel log consumer
ComponentID string // otel component id from the collector config e.g. "filebeatreceiver/logs"
Logger *logp.Logger
}

func (i Info) FQDNAwareHostname(useFQDN bool) string {
Expand Down
3 changes: 1 addition & 2 deletions libbeat/publisher/processing/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,9 @@ func MakeDefaultSupport(
// don't try to "merge" the two lists somehow, if the supportFactory caller requests its own processors, use those
// also makes it easier to disable global processors if needed, since they're otherwise hardcoded
var rawProcessors processors.PluginConfig
shouldLoadDefaultProcessors := info.UseDefaultProcessors || management.UnderAgent()

// don't check the array directly, use HasField, that way processors can easily be bypassed with -E processors=[]
if shouldLoadDefaultProcessors && !beatCfg.HasField("processors") {
if management.UnderAgent() && !beatCfg.HasField("processors") {
log.Debugf("In fleet/otel mode with no processors specified, defaulting to global processors")
rawProcessors = fleetDefaultProcessors

Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/fbreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func createReceiver(ctx context.Context, set receiver.Settings, baseCfg componen
settings.ElasticLicensed = true
settings.Initialize = append(settings.Initialize, include.InitializeModule)

b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, true, consumer, set.ID.String(), set.Logger.Core())
b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, consumer, set.ID.String(), set.Logger.Core())
if err != nil {
return nil, fmt.Errorf("error creating %s: %w", Name, err)
}
Expand Down
7 changes: 4 additions & 3 deletions x-pack/filebeat/fbreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@ func TestNewReceiver(t *testing.T) {
"*",
},
},
"path.home": t.TempDir(),
"http.enabled": true,
"http.host": monitorHost,
"path.home": t.TempDir(),
"http.enabled": true,
"http.host": monitorHost,
"management.otel.enabled": true,
},
}

Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/tests/integration/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ func TestFilebeatOTelReceiverE2E(t *testing.T) {
http.enabled: true
http.host: localhost
http.port: {{.MonitoringPort}}
management.otel.enabled: true
exporters:
debug:
use_internal_logger: false
Expand Down
3 changes: 1 addition & 2 deletions x-pack/libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
const receiverPublisherCloseTimeout = 5 * time.Second

// NewBeatForReceiver creates a Beat that will be used in the context of an otel receiver
func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]any, useDefaultProcessors bool, consumer consumer.Logs, componentID string, core zapcore.Core) (*instance.Beat, error) {
func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]any, consumer consumer.Logs, componentID string, core zapcore.Core) (*instance.Beat, error) {
b, err := instance.NewBeat(settings.Name,
settings.IndexPrefix,
settings.Version,
Expand Down Expand Up @@ -235,7 +235,6 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
return nil, fmt.Errorf("error setting index supporter: %w", err)
}

b.Info.UseDefaultProcessors = useDefaultProcessors
processingFactory := settings.Processing
if processingFactory == nil {
processingFactory = processing.MakeDefaultBeatSupport(true)
Expand Down
6 changes: 3 additions & 3 deletions x-pack/libbeat/cmd/instance/beat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestManager(t *testing.T) {
"path.home": tmpDir,
}
t.Run("otel management disabled - key missing", func(t *testing.T) {
beat, err := NewBeatForReceiver(cmd.FilebeatSettings("filebeat"), cfg, false, consumertest.NewNop(), "testcomponent", zapcore.NewNopCore())
beat, err := NewBeatForReceiver(cmd.FilebeatSettings("filebeat"), cfg, consumertest.NewNop(), "testcomponent", zapcore.NewNopCore())
assert.NoError(t, err)
assert.NotNil(t, beat.Manager)
// it should fallback to FallbackManager if key is missing
Expand All @@ -50,7 +50,7 @@ func TestManager(t *testing.T) {
defer func() {
management.SetUnderAgent(false) // reset to false
}()
beat, err := NewBeatForReceiver(cmd.FilebeatSettings("filebeat"), tmpCfg, false, consumertest.NewNop(), "testcomponent", zapcore.NewNopCore())
beat, err := NewBeatForReceiver(cmd.FilebeatSettings("filebeat"), tmpCfg, consumertest.NewNop(), "testcomponent", zapcore.NewNopCore())
assert.NoError(t, err)
assert.NotNil(t, beat.Manager)
assert.IsType(t, beat.Manager, &otelmanager.OtelManager{})
Expand All @@ -63,7 +63,7 @@ func TestManager(t *testing.T) {
defer func() {
management.SetUnderAgent(false) // reset to false
}()
beat, err := NewBeatForReceiver(cmd.FilebeatSettings("filebeat"), tmpCfg, false, consumertest.NewNop(), "testcomponent", zapcore.NewNopCore())
beat, err := NewBeatForReceiver(cmd.FilebeatSettings("filebeat"), tmpCfg, consumertest.NewNop(), "testcomponent", zapcore.NewNopCore())
assert.NoError(t, err)
assert.NotNil(t, beat.Manager)
assert.IsType(t, beat.Manager, &management.FallbackManager{})
Expand Down
2 changes: 1 addition & 1 deletion x-pack/libbeat/outputs/otelconsumer/otelconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (out *otelConsumer) logsPublish(ctx context.Context, batch publisher.Batch)
}
}

beatEvent := event.Content.Fields
beatEvent := event.Content.Fields.Clone()
if beatEvent == nil {
beatEvent = mapstr.M{}
}
Expand Down
30 changes: 26 additions & 4 deletions x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,12 @@ func TestPublish(t *testing.T) {
"_id": "abc123",
},
}
ch := make(chan plog.Logs, 1)
batch := outest.NewBatch(event)
var countLogs int
otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error {
countLogs = countLogs + ld.LogRecordCount()
ch <- ld
return nil
})
otelConsumer.beatInfo.ComponentID = tc.componentID
Expand All @@ -286,11 +288,31 @@ func TestPublish(t *testing.T) {
assert.Len(t, batch.Signals, 1)
assert.Equal(t, outest.BatchACK, batch.Signals[0].Tag)
assert.Equal(t, len(batch.Events()), countLogs, "all events should be consumed")
for _, event := range batch.Events() {
beatEvent := event.Content.Fields.Flatten()
assert.Equal(t, tc.expectedComponentID, beatEvent["agent."+otelComponentIDKey], "expected agent.otelcol.component.id field in log record")
assert.Equal(t, tc.expectedComponentKind, beatEvent["agent."+otelComponentKindKey], "expected agent.otelcol.component.kind field in log record")
log := <-ch
for i := 0; i < log.ResourceLogs().Len(); i++ {
resourceLog := log.ResourceLogs().At(i)
for j := 0; j < resourceLog.ScopeLogs().Len(); j++ {
scopeLog := resourceLog.ScopeLogs().At(j)
for k := 0; k < scopeLog.LogRecords().Len(); k++ {
logRecord := scopeLog.LogRecords().At(k)
body := logRecord.Body().Map()

// Traverse nested "agent.otelcol.component" structure
agentVal, ok := body.Get("agent")
require.True(t, ok, "expected 'agent' in log body")

agentMap := agentVal.Map()
idVal, ok := agentMap.Get("otelcol.component.id")
require.True(t, ok, "expected 'agent.otelcol.component.id' in log body")
assert.Equal(t, tc.expectedComponentID, idVal.AsString())

kindVal, ok := agentMap.Get("otelcol.component.kind")
require.True(t, ok, "expected 'agent.otelcol.component.kind' in log body")
assert.Equal(t, tc.expectedComponentKind, kindVal.AsString())
}
}
}

})
}
})
Expand Down
2 changes: 1 addition & 1 deletion x-pack/metricbeat/mbreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func createReceiver(ctx context.Context, set receiver.Settings, baseCfg componen
settings.ElasticLicensed = true
settings.Initialize = append(settings.Initialize, include.InitializeModule)

b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, true, consumer, set.ID.String(), set.Logger.Core())
b, err := xpInstance.NewBeatForReceiver(settings, cfg.Beatconfig, consumer, set.ID.String(), set.Logger.Core())
if err != nil {
return nil, fmt.Errorf("error creating %s: %w", Name, err)
}
Expand Down
7 changes: 4 additions & 3 deletions x-pack/metricbeat/mbreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,10 @@ func TestNewReceiver(t *testing.T) {
"*",
},
},
"path.home": t.TempDir(),
"http.enabled": true,
"http.host": monitorHost,
"path.home": t.TempDir(),
"http.enabled": true,
"http.host": monitorHost,
"management.otel.enabled": true,
},
}

Expand Down