Skip to content

Commit

Permalink
Merge branch 'main' into jaeger-bundle-ci
Browse files Browse the repository at this point in the history
  • Loading branch information
yurishkuro authored Jan 15, 2025
2 parents 98b8ca8 + 3ba8e54 commit 864bad6
Show file tree
Hide file tree
Showing 49 changed files with 307 additions and 283 deletions.
4 changes: 2 additions & 2 deletions cmd/agent/app/httpserver/srv.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ import (
"go.uber.org/zap/zapcore"

"github.com/jaegertracing/jaeger/cmd/agent/app/configmanager"
"github.com/jaegertracing/jaeger/pkg/clientcfg/clientcfghttp"
samplinghttp "github.com/jaegertracing/jaeger/internal/sampling/http"
"github.com/jaegertracing/jaeger/pkg/metrics"
)

// NewHTTPServer creates a new server that hosts an HTTP/JSON endpoint for clients
// to query for sampling strategies.
func NewHTTPServer(hostPort string, manager configmanager.ClientConfigManager, mFactory metrics.Factory, logger *zap.Logger) *http.Server {
handler := clientcfghttp.NewHTTPHandler(clientcfghttp.HTTPHandlerParams{
handler := samplinghttp.NewHandler(samplinghttp.HandlerParams{
ConfigManager: manager,
MetricsFactory: mFactory,
LegacySamplingEndpoint: true,
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import (
"github.com/jaegertracing/jaeger/cmd/collector/app/flags"
"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy"
"github.com/jaegertracing/jaeger/cmd/collector/app/server"
"github.com/jaegertracing/jaeger/internal/safeexpvar"
"github.com/jaegertracing/jaeger/internal/sampling/samplingstrategy"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/metrics"
Expand Down
11 changes: 9 additions & 2 deletions cmd/collector/app/handler/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type mockSpanProcessor struct {
expectedError error
mux sync.Mutex
spans []*model.Span
traces []ptrace.Traces
tenants map[string]bool
transport processor.InboundTransport
spanFormat processor.SpanFormat
Expand All @@ -41,8 +42,8 @@ func (p *mockSpanProcessor) ProcessSpans(_ context.Context, batch processor.Batc
defer p.mux.Unlock()
batch.GetSpans(func(spans []*model.Span) {
p.spans = append(p.spans, spans...)
}, func(_ ptrace.Traces) {
panic("not implemented")
}, func(td ptrace.Traces) {
p.traces = append(p.traces, td)
})
oks := make([]bool, len(p.spans))
if p.tenants == nil {
Expand All @@ -60,6 +61,12 @@ func (p *mockSpanProcessor) getSpans() []*model.Span {
return p.spans
}

func (p *mockSpanProcessor) getTraces() []ptrace.Traces {
p.mux.Lock()
defer p.mux.Unlock()
return p.traces
}

func (p *mockSpanProcessor) getTenants() map[string]bool {
p.mux.Lock()
defer p.mux.Unlock()
Expand Down
48 changes: 24 additions & 24 deletions cmd/collector/app/handler/otlp_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/jaegertracing/jaeger/cmd/collector/app/flags"
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

var _ component.Host = (*otelHost)(nil) // API check
Expand Down Expand Up @@ -72,9 +71,14 @@ func startOTLPReceiver(
},
}

otlpConsumer := newConsumerDelegate(logger, spanProcessor, tm)
// the following two constructors never return errors given non-nil arguments, so we ignore errors
nextConsumer, err := newTraces(otlpConsumer.consume)
consumerHelper := &consumerHelper{
batchConsumer: newBatchConsumer(logger,
spanProcessor,
processor.UnknownTransport, // could be gRPC or HTTP
processor.OTLPSpanFormat,
tm),
}
nextConsumer, err := newTraces(consumerHelper.consume)
if err != nil {
return nil, fmt.Errorf("could not create the OTLP consumer: %w", err)
}
Expand All @@ -93,29 +97,25 @@ func startOTLPReceiver(
return otlpReceiver, nil
}

func newConsumerDelegate(logger *zap.Logger, spanProcessor processor.SpanProcessor, tm *tenancy.Manager) *consumerDelegate {
return &consumerDelegate{
batchConsumer: newBatchConsumer(logger,
spanProcessor,
processor.UnknownTransport, // could be gRPC or HTTP
processor.OTLPSpanFormat,
tm),
}
}

type consumerDelegate struct {
batchConsumer batchConsumer
type consumerHelper struct {
batchConsumer
}

func (c *consumerDelegate) consume(ctx context.Context, td ptrace.Traces) error {
batches := v1adapter.ProtoFromTraces(td)
for _, batch := range batches {
err := c.batchConsumer.consume(ctx, batch)
if err != nil {
return err
}
func (ch *consumerHelper) consume(ctx context.Context, td ptrace.Traces) error {
tenant, err := ch.validateTenant(ctx)
if err != nil {
ch.logger.Debug("rejecting spans (tenancy)", zap.Error(err))
return err
}
return nil
_, err = ch.spanProcessor.ProcessSpans(ctx, processor.SpansV2{
Traces: td,
Details: processor.Details{
InboundTransport: ch.spanOptions.InboundTransport,
SpanFormat: ch.spanOptions.SpanFormat,
Tenant: tenant,
},
})
return err
}

var _ componentstatus.Reporter = (*otelHost)(nil)
Expand Down
75 changes: 46 additions & 29 deletions cmd/collector/app/handler/otlp_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -19,8 +20,10 @@ import (
"go.opentelemetry.io/collector/pipeline"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/otlpreceiver"
"go.uber.org/zap/zaptest"

"github.com/jaegertracing/jaeger/cmd/collector/app/flags"
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/pkg/testutils"
)
Expand Down Expand Up @@ -49,7 +52,7 @@ func optionsWithPorts(port string) *flags.CollectorOptions {

func TestStartOtlpReceiver(t *testing.T) {
spanProcessor := &mockSpanProcessor{}
logger, _ := testutils.NewLogger()
logger := zaptest.NewLogger(t)
tm := &tenancy.Manager{}
rec, err := StartOTLPReceiver(optionsWithPorts(":0"), logger, spanProcessor, tm)
require.NoError(t, err)
Expand All @@ -70,36 +73,9 @@ func makeTracesOneSpan() ptrace.Traces {
return traces
}

func TestConsumerDelegate(t *testing.T) {
testCases := []struct {
expectErr error
expectLog string
}{
{}, // no errors
{expectErr: errors.New("test-error"), expectLog: "test-error"},
}
for _, test := range testCases {
t.Run(test.expectLog, func(t *testing.T) {
logger, logBuf := testutils.NewLogger()
spanProcessor := &mockSpanProcessor{expectedError: test.expectErr}
consumer := newConsumerDelegate(logger, spanProcessor, &tenancy.Manager{})

err := consumer.consume(context.Background(), makeTracesOneSpan())

if test.expectErr != nil {
require.Equal(t, test.expectErr, err)
assert.Contains(t, logBuf.String(), test.expectLog)
} else {
require.NoError(t, err)
assert.Len(t, spanProcessor.getSpans(), 1)
}
})
}
}

func TestStartOtlpReceiver_Error(t *testing.T) {
spanProcessor := &mockSpanProcessor{}
logger, _ := testutils.NewLogger()
logger := zaptest.NewLogger(t)
opts := optionsWithPorts(":-1")
tm := &tenancy.Manager{}
_, err := StartOTLPReceiver(opts, logger, spanProcessor, tm)
Expand Down Expand Up @@ -139,3 +115,44 @@ func TestOtelHost(t *testing.T) {
assert.Nil(t, host.GetExtensions())
assert.Nil(t, host.GetExporters())
}

func TestConsumerHelper(t *testing.T) {
spanProcessor := &mockSpanProcessor{}
consumerHelper := &consumerHelper{
batchConsumer: newBatchConsumer(zaptest.NewLogger(t),
spanProcessor,
processor.UnknownTransport, // could be gRPC or HTTP
processor.OTLPSpanFormat,
&tenancy.Manager{}),
}
err := consumerHelper.consume(context.Background(), makeTracesOneSpan())
require.NoError(t, err)
assert.Eventually(t, func() bool {
return len(spanProcessor.getTraces()) == 1
}, time.Second, time.Millisecond, "spanProcessor should have received one span")
assert.Empty(t, spanProcessor.getSpans())
}

func TestConsumerHelper_Consume_Error(t *testing.T) {
consumerHelper := &consumerHelper{
batchConsumer: newBatchConsumer(zaptest.NewLogger(t),
&mockSpanProcessor{expectedError: assert.AnError},
processor.UnknownTransport, // could be gRPC or HTTP
processor.OTLPSpanFormat,
&tenancy.Manager{}),
}
err := consumerHelper.consume(context.Background(), makeTracesOneSpan())
require.ErrorIs(t, err, assert.AnError)
}

func TestConsumerHelper_Consume_TenantError(t *testing.T) {
consumerHelper := &consumerHelper{
batchConsumer: newBatchConsumer(zaptest.NewLogger(t),
&mockSpanProcessor{},
processor.UnknownTransport, // could be gRPC or HTTP
processor.OTLPSpanFormat,
&tenancy.Manager{Enabled: true}),
}
err := consumerHelper.consume(context.Background(), makeTracesOneSpan())
require.ErrorContains(t, err, "missing tenant header")
}
12 changes: 8 additions & 4 deletions cmd/collector/app/handler/zipkin_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,15 @@ func startZipkinReceiver(
},
}

consumerAdapter := newConsumerDelegate(logger, spanProcessor, tm)
// reset Zipkin spanFormat
consumerAdapter.batchConsumer.spanOptions.SpanFormat = processor.ZipkinSpanFormat
consumerHelper := &consumerHelper{
batchConsumer: newBatchConsumer(logger,
spanProcessor,
processor.HTTPTransport,
processor.ZipkinSpanFormat,
tm),
}

nextConsumer, err := newTraces(consumerAdapter.consume)
nextConsumer, err := newTraces(consumerHelper.consume)
if err != nil {
return nil, fmt.Errorf("could not create Zipkin consumer: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/server/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
"google.golang.org/grpc/reflection"

"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy"
samplinggrpc "github.com/jaegertracing/jaeger/internal/sampling/grpc"
"github.com/jaegertracing/jaeger/internal/sampling/samplingstrategy"
"github.com/jaegertracing/jaeger/pkg/telemetry"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)
Expand Down
8 changes: 4 additions & 4 deletions cmd/collector/app/server/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"go.uber.org/zap/zapcore"

"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy"
clientcfgHandler "github.com/jaegertracing/jaeger/pkg/clientcfg/clientcfghttp"
samplinghttp "github.com/jaegertracing/jaeger/internal/sampling/http"
"github.com/jaegertracing/jaeger/internal/sampling/samplingstrategy"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/httpmetrics"
"github.com/jaegertracing/jaeger/pkg/metrics"
Expand Down Expand Up @@ -59,8 +59,8 @@ func serveHTTP(server *http.Server, listener net.Listener, params *HTTPServerPar
apiHandler := handler.NewAPIHandler(params.Handler)
apiHandler.RegisterRoutes(r)

cfgHandler := clientcfgHandler.NewHTTPHandler(clientcfgHandler.HTTPHandlerParams{
ConfigManager: &clientcfgHandler.ConfigManager{
cfgHandler := samplinghttp.NewHandler(samplinghttp.HandlerParams{
ConfigManager: &samplinghttp.ConfigManager{
SamplingProvider: params.SamplingProvider,
},
MetricsFactory: params.MetricsFactory,
Expand Down
10 changes: 5 additions & 5 deletions cmd/jaeger/internal/extension/remotesampling/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ import (
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy"
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage"
"github.com/jaegertracing/jaeger/internal/leaderelection"
"github.com/jaegertracing/jaeger/internal/metrics/otelmetrics"
samplinggrpc "github.com/jaegertracing/jaeger/internal/sampling/grpc"
"github.com/jaegertracing/jaeger/pkg/clientcfg/clientcfghttp"
samplinghttp "github.com/jaegertracing/jaeger/internal/sampling/http"
"github.com/jaegertracing/jaeger/internal/sampling/samplingstrategy"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/sampling/leaderelection"
"github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive"
"github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/static"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
Expand Down Expand Up @@ -229,8 +229,8 @@ func (ext *rsExtension) startAdaptiveStrategyProvider(host component.Host) error
func (ext *rsExtension) startHTTPServer(ctx context.Context, host component.Host) error {
mf := otelmetrics.NewFactory(ext.telemetry.MeterProvider)
mf = mf.Namespace(metrics.NSOptions{Name: "jaeger_remote_sampling"})
handler := clientcfghttp.NewHTTPHandler(clientcfghttp.HTTPHandlerParams{
ConfigManager: &clientcfghttp.ConfigManager{
handler := samplinghttp.NewHandler(samplinghttp.HandlerParams{
ConfigManager: &samplinghttp.ConfigManager{
SamplingProvider: ext.strategyProvider,
},
MetricsFactory: mf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy"
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/remotesampling"
"github.com/jaegertracing/jaeger/internal/metrics/otelmetrics"
"github.com/jaegertracing/jaeger/internal/sampling/samplingstrategy"
"github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ require (
golang.org/x/net v0.34.0
golang.org/x/sys v0.29.0
google.golang.org/grpc v1.69.4
google.golang.org/protobuf v1.36.2
google.golang.org/protobuf v1.36.3
gopkg.in/yaml.v3 v3.0.1
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -996,8 +996,8 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.36.2 h1:R8FeyR1/eLmkutZOM5CWghmo5itiG9z0ktFlTVLuTmU=
google.golang.org/protobuf v1.36.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU=
google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion internal/sampling/grpc/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package grpc
import (
"context"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy"
"github.com/jaegertracing/jaeger/internal/sampling/samplingstrategy"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
// Copyright (c) 2020 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package clientcfghttp
package http

import (
"context"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy"
"github.com/jaegertracing/jaeger/internal/sampling/samplingstrategy"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2020 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package clientcfghttp
package http

import (
"context"
Expand Down
Loading

0 comments on commit 864bad6

Please sign in to comment.