Skip to content

Commit 297e2f6

Browse files
swiatekmmergify[bot]
authored andcommitted
Check for config processing errors in coordinator unit tests (#10203)
(cherry picked from commit 0ba2ea6) # Conflicts: # internal/pkg/agent/application/coordinator/coordinator_unit_test.go
1 parent d587329 commit 297e2f6

File tree

1 file changed

+194
-0
lines changed

1 file changed

+194
-0
lines changed

internal/pkg/agent/application/coordinator/coordinator_unit_test.go

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -937,12 +937,206 @@ service:
937937
configChan <- cfgChange
938938
coord.runLoopIteration(ctx)
939939
assert.True(t, cfgChange.acked, "empty policy should be acknowledged")
940+
assert.NoError(t, cfgChange.err, "config processing shouldn't report an error")
940941
assert.True(t, updated, "empty policy should cause runtime manager update")
941942
assert.Empty(t, components, "empty policy should produce empty component model")
942943
assert.True(t, otelUpdated, "empty policy should cause otel manager update")
943944
assert.Nil(t, otelConfig, "empty policy should cause otel manager to get nil config")
944945
}
945946

947+
<<<<<<< HEAD
948+
=======
949+
func TestCoordinatorPolicyChangeUpdatesRuntimeAndOTelManagerWithOtelComponents(t *testing.T) {
950+
// Send a test policy to the Coordinator as a Config Manager update,
951+
// verify it generates the right component model and sends components
952+
// to both the runtime manager and the otel manager.
953+
954+
// Set a one-second timeout -- nothing here should block, but if it
955+
// does let's report a failure instead of timing out the test runner.
956+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
957+
defer cancel()
958+
logger := logp.NewLogger("testing")
959+
960+
configChan := make(chan ConfigChange, 1)
961+
962+
// Create a mocked runtime manager that will report the update call
963+
var updated bool // Set by runtime manager callback
964+
var components []component.Component // Set by runtime manager callback
965+
runtimeManager := &fakeRuntimeManager{
966+
updateCallback: func(comp []component.Component) error {
967+
updated = true
968+
components = comp
969+
return nil
970+
},
971+
}
972+
var otelUpdated bool // Set by otel manager callback
973+
var otelConfig *confmap.Conf // Set by otel manager callback
974+
otelManager := &fakeOTelManager{
975+
updateCollectorCallback: func(cfg *confmap.Conf) error {
976+
otelUpdated = true
977+
otelConfig = cfg
978+
return nil
979+
},
980+
}
981+
982+
// we need the filestream spec to be able to convert to Otel config
983+
componentSpec := component.InputRuntimeSpec{
984+
InputType: "filestream",
985+
BinaryName: "agentbeat",
986+
Spec: component.InputSpec{
987+
Name: "filestream",
988+
Command: &component.CommandSpec{
989+
Args: []string{"filebeat"},
990+
},
991+
Platforms: []string{
992+
"linux/amd64",
993+
"linux/arm64",
994+
"darwin/amd64",
995+
"darwin/arm64",
996+
"windows/amd64",
997+
"container/amd64",
998+
"container/arm64",
999+
},
1000+
},
1001+
}
1002+
1003+
platform, err := component.LoadPlatformDetail()
1004+
require.NoError(t, err)
1005+
specs, err := component.NewRuntimeSpecs(platform, []component.InputRuntimeSpec{componentSpec})
1006+
require.NoError(t, err)
1007+
1008+
monitoringMgr := newTestMonitoringMgr()
1009+
coord := &Coordinator{
1010+
logger: logger,
1011+
agentInfo: &info.AgentInfo{},
1012+
stateBroadcaster: broadcaster.New(State{}, 0, 0),
1013+
managerChans: managerChans{
1014+
configManagerUpdate: configChan,
1015+
},
1016+
monitorMgr: monitoringMgr,
1017+
runtimeMgr: runtimeManager,
1018+
otelMgr: otelManager,
1019+
specs: specs,
1020+
vars: emptyVars(t),
1021+
componentPIDTicker: time.NewTicker(time.Second * 30),
1022+
secretMarkerFunc: testSecretMarkerFunc,
1023+
}
1024+
1025+
t.Run("mixed policy", func(t *testing.T) {
1026+
// Create a policy with one input and one output (no otel configuration)
1027+
cfg := config.MustNewConfigFrom(`
1028+
outputs:
1029+
default:
1030+
type: elasticsearch
1031+
hosts:
1032+
- localhost:9200
1033+
inputs:
1034+
- id: test-input
1035+
type: filestream
1036+
use_output: default
1037+
_runtime_experimental: otel
1038+
- id: test-other-input
1039+
type: system/metrics
1040+
use_output: default
1041+
receivers:
1042+
nop:
1043+
exporters:
1044+
nop:
1045+
service:
1046+
pipelines:
1047+
traces:
1048+
receivers:
1049+
- nop
1050+
exporters:
1051+
- nop
1052+
`)
1053+
1054+
// Send the policy change and make sure it was acknowledged.
1055+
cfgChange := &configChange{cfg: cfg}
1056+
configChan <- cfgChange
1057+
coord.runLoopIteration(ctx)
1058+
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")
1059+
assert.NoError(t, cfgChange.err, "config processing shouldn't report an error")
1060+
1061+
// Make sure the runtime manager received the expected component update.
1062+
// An assert.Equal on the full component model doesn't play nice with
1063+
// the embedded proto structs, so instead we verify the important fields
1064+
// manually (sorry).
1065+
assert.True(t, updated, "Runtime manager should be updated after a policy change")
1066+
require.Equal(t, 1, len(components), "Test policy should generate one component")
1067+
assert.True(t, otelUpdated, "OTel manager should be updated after a policy change")
1068+
require.NotNil(t, otelConfig, "OTel manager should have config")
1069+
1070+
runtimeComponent := components[0]
1071+
assert.Equal(t, "system/metrics-default", runtimeComponent.ID)
1072+
require.NotNil(t, runtimeComponent.Err, "Input with no spec should produce a component error")
1073+
assert.Equal(t, "input not supported", runtimeComponent.Err.Error(), "Input with no spec should report 'input not supported'")
1074+
require.Equal(t, 2, len(runtimeComponent.Units))
1075+
1076+
units := runtimeComponent.Units
1077+
// Verify the input unit
1078+
assert.Equal(t, "system/metrics-default-test-other-input", units[0].ID)
1079+
assert.Equal(t, client.UnitTypeInput, units[0].Type)
1080+
assert.Equal(t, "test-other-input", units[0].Config.Id)
1081+
assert.Equal(t, "system/metrics", units[0].Config.Type)
1082+
1083+
// Verify the output unit
1084+
assert.Equal(t, "system/metrics-default", units[1].ID)
1085+
assert.Equal(t, client.UnitTypeOutput, units[1].Type)
1086+
assert.Equal(t, "elasticsearch", units[1].Config.Type)
1087+
})
1088+
1089+
t.Run("unsupported otel output option", func(t *testing.T) {
1090+
// Create a policy with one input and one output (no otel configuration)
1091+
cfg := config.MustNewConfigFrom(`
1092+
outputs:
1093+
default:
1094+
type: elasticsearch
1095+
hosts:
1096+
- localhost:9200
1097+
indices: [] # not supported by the elasticsearch exporter
1098+
inputs:
1099+
- id: test-input
1100+
type: filestream
1101+
use_output: default
1102+
_runtime_experimental: otel
1103+
- id: test-other-input
1104+
type: system/metrics
1105+
use_output: default
1106+
receivers:
1107+
nop:
1108+
exporters:
1109+
nop:
1110+
service:
1111+
pipelines:
1112+
traces:
1113+
receivers:
1114+
- nop
1115+
exporters:
1116+
- nop
1117+
`)
1118+
1119+
// Send the policy change and make sure it was acknowledged.
1120+
cfgChange := &configChange{cfg: cfg}
1121+
configChan <- cfgChange
1122+
coord.runLoopIteration(ctx)
1123+
assert.True(t, cfgChange.acked, "Coordinator should ACK a successful policy change")
1124+
assert.NoError(t, cfgChange.err, "config processing shouldn't report an error")
1125+
1126+
// Make sure the runtime manager received the expected component update.
1127+
// An assert.Equal on the full component model doesn't play nice with
1128+
// the embedded proto structs, so instead we verify the important fields
1129+
// manually (sorry).
1130+
assert.True(t, updated, "Runtime manager should be updated after a policy change")
1131+
assert.True(t, otelUpdated, "OTel manager should be updated after a policy change")
1132+
require.NotNil(t, otelConfig, "OTel manager should have config")
1133+
1134+
assert.Len(t, components, 2, "both components should be assigned to the runtime manager")
1135+
})
1136+
1137+
}
1138+
1139+
>>>>>>> 0ba2ea683 (Check for config processing errors in coordinator unit tests (#10203))
9461140
func TestCoordinatorReportsRuntimeManagerUpdateFailure(t *testing.T) {
9471141
// Set a one-second timeout -- nothing here should block, but if it
9481142
// does let's report a failure instead of timing out the test runner.

0 commit comments

Comments
 (0)