@@ -794,263 +794,6 @@ func mapAtLeastOneTrue(mm map[string]bool) bool {
794794 return false
795795}
796796
797- func TestFileBeatReceiver (t * testing.T ) {
798- define .Require (t , define.Requirements {
799- Group : integration .Default ,
800- Local : true ,
801- OS : []define.OS {
802- {Type : define .Windows },
803- {Type : define .Linux },
804- {Type : define .Darwin },
805- },
806- })
807-
808- type otelConfigOptions struct {
809- Message string
810- Output string
811- HomeDir string
812- }
813- testMessage := "supercalifragilisticexpialidocious"
814- tmpDir := t .TempDir ()
815- exporterOutputPath := filepath .Join (tmpDir , "output.json" )
816- t .Cleanup (func () {
817- if t .Failed () {
818- contents , err := os .ReadFile (exporterOutputPath )
819- if err != nil {
820- t .Logf ("No exporter output file" )
821- return
822- }
823- t .Logf ("Contents of exporter output file:\n %s\n " , string (contents ))
824- }
825- })
826- otelConfigPath := filepath .Join (tmpDir , "otel.yml" )
827- otelConfigTemplate := `receivers:
828- filebeatreceiver:
829- filebeat:
830- inputs:
831- - type: benchmark
832- enabled: true
833- count: 1
834- message: {{.Message}}
835- output:
836- otelconsumer:
837- logging:
838- level: info
839- selectors:
840- - '*'
841- path.home: {{.HomeDir}}
842- exporters:
843- file/no_rotation:
844- path: {{.Output}}
845- service:
846- pipelines:
847- logs:
848- receivers: [filebeatreceiver]
849- exporters: [file/no_rotation]
850- `
851-
852- var otelConfigBuffer bytes.Buffer
853- require .NoError (t ,
854- template .Must (template .New ("otelConfig" ).Parse (otelConfigTemplate )).Execute (& otelConfigBuffer ,
855- otelConfigOptions {
856- Message : testMessage ,
857- Output : exporterOutputPath ,
858- HomeDir : tmpDir ,
859- }))
860- require .NoError (t , os .WriteFile (otelConfigPath , otelConfigBuffer .Bytes (), 0o600 ))
861- t .Cleanup (func () {
862- if t .Failed () {
863- contents , err := os .ReadFile (otelConfigPath )
864- if err != nil {
865- t .Logf ("no otel config file" )
866- return
867- }
868- t .Logf ("Contents of otel config file:\n %s\n " , string (contents ))
869- }
870- })
871- fixture , err := define .NewFixtureFromLocalBuild (t , define .Version (), aTesting .WithAdditionalArgs ([]string {"--config" , otelConfigPath }))
872- require .NoError (t , err )
873-
874- ctx , cancel := testcontext .WithDeadline (t , context .Background (), time .Now ().Add (5 * time .Minute ))
875- defer cancel ()
876- err = fixture .Prepare (ctx , fakeComponent )
877- require .NoError (t , err )
878-
879- var fixtureWg sync.WaitGroup
880- fixtureWg .Add (1 )
881- go func () {
882- defer fixtureWg .Done ()
883- err = fixture .RunOtelWithClient (ctx )
884- }()
885-
886- require .Eventually (t ,
887- func () bool {
888- content , err := os .ReadFile (exporterOutputPath )
889- if err != nil || len (content ) == 0 {
890- return false
891- }
892- return bytes .Contains (content , []byte (testMessage ))
893- },
894- 3 * time .Minute , 1 * time .Second ,
895- fmt .Sprintf ("there should be exported logs by now" ))
896-
897- cancel ()
898- fixtureWg .Wait ()
899- require .True (t , err == nil || err == context .Canceled || err == context .DeadlineExceeded , "Retrieved unexpected error: %s" , err .Error ())
900- }
901-
902- func TestOtelFBReceiverE2E (t * testing.T ) {
903- info := define .Require (t , define.Requirements {
904- Group : integration .Default ,
905- Local : true ,
906- OS : []define.OS {
907- {Type : define .Windows },
908- {Type : define .Linux },
909- {Type : define .Darwin },
910- },
911- Stack : & define.Stack {},
912- })
913- tmpDir := t .TempDir ()
914- numEvents := 50
915- // Create the data file to ingest
916- inputFile , err := os .CreateTemp (tmpDir , "input.txt" )
917- require .NoError (t , err , "failed to create temp file to hold data to ingest" )
918- inputFilePath := inputFile .Name ()
919- for i := 0 ; i < numEvents ; i ++ {
920- _ , err = inputFile .Write ([]byte (fmt .Sprintf ("Line %d\n " , i )))
921- require .NoErrorf (t , err , "failed to write line %d to temp file" , i )
922- }
923- err = inputFile .Close ()
924- require .NoError (t , err , "failed to close data temp file" )
925- t .Cleanup (func () {
926- if t .Failed () {
927- contents , err := os .ReadFile (inputFilePath )
928- if err != nil {
929- t .Logf ("no data file to import at %s" , inputFilePath )
930- return
931- }
932- t .Logf ("contents of import file:\n %s\n " , string (contents ))
933- }
934- })
935-
936- // Create the otel configuration file
937- type otelConfigOptions struct {
938- InputPath string
939- HomeDir string
940- ESEndpoint string
941- ESApiKey string
942- Index string
943- MinItems int
944- }
945- esEndpoint , err := integration .GetESHost ()
946- require .NoError (t , err , "error getting elasticsearch endpoint" )
947- esApiKey , err := createESApiKey (info .ESClient )
948- require .NoError (t , err , "error creating API key" )
949- require .True (t , len (esApiKey .Encoded ) > 1 , "api key is invalid %q" , esApiKey )
950- index := "logs-integration-default"
951- otelConfigTemplate := `receivers:
952- filebeatreceiver:
953- filebeat:
954- inputs:
955- - type: filestream
956- id: filestream-end-to-end
957- enabled: true
958- paths:
959- - {{.InputPath}}
960- prospector.scanner.fingerprint.enabled: false
961- file_identity.native: ~
962- output:
963- otelconsumer:
964- logging:
965- level: info
966- selectors:
967- - '*'
968- path.home: {{.HomeDir}}
969- queue.mem.flush.timeout: 0s
970- exporters:
971- elasticsearch/log:
972- endpoints:
973- - {{.ESEndpoint}}
974- api_key: {{.ESApiKey}}
975- logs_index: {{.Index}}
976- sending_queue:
977- wait_for_result: true # Avoid losing data on shutdown
978- block_on_overflow: true
979- batch:
980- flush_timeout: 1s
981- min_size: {{.MinItems}}
982- mapping:
983- mode: bodymap
984- service:
985- pipelines:
986- logs:
987- receivers:
988- - filebeatreceiver
989- exporters:
990- - elasticsearch/log
991- `
992- otelConfigPath := filepath .Join (tmpDir , "otel.yml" )
993- var otelConfigBuffer bytes.Buffer
994- require .NoError (t ,
995- template .Must (template .New ("otelConfig" ).Parse (otelConfigTemplate )).Execute (& otelConfigBuffer ,
996- otelConfigOptions {
997- InputPath : inputFilePath ,
998- HomeDir : tmpDir ,
999- ESEndpoint : esEndpoint ,
1000- ESApiKey : esApiKey .Encoded ,
1001- Index : index ,
1002- MinItems : numEvents ,
1003- }))
1004- require .NoError (t , os .WriteFile (otelConfigPath , otelConfigBuffer .Bytes (), 0o600 ))
1005- t .Cleanup (func () {
1006- if t .Failed () {
1007- contents , err := os .ReadFile (otelConfigPath )
1008- if err != nil {
1009- t .Logf ("No otel configuration file at %s" , otelConfigPath )
1010- return
1011- }
1012- t .Logf ("Contents of otel config file:\n %s\n " , string (contents ))
1013- }
1014- })
1015- // Now we can actually create the fixture and run it
1016- fixture , err := define .NewFixtureFromLocalBuild (t , define .Version (), aTesting .WithAdditionalArgs ([]string {"--config" , otelConfigPath }))
1017- require .NoError (t , err )
1018-
1019- ctx , cancel := testcontext .WithDeadline (t , context .Background (), time .Now ().Add (5 * time .Minute ))
1020- defer cancel ()
1021- err = fixture .Prepare (ctx , fakeComponent )
1022- require .NoError (t , err )
1023-
1024- var fixtureWg sync.WaitGroup
1025- fixtureWg .Add (1 )
1026- go func () {
1027- defer fixtureWg .Done ()
1028- err = fixture .RunOtelWithClient (ctx )
1029- }()
1030-
1031- // Make sure find the logs
1032- actualHits := & struct { Hits int }{}
1033- require .Eventually (t ,
1034- func () bool {
1035- findCtx , findCancel := context .WithTimeout (context .Background (), 10 * time .Second )
1036- defer findCancel ()
1037-
1038- docs , err := estools .GetLogsForIndexWithContext (findCtx , info .ESClient , ".ds-" + index + "*" , map [string ]interface {}{
1039- "log.file.path" : inputFilePath ,
1040- })
1041- require .NoError (t , err )
1042-
1043- actualHits .Hits = docs .Hits .Total .Value
1044- return actualHits .Hits == numEvents
1045- },
1046- 2 * time .Minute , 1 * time .Second ,
1047- "Expected %d logs, got %v" , numEvents , actualHits )
1048-
1049- cancel ()
1050- fixtureWg .Wait ()
1051- require .True (t , err == nil || errors .Is (err , context .Canceled ) || errors .Is (err , context .DeadlineExceeded ), "Retrieved unexpected error: %s" , err .Error ())
1052- }
1053-
1054797func TestOtelFilestreamInput (t * testing.T ) {
1055798 info := define .Require (t , define.Requirements {
1056799 Group : integration .Default ,
@@ -1346,135 +1089,6 @@ agent.monitoring:
13461089 cmd .Wait ()
13471090}
13481091
1349- func TestOtelMBReceiverE2E (t * testing.T ) {
1350- info := define .Require (t , define.Requirements {
1351- Group : integration .Default ,
1352- Local : true ,
1353- OS : []define.OS {
1354- // {Type: define.Windows}, we don't support otel on Windows yet
1355- {Type : define .Linux },
1356- {Type : define .Darwin },
1357- },
1358- Stack : & define.Stack {},
1359- })
1360- tmpDir := t .TempDir ()
1361-
1362- // Create the otel configuration file
1363- type otelConfigOptions struct {
1364- HomeDir string
1365- ESEndpoint string
1366- ESApiKey string
1367- Index string
1368- MinItems int
1369- }
1370- esEndpoint , err := integration .GetESHost ()
1371- require .NoError (t , err , "error getting elasticsearch endpoint" )
1372- esApiKey , err := createESApiKey (info .ESClient )
1373- require .NoError (t , err , "error creating API key" )
1374- require .True (t , len (esApiKey .Encoded ) > 1 , "api key is invalid %q" , esApiKey )
1375- index := "logs-integration-default"
1376- otelConfigTemplate := `receivers:
1377- metricbeatreceiver:
1378- metricbeat:
1379- modules:
1380- - module: system
1381- enabled: true
1382- period: 1s
1383- processes:
1384- - '.*'
1385- metricsets:
1386- - cpu
1387- output:
1388- otelconsumer:
1389- logging:
1390- level: info
1391- selectors:
1392- - '*'
1393- path.home: {{.HomeDir}}
1394- queue.mem.flush.timeout: 0s
1395- exporters:
1396- elasticsearch/log:
1397- endpoints:
1398- - {{.ESEndpoint}}
1399- api_key: {{.ESApiKey}}
1400- logs_index: {{.Index}}
1401- sending_queue:
1402- wait_for_result: true # Avoid losing data on shutdown
1403- block_on_overflow: true
1404- batch:
1405- flush_timeout: 1s
1406- min_size: {{.MinItems}}
1407- mapping:
1408- mode: bodymap
1409- service:
1410- pipelines:
1411- logs:
1412- receivers:
1413- - metricbeatreceiver
1414- exporters:
1415- - elasticsearch/log
1416- `
1417- otelConfigPath := filepath .Join (tmpDir , "otel.yml" )
1418- var otelConfigBuffer bytes.Buffer
1419- require .NoError (t ,
1420- template .Must (template .New ("otelConfig" ).Parse (otelConfigTemplate )).Execute (& otelConfigBuffer ,
1421- otelConfigOptions {
1422- HomeDir : tmpDir ,
1423- ESEndpoint : esEndpoint ,
1424- ESApiKey : esApiKey .Encoded ,
1425- Index : index ,
1426- MinItems : 1 ,
1427- }))
1428- require .NoError (t , os .WriteFile (otelConfigPath , otelConfigBuffer .Bytes (), 0o600 ))
1429- t .Cleanup (func () {
1430- if t .Failed () {
1431- contents , err := os .ReadFile (otelConfigPath )
1432- if err != nil {
1433- t .Logf ("No otel configuration file at %s" , otelConfigPath )
1434- return
1435- }
1436- t .Logf ("Contents of otel config file:\n %s\n " , string (contents ))
1437- }
1438- })
1439- // Now we can actually create the fixture and run it
1440- fixture , err := define .NewFixtureFromLocalBuild (t , define .Version (), aTesting .WithAdditionalArgs ([]string {"--config" , otelConfigPath }))
1441- require .NoError (t , err )
1442-
1443- ctx , cancel := testcontext .WithDeadline (t , context .Background (), time .Now ().Add (5 * time .Minute ))
1444- defer cancel ()
1445- err = fixture .Prepare (ctx , fakeComponent )
1446- require .NoError (t , err )
1447-
1448- var fixtureWg sync.WaitGroup
1449- fixtureWg .Add (1 )
1450- go func () {
1451- defer fixtureWg .Done ()
1452- err = fixture .RunOtelWithClient (ctx )
1453- }()
1454-
1455- // Make sure find the logs
1456- actualHits := & struct { Hits int }{}
1457- require .Eventually (t ,
1458- func () bool {
1459- findCtx , findCancel := context .WithTimeout (context .Background (), 10 * time .Second )
1460- defer findCancel ()
1461-
1462- docs , err := estools .GetLogsForIndexWithContext (findCtx , info .ESClient , ".ds-" + index + "*" , map [string ]interface {}{
1463- "metricset.name" : "cpu" ,
1464- })
1465- require .NoError (t , err )
1466-
1467- actualHits .Hits = docs .Hits .Total .Value
1468- return actualHits .Hits >= 1
1469- },
1470- 2 * time .Minute , 1 * time .Second ,
1471- "Expected at least %d logs, got %v" , 1 , actualHits )
1472-
1473- cancel ()
1474- fixtureWg .Wait ()
1475- require .True (t , err == nil || errors .Is (err , context .Canceled ) || errors .Is (err , context .DeadlineExceeded ), "Retrieved unexpected error: %s" , err .Error ())
1476- }
1477-
14781092func TestHybridAgentE2E (t * testing.T ) {
14791093 // This test is a hybrid agent test that ingests a single log with
14801094 // filebeat and fbreceiver. It then compares the final documents in
0 commit comments