Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENG-58885 add ServiceOption support to metrics #247

Merged
merged 1 commit into from
Mar 26, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 37 additions & 29 deletions instrumentation/opentelemetry/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,46 +103,54 @@
return pieces[1]
}

func makeMetricsExporterFactory(cfg *config.AgentConfig) func() (metric.Exporter, error) {
func makeMetricsExporterFactory(cfg *config.AgentConfig) func(opts ...ServiceOption) (metric.Exporter, error) {
// We are only supporting logging and otlp metric exporters for now. We will add support for prometheus
// metrics later
switch cfg.Reporting.MetricReporterType {
case config.MetricReporterType_METRIC_REPORTER_TYPE_LOGGING:
// stdout exporter
return func() (metric.Exporter, error) {
// currently only ServiceOption is WithHeaders so noop-ing ServiceOption for stdout for now
return func(_ ...ServiceOption) (metric.Exporter, error) {

Check warning on line 113 in instrumentation/opentelemetry/init.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/opentelemetry/init.go#L112-L113

Added lines #L112 - L113 were not covered by tests
// TODO: Define if endpoint could be a filepath to write into a file.
return stdoutmetric.New()
}
default:
endpoint := cfg.GetReporting().GetMetricEndpoint().GetValue()
if len(endpoint) == 0 {
endpoint = cfg.GetReporting().GetEndpoint().GetValue()
}
return func(opts ...ServiceOption) (metric.Exporter, error) {
endpoint := cfg.GetReporting().GetMetricEndpoint().GetValue()
if len(endpoint) == 0 {
endpoint = cfg.GetReporting().GetEndpoint().GetValue()
}

opts := []otlpmetricgrpc.Option{
otlpmetricgrpc.WithEndpoint(removeProtocolPrefixForOTLP(endpoint)),
}
serviceOpts := &ServiceOptions{
headers: make(map[string]string),
}
for _, opt := range opts {
opt(serviceOpts)
}

Check warning on line 129 in instrumentation/opentelemetry/init.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/opentelemetry/init.go#L128-L129

Added lines #L128 - L129 were not covered by tests

if !cfg.GetReporting().GetSecure().GetValue() {
opts = append(opts, otlpmetricgrpc.WithInsecure())
}
metricOpts := []otlpmetricgrpc.Option{
otlpmetricgrpc.WithEndpoint(removeProtocolPrefixForOTLP(endpoint)),
otlpmetricgrpc.WithHeaders(serviceOpts.headers),
}

certFile := cfg.GetReporting().GetCertFile().GetValue()
if len(certFile) > 0 {
if tlsCredentials, err := credentials.NewClientTLSFromFile(certFile, ""); err == nil {
opts = append(opts, otlpmetricgrpc.WithTLSCredentials(tlsCredentials))
} else {
log.Printf("error while creating tls credentials from cert path %s: %v", certFile, err)
if !cfg.GetReporting().GetSecure().GetValue() {
metricOpts = append(metricOpts, otlpmetricgrpc.WithInsecure())
}
}

if cfg.Reporting.GetEnableGrpcLoadbalancing().GetValue() {
resolver.SetDefaultScheme("dns")
opts = append(opts, otlpmetricgrpc.WithServiceConfig(`{"loadBalancingConfig": [ { "round_robin": {} } ]}`))
}
certFile := cfg.GetReporting().GetCertFile().GetValue()
if len(certFile) > 0 {
if tlsCredentials, err := credentials.NewClientTLSFromFile(certFile, ""); err == nil {
metricOpts = append(metricOpts, otlpmetricgrpc.WithTLSCredentials(tlsCredentials))
} else {
log.Printf("error while creating tls credentials from cert path %s: %v", certFile, err)
}

Check warning on line 146 in instrumentation/opentelemetry/init.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/opentelemetry/init.go#L145-L146

Added lines #L145 - L146 were not covered by tests
}

return func() (metric.Exporter, error) {
return otlpmetricgrpc.New(context.Background(), opts...)
if cfg.Reporting.GetEnableGrpcLoadbalancing().GetValue() {
resolver.SetDefaultScheme("dns")
metricOpts = append(metricOpts, otlpmetricgrpc.WithServiceConfig(`{"loadBalancingConfig": [ { "round_robin": {} } ]}`))
}
return otlpmetricgrpc.New(context.Background(), metricOpts...)
}
}
}
Expand Down Expand Up @@ -305,7 +313,7 @@
// and returns a shutdown function to flush data immediately on a termination signal.
// Also sets opentelemetry internal errorhandler to the provider zap errorhandler
func InitWithSpanProcessorWrapperAndZap(cfg *config.AgentConfig, wrapper SpanProcessorWrapper,
versionInfoAttrs []attribute.KeyValue, logger *zap.Logger) func() {
versionInfoAttrs []attribute.KeyValue, logger *zap.Logger, opts ...ServiceOption) func() {
mu.Lock()
defer mu.Unlock()
if initialized {
Expand Down Expand Up @@ -340,7 +348,7 @@
}

// Initialize metrics
metricsShutdownFn := initializeMetrics(cfg, versionInfoAttrs)
metricsShutdownFn := initializeMetrics(cfg, versionInfoAttrs, opts...)

exporterFactory = makeExporterFactory(cfg)
configFactory = makeConfigFactory(cfg)
Expand Down Expand Up @@ -485,13 +493,13 @@
}), tp, nil
}

func initializeMetrics(cfg *config.AgentConfig, versionInfoAttrs []attribute.KeyValue) func() {
func initializeMetrics(cfg *config.AgentConfig, versionInfoAttrs []attribute.KeyValue, opts ...ServiceOption) func() {
if shouldDisableMetrics(cfg) {
return func() {}
}

metricsExporterFactory := makeMetricsExporterFactory(cfg)
metricsExporter, err := metricsExporterFactory()
metricsExporter, err := metricsExporterFactory(opts...)
if err != nil {
log.Fatal(err)
}
Expand Down
Loading