diff --git a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go index e1a56933c62..4433c5f1e68 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_unit_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_unit_test.go @@ -937,12 +937,206 @@ service: configChan <- cfgChange coord.runLoopIteration(ctx) assert.True(t, cfgChange.acked, "empty policy should be acknowledged") + assert.NoError(t, cfgChange.err, "config processing shouldn't report an error") assert.True(t, updated, "empty policy should cause runtime manager update") assert.Empty(t, components, "empty policy should produce empty component model") assert.True(t, otelUpdated, "empty policy should cause otel manager update") assert.Nil(t, otelConfig, "empty policy should cause otel manager to get nil config") } +<<<<<<< HEAD +======= +func TestCoordinatorPolicyChangeUpdatesRuntimeAndOTelManagerWithOtelComponents(t *testing.T) { + // Send a test policy to the Coordinator as a Config Manager update, + // verify it generates the right component model and sends components + // to both the runtime manager and the otel manager. + + // Set a one-second timeout -- nothing here should block, but if it + // does let's report a failure instead of timing out the test runner. + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + logger := logp.NewLogger("testing") + + configChan := make(chan ConfigChange, 1) + + // Create a mocked runtime manager that will report the update call + var updated bool // Set by runtime manager callback + var components []component.Component // Set by runtime manager callback + runtimeManager := &fakeRuntimeManager{ + updateCallback: func(comp []component.Component) error { + updated = true + components = comp + return nil + }, + } + var otelUpdated bool // Set by otel manager callback + var otelConfig *confmap.Conf // Set by otel manager callback + otelManager := &fakeOTelManager{ + updateCollectorCallback: func(cfg *confmap.Conf) error { + otelUpdated = true + otelConfig = cfg + return nil + }, + } + + // we need the filestream spec to be able to convert to Otel config + componentSpec := component.InputRuntimeSpec{ + InputType: "filestream", + BinaryName: "agentbeat", + Spec: component.InputSpec{ + Name: "filestream", + Command: &component.CommandSpec{ + Args: []string{"filebeat"}, + }, + Platforms: []string{ + "linux/amd64", + "linux/arm64", + "darwin/amd64", + "darwin/arm64", + "windows/amd64", + "container/amd64", + "container/arm64", + }, + }, + } + + platform, err := component.LoadPlatformDetail() + require.NoError(t, err) + specs, err := component.NewRuntimeSpecs(platform, []component.InputRuntimeSpec{componentSpec}) + require.NoError(t, err) + + monitoringMgr := newTestMonitoringMgr() + coord := &Coordinator{ + logger: logger, + agentInfo: &info.AgentInfo{}, + stateBroadcaster: broadcaster.New(State{}, 0, 0), + managerChans: managerChans{ + configManagerUpdate: configChan, + }, + monitorMgr: monitoringMgr, + runtimeMgr: runtimeManager, + otelMgr: otelManager, + specs: specs, + vars: emptyVars(t), + componentPIDTicker: time.NewTicker(time.Second * 30), + secretMarkerFunc: testSecretMarkerFunc, + } + + t.Run("mixed policy", func(t *testing.T) { + // Create a policy with one input and one output (no otel configuration) + cfg := config.MustNewConfigFrom(` +outputs: + default: + type: elasticsearch + hosts: + - localhost:9200 +inputs: + - id: test-input + type: filestream + use_output: default + _runtime_experimental: otel + - id: test-other-input + type: system/metrics + use_output: default +receivers: + nop: +exporters: + nop: +service: + pipelines: + traces: + receivers: + - nop + exporters: + - nop +`) + + // Send the policy change and make sure it was acknowledged. + cfgChange := &configChange{cfg: cfg} + configChan <- cfgChange + coord.runLoopIteration(ctx) + assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change") + assert.NoError(t, cfgChange.err, "config processing shouldn't report an error") + + // Make sure the runtime manager received the expected component update. + // An assert.Equal on the full component model doesn't play nice with + // the embedded proto structs, so instead we verify the important fields + // manually (sorry). + assert.True(t, updated, "Runtime manager should be updated after a policy change") + require.Equal(t, 1, len(components), "Test policy should generate one component") + assert.True(t, otelUpdated, "OTel manager should be updated after a policy change") + require.NotNil(t, otelConfig, "OTel manager should have config") + + runtimeComponent := components[0] + assert.Equal(t, "system/metrics-default", runtimeComponent.ID) + require.NotNil(t, runtimeComponent.Err, "Input with no spec should produce a component error") + assert.Equal(t, "input not supported", runtimeComponent.Err.Error(), "Input with no spec should report 'input not supported'") + require.Equal(t, 2, len(runtimeComponent.Units)) + + units := runtimeComponent.Units + // Verify the input unit + assert.Equal(t, "system/metrics-default-test-other-input", units[0].ID) + assert.Equal(t, client.UnitTypeInput, units[0].Type) + assert.Equal(t, "test-other-input", units[0].Config.Id) + assert.Equal(t, "system/metrics", units[0].Config.Type) + + // Verify the output unit + assert.Equal(t, "system/metrics-default", units[1].ID) + assert.Equal(t, client.UnitTypeOutput, units[1].Type) + assert.Equal(t, "elasticsearch", units[1].Config.Type) + }) + + t.Run("unsupported otel output option", func(t *testing.T) { + // Create a policy with one input and one output (no otel configuration) + cfg := config.MustNewConfigFrom(` +outputs: + default: + type: elasticsearch + hosts: + - localhost:9200 + indices: [] # not supported by the elasticsearch exporter +inputs: + - id: test-input + type: filestream + use_output: default + _runtime_experimental: otel + - id: test-other-input + type: system/metrics + use_output: default +receivers: + nop: +exporters: + nop: +service: + pipelines: + traces: + receivers: + - nop + exporters: + - nop +`) + + // Send the policy change and make sure it was acknowledged. + cfgChange := &configChange{cfg: cfg} + configChan <- cfgChange + coord.runLoopIteration(ctx) + assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change") + assert.NoError(t, cfgChange.err, "config processing shouldn't report an error") + + // Make sure the runtime manager received the expected component update. + // An assert.Equal on the full component model doesn't play nice with + // the embedded proto structs, so instead we verify the important fields + // manually (sorry). + assert.True(t, updated, "Runtime manager should be updated after a policy change") + assert.True(t, otelUpdated, "OTel manager should be updated after a policy change") + require.NotNil(t, otelConfig, "OTel manager should have config") + + assert.Len(t, components, 2, "both components should be assigned to the runtime manager") + }) + +} + +>>>>>>> 0ba2ea683 (Check for config processing errors in coordinator unit tests (#10203)) func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) { // Set a one-second timeout -- nothing here should block, but if it // does let's report a failure instead of timing out the test runner.