Skip to content

Commit cac88a4

Browse files
VihasMakwanacmacknzpkoutsovasilis
authored
[otel] - Add Diagnostics Extension (#10141)
* initial commit * timeout * fix tests * comments * error handling * test case * readme * gofmt * remove println * test * add cpu profile * remove telemetry for now. * remove redundant code * doc * fix npipe * Update internal/pkg/otel/extension/elasticdiagnostics/extension.go Co-authored-by: Craig MacKenzie <[email protected]> * readme * readme * Update internal/pkg/otel/extension/elasticdiagnostics/extension.go Co-authored-by: Panos Koutsovasilis <[email protected]> * mutex * comments * Update generated_component_test.go * Update generated_component_test.go * comments * test thorough * use eventuallyWith * test CI green * remove platforms --------- Co-authored-by: Craig MacKenzie <[email protected]> Co-authored-by: Panos Koutsovasilis <[email protected]>
1 parent 1a0b8e3 commit cac88a4

File tree

15 files changed

+2243
-1610
lines changed

15 files changed

+2243
-1610
lines changed

NOTICE-fips.txt

Lines changed: 884 additions & 884 deletions
Large diffs are not rendered by default.

NOTICE.txt

Lines changed: 719 additions & 719 deletions
Large diffs are not rendered by default.

go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,11 +109,13 @@ require (
109109
go.elastic.co/ecszap v1.0.3
110110
go.elastic.co/go-licence-detector v0.7.0
111111
go.opentelemetry.io/collector/component/componentstatus v0.135.0
112+
go.opentelemetry.io/collector/component/componenttest v0.135.0
112113
go.opentelemetry.io/collector/connector/forwardconnector v0.135.0
113114
go.opentelemetry.io/collector/exporter/debugexporter v0.135.0
114115
go.opentelemetry.io/collector/exporter/nopexporter v0.135.0
115116
go.opentelemetry.io/collector/exporter/otlpexporter v0.135.0
116117
go.opentelemetry.io/collector/exporter/otlphttpexporter v0.135.0
118+
go.opentelemetry.io/collector/extension/extensiontest v0.135.0
117119
go.opentelemetry.io/collector/extension/memorylimiterextension v0.135.0
118120
go.opentelemetry.io/collector/pipeline v1.41.0
119121
go.opentelemetry.io/collector/processor/batchprocessor v0.135.0
@@ -122,6 +124,7 @@ require (
122124
go.opentelemetry.io/collector/receiver/otlpreceiver v0.135.0
123125
go.opentelemetry.io/ebpf-profiler v0.0.202536
124126
go.uber.org/zap v1.27.0
127+
go.yaml.in/yaml/v3 v3.0.4
125128
golang.org/x/crypto v0.41.0
126129
golang.org/x/exp v0.0.0-20250819193227-8b4c13bb791b
127130
golang.org/x/mod v0.27.0
@@ -669,7 +672,6 @@ require (
669672
go.opentelemetry.io/auto/sdk v1.2.0 // indirect
670673
go.opentelemetry.io/collector v0.135.0 // indirect
671674
go.opentelemetry.io/collector/client v1.41.0 // indirect
672-
go.opentelemetry.io/collector/component/componenttest v0.135.0 // indirect
673675
go.opentelemetry.io/collector/config/configauth v0.135.0 // indirect
674676
go.opentelemetry.io/collector/config/configcompression v1.41.0 // indirect
675677
go.opentelemetry.io/collector/config/configgrpc v0.135.0 // indirect
@@ -696,7 +698,6 @@ require (
696698
go.opentelemetry.io/collector/extension/extensionauth v1.41.0 // indirect
697699
go.opentelemetry.io/collector/extension/extensioncapabilities v0.135.0 // indirect
698700
go.opentelemetry.io/collector/extension/extensionmiddleware v0.135.0 // indirect
699-
go.opentelemetry.io/collector/extension/extensiontest v0.135.0 // indirect
700701
go.opentelemetry.io/collector/extension/xextension v0.135.0 // indirect
701702
go.opentelemetry.io/collector/filter v0.135.0 // indirect
702703
go.opentelemetry.io/collector/internal/fanoutconsumer v0.135.0 // indirect
@@ -753,7 +754,6 @@ require (
753754
go.uber.org/ratelimit v0.3.1 // indirect
754755
go.uber.org/zap/exp v0.3.0 // indirect
755756
go.yaml.in/yaml/v2 v2.4.2 // indirect
756-
go.yaml.in/yaml/v3 v3.0.4 // indirect
757757
golang.org/x/arch v0.20.0 // indirect
758758
golang.org/x/oauth2 v0.30.0 // indirect
759759
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect

internal/pkg/otel/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ This section provides a summary of components included in the Elastic Distributi
9999
| [apmconfigextension](https://github.com/elastic/opentelemetry-collector-components/blob/extension/apmconfigextension/v0.7.0/extension/apmconfigextension/README.md) | v0.7.0 |
100100
| [bearertokenauthextension](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/bearertokenauthextension/v0.135.0/extension/bearertokenauthextension/README.md) | v0.135.0 |
101101
| [beatsauthextension](https://github.com/elastic/opentelemetry-collector-components/blob/extension/beatsauthextension/v0.3.0/extension/beatsauthextension/README.md) | v0.3.0 |
102+
| [extensiontest](https://github.com/open-telemetry/opentelemetry-collector/blob/extension/extensiontest/v0.135.0/extension/extensiontest/README.md) | v0.135.0 |
102103
| [filestorage](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/storage/filestorage/v0.135.0/extension/storage/filestorage/README.md) | v0.135.0 |
103104
| [headerssetterextension](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/headerssetterextension/v0.135.0/extension/headerssetterextension/README.md) | v0.135.0 |
104105
| [healthcheckextension](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/healthcheckextension/v0.135.0/extension/healthcheckextension/README.md) | v0.135.0 |
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# elasticdiagnosticsextension
2+
3+
`elasticdiagnosticsextension` is an internal package for peforming diagnostics and is used in conjunction with EDOT.
4+
The extension is designed to return diagnostics in a format compatible with the [ActionDiagnosticUnitResult](https://github.com/elastic/elastic-agent-client/blob/888026ef85e1c9190fe76eb158cf21d9c9c02920/elastic-agent-client.proto#L424-L437) type defined in the control protocol.
5+
6+
7+
## Configuration
8+
9+
The extension accepts the `endpoint` as a sole parameter. The endpoint should begin with a valid protocol and it valid values are `unix` and `npipe` for now. Here are a few examples:
10+
11+
- `unix:///tmp/elastic-agent/xyz.soc`
12+
- `npipe:///elastic-agent`
13+
14+
## Features
15+
16+
- Acts as a registrar and keeps track of common diagnostic hooks.
17+
- Collects profiles using `runtime/pprof`.
18+
- Collects internal telemetry exposed by the OTeL Collector.
19+
- Implements the `extensioncapabilities.ConfigWatcher` interface and stores the latest configuration of the running collector.
20+
- Listens for diagnostic requests and provides diagnostic data.
21+
22+
## Design
23+
24+
### Diagnostic hooks:
25+
- Individual beats register custom diagnostic hooks and these hooks are called when we run the elastic-agent diagnostics command.
26+
- Our extension stores these hooks and executes them everytime it gets a "diagnostics" request.
27+
28+
### Request/Response format:
29+
- This extension runs an HTTP server and listens to new requests on `/diagnostics` path.
30+
- The following query parameters are optional:
31+
- `cpu`
32+
- If `true`, the extension will also collect cpu profile of EDOT.
33+
- By default, the extension doesn't collect the CPU profile unless explicitly specified.
34+
- `cpuduration`:
35+
- Specifies the time duration over which the CPU profile should be collected.
36+
- Valid time units are `ns`, `us`, `ms`, `s`, `m`, `h`
37+
- Default: `30s`.
38+
- The response format is defined in [response.go](./response.go).
39+
- `GlobalDiagnostics`: Data related to the overall process:
40+
1. Profiles.
41+
2. Internal telemetry.
42+
3. latest collector configuration.
43+
- `ComponentDiagnostics`: Data from individual receivers, collected via registered diagnostic hooks.
44+
45+
### Interaction with Elastic-Agent service in hybrid mode.
46+
47+
- When the user triggers the diagnostic request, EDOT diagnostics are injected at two levels:
48+
1. At top-Level:
49+
- When `DiagnosticAgent()` is called in [server.go](https://github.com/elastic/elastic-agent/blob/710c49f45433e2f136a6e41cae980c1aa37dabdd/pkg/control/v2/server/server.go#L197).
50+
- Diagnostics are captured at the global level and stored under the `edot/*` directory in the resulting ZIP archive.
51+
2. At component-level:
52+
- When `otelMgr.PerformComponentsDiagnostics()` is called in [coordinator.go](https://github.com/elastic/elastic-agent/blob/710c49f45433e2f136a6e41cae980c1aa37dabdd/internal/pkg/agent/application/coordinator/coordinator.go#L863).
53+
- Diagnostics are added per component and stored under the `components/{comp}/*` directory in the resulting ZIP archive.
54+
55+
#### Diagram
56+
57+
```mermaid
58+
graph LR
59+
A[elastic-agent install ...] --> B[Runs the service in hybrid mode<br/>and we inject agentdiagnosticsextension]
60+
B -->|listens to| D[edot-diagnostics-extension.sock]
61+
62+
C[elastic-agent diagnostics] --> E[Extracts diagnostics socket path via the binary]
63+
E -->|requests OTeL diagnostics| D
64+
```
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License 2.0;
3+
// you may not use this file except in compliance with the Elastic License 2.0.
4+
5+
package elasticdiagnostics
6+
7+
import (
8+
"errors"
9+
10+
"go.opentelemetry.io/collector/component"
11+
)
12+
13+
type Config struct {
14+
Endpoint string `mapstructure:"endpoint"`
15+
}
16+
17+
func createDefaultConfig() component.Config {
18+
return &Config{}
19+
}
20+
21+
func (c *Config) Validate() error {
22+
if c.Endpoint == "" {
23+
return errors.New("endpoint is a required field")
24+
}
25+
return nil
26+
}
Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License 2.0;
3+
// you may not use this file except in compliance with the Elastic License 2.0.
4+
5+
package elasticdiagnostics
6+
7+
import (
8+
"bytes"
9+
"context"
10+
"encoding/json"
11+
"errors"
12+
"fmt"
13+
"net"
14+
"net/http"
15+
"runtime/pprof"
16+
"sync"
17+
"time"
18+
19+
"go.opentelemetry.io/collector/component"
20+
"go.opentelemetry.io/collector/confmap"
21+
"go.uber.org/zap"
22+
"go.yaml.in/yaml/v3"
23+
"google.golang.org/protobuf/types/known/timestamppb"
24+
25+
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
26+
"github.com/elastic/elastic-agent-libs/logp"
27+
"github.com/elastic/elastic-agent/internal/pkg/diagnostics"
28+
"github.com/elastic/elastic-agent/pkg/ipc"
29+
)
30+
31+
var (
32+
_ component.Component = (*diagnosticsExtension)(nil)
33+
)
34+
35+
type diagHook struct {
36+
description string
37+
filename string
38+
contentType string
39+
hook func() []byte
40+
}
41+
42+
type diagnosticsExtension struct {
43+
listener net.Listener
44+
server *http.Server
45+
logger *zap.Logger
46+
logp *logp.Logger
47+
48+
diagnosticsConfig *Config
49+
collectorConfig *confmap.Conf
50+
componentHooks map[string][]*diagHook
51+
globalHooks map[string]*diagHook
52+
53+
mx sync.Mutex
54+
hooksMtx sync.Mutex
55+
configMtx sync.Mutex
56+
}
57+
58+
func (d *diagnosticsExtension) Start(ctx context.Context, host component.Host) error {
59+
d.mx.Lock()
60+
defer d.mx.Unlock()
61+
var err error
62+
63+
d.logp, err = logp.NewZapLogger(d.logger)
64+
if err != nil {
65+
// NewZapLogger always returns nil error, so this shouldn't happen.
66+
return fmt.Errorf("failed to create logp.Logger from zap logger: %w", err)
67+
}
68+
69+
d.registerGlobalDiagnostics()
70+
71+
d.listener, err = ipc.CreateListener(d.logp, d.diagnosticsConfig.Endpoint)
72+
if err != nil {
73+
return fmt.Errorf("error creating listener: %w", err)
74+
}
75+
76+
mux := http.NewServeMux()
77+
mux.Handle("/diagnostics", d)
78+
79+
d.server = &http.Server{
80+
Handler: mux,
81+
ReadHeaderTimeout: 30 * time.Second,
82+
}
83+
go func() {
84+
if err := d.server.Serve(d.listener); err != nil && !errors.Is(err, http.ErrServerClosed) {
85+
d.logger.Error("HTTP server error", zap.Error(err))
86+
}
87+
}()
88+
d.logger.Info("Diagnostics extension started", zap.String("address", d.listener.Addr().String()))
89+
return nil
90+
}
91+
92+
func (d *diagnosticsExtension) Shutdown(ctx context.Context) error {
93+
d.mx.Lock()
94+
defer d.mx.Unlock()
95+
if d.server == nil {
96+
return nil
97+
}
98+
if err := d.server.Shutdown(ctx); err != nil {
99+
return err
100+
}
101+
ipc.CleanupListener(d.logp, d.diagnosticsConfig.Endpoint)
102+
return nil
103+
}
104+
105+
func (d *diagnosticsExtension) registerGlobalDiagnostics() {
106+
d.globalHooks["collector_config"] = &diagHook{
107+
description: "full collector configuration",
108+
filename: "edot/otel-merged-actual.yaml",
109+
contentType: "application/yaml",
110+
hook: func() []byte {
111+
d.configMtx.Lock()
112+
defer d.configMtx.Unlock()
113+
if d.collectorConfig == nil {
114+
return []byte("no active OTel Configuration")
115+
}
116+
b, err := yaml.Marshal(d.collectorConfig.ToStringMap())
117+
if err != nil {
118+
return fmt.Appendf(nil, "error: failed to convert to yaml: %v", err)
119+
}
120+
return b
121+
},
122+
}
123+
124+
// register basic profiles.
125+
for _, profile := range []string{"goroutine", "heap", "allocs", "mutex", "threadcreate", "block"} {
126+
d.globalHooks[profile] = &diagHook{
127+
description: fmt.Sprintf("%s profile of the collector", profile),
128+
filename: fmt.Sprintf("edot/%s.profile.gz", profile),
129+
contentType: "application/octet-stream",
130+
hook: func() []byte {
131+
var buf bytes.Buffer
132+
err := pprof.Lookup(profile).WriteTo(&buf, 0)
133+
if err != nil {
134+
return fmt.Appendf(nil, "error: failed to get %s profile: %v", profile, err)
135+
}
136+
return buf.Bytes()
137+
},
138+
}
139+
}
140+
}
141+
142+
func (d *diagnosticsExtension) NotifyConfig(ctx context.Context, conf *confmap.Conf) error {
143+
d.configMtx.Lock()
144+
defer d.configMtx.Unlock()
145+
d.collectorConfig = conf
146+
return nil
147+
}
148+
149+
// RegisterDiagnosticHook API exposes the ability for beat receivers to register their hooks.
150+
// NOTE: Changing the function signature will require changes to libbeat and beatreceivers. Proceed with caution.
151+
func (d *diagnosticsExtension) RegisterDiagnosticHook(componentName string, description string, filename string, contentType string, hook func() []byte) {
152+
d.hooksMtx.Lock()
153+
defer d.hooksMtx.Unlock()
154+
if _, ok := d.componentHooks[componentName]; ok {
155+
d.componentHooks[componentName] = append(d.componentHooks[componentName], &diagHook{
156+
description: description,
157+
filename: filename,
158+
contentType: contentType,
159+
hook: hook,
160+
})
161+
} else {
162+
d.componentHooks[componentName] = []*diagHook{
163+
{
164+
description: description,
165+
filename: filename,
166+
contentType: contentType,
167+
hook: hook,
168+
},
169+
}
170+
}
171+
}
172+
173+
func (d *diagnosticsExtension) ServeHTTP(w http.ResponseWriter, req *http.Request) {
174+
d.hooksMtx.Lock()
175+
defer d.hooksMtx.Unlock()
176+
componentResults := make([]*proto.ActionDiagnosticUnitResult, 0)
177+
for name, hooks := range d.componentHooks {
178+
for _, hook := range hooks {
179+
componentResults = append(componentResults, &proto.ActionDiagnosticUnitResult{
180+
Name: name,
181+
Filename: hook.filename,
182+
ContentType: hook.contentType,
183+
Description: hook.description,
184+
Content: hook.hook(),
185+
Generated: timestamppb.Now(),
186+
})
187+
}
188+
}
189+
190+
globalResults := make([]*proto.ActionDiagnosticUnitResult, 0)
191+
for name, hook := range d.globalHooks {
192+
globalResults = append(globalResults, &proto.ActionDiagnosticUnitResult{
193+
Name: name,
194+
Filename: hook.filename,
195+
ContentType: hook.contentType,
196+
Description: hook.description,
197+
Content: hook.hook(),
198+
Generated: timestamppb.Now(),
199+
})
200+
}
201+
202+
// only add a CPU profile if requested via query parameter.
203+
if req.URL.Query().Get("cpu") == "true" {
204+
diagCPUDuration := diagnostics.DiagCPUDuration
205+
206+
// check if cpuduration parameter is set, if so override the default duration
207+
// if parsing fails, log the error and use the default duration
208+
if req.URL.Query().Get("cpuduration") != "" {
209+
var err error
210+
diagCPUDuration, err = time.ParseDuration(req.URL.Query().Get("cpuduration"))
211+
if err != nil {
212+
d.logger.Error("Failed parsing cpuduration parameter, using default", zap.String("cpuduration", req.URL.Query().Get("cpuduration")), zap.Error(err))
213+
diagCPUDuration = diagnostics.DiagCPUDuration
214+
}
215+
}
216+
cpuProfile, err := diagnostics.CreateCPUProfile(req.Context(), diagCPUDuration)
217+
if err != nil {
218+
d.logger.Error("Failed creating CPU profile", zap.Error(err))
219+
}
220+
globalResults = append(globalResults, &proto.ActionDiagnosticUnitResult{
221+
Name: "cpu",
222+
Filename: "edot/cpu.profile.gz",
223+
ContentType: "application/octet-stream",
224+
Description: "CPU profile of the collector",
225+
Content: cpuProfile,
226+
})
227+
}
228+
229+
b, err := json.Marshal(Response{
230+
GlobalDiagnostics: globalResults,
231+
ComponentDiagnostics: componentResults,
232+
})
233+
w.Header().Add("content-type", "application/json")
234+
if err != nil {
235+
d.logger.Error("Failed marshaling response", zap.Error(err))
236+
w.WriteHeader(500)
237+
if _, err := fmt.Fprintf(w, "{'error':'%v'}", err); err != nil {
238+
d.logger.Error("Failed writing response to client.", zap.Error(err))
239+
}
240+
return
241+
}
242+
if _, err := w.Write(b); err != nil {
243+
d.logger.Error("Failed writing response to client.", zap.Error(err))
244+
}
245+
}

0 commit comments

Comments
 (0)