From 0b5f8b190e848615cee878ed359e9a28aee0ea86 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Tue, 14 Jan 2025 22:42:04 -0400 Subject: [PATCH] Switch v1 receivers to use v2 write path (#6532) ## Which problem is this PR solving? - Resolves #6474 ## Description of the changes - Hook up OTLP and Zipkin receivers into v2 write path in the SpanProcessor. - Add unit tests to validate that `ptrace.Traces` object is being received by the SpanProcessor. ## How was this change tested? - CI - Created `good-first-issue` #6535 to add e2e tests after this --------- Signed-off-by: Yuri Shkuro --- .../app/handler/grpc_handler_test.go | 11 ++- cmd/collector/app/handler/otlp_receiver.go | 48 ++++++------ .../app/handler/otlp_receiver_test.go | 75 ++++++++++++------- cmd/collector/app/handler/zipkin_receiver.go | 12 ++- 4 files changed, 87 insertions(+), 59 deletions(-) diff --git a/cmd/collector/app/handler/grpc_handler_test.go b/cmd/collector/app/handler/grpc_handler_test.go index ad13e914730..7cb5fedfa57 100644 --- a/cmd/collector/app/handler/grpc_handler_test.go +++ b/cmd/collector/app/handler/grpc_handler_test.go @@ -31,6 +31,7 @@ type mockSpanProcessor struct { expectedError error mux sync.Mutex spans []*model.Span + traces []ptrace.Traces tenants map[string]bool transport processor.InboundTransport spanFormat processor.SpanFormat @@ -41,8 +42,8 @@ func (p *mockSpanProcessor) ProcessSpans(_ context.Context, batch processor.Batc defer p.mux.Unlock() batch.GetSpans(func(spans []*model.Span) { p.spans = append(p.spans, spans...) - }, func(_ ptrace.Traces) { - panic("not implemented") + }, func(td ptrace.Traces) { + p.traces = append(p.traces, td) }) oks := make([]bool, len(p.spans)) if p.tenants == nil { @@ -60,6 +61,12 @@ func (p *mockSpanProcessor) getSpans() []*model.Span { return p.spans } +func (p *mockSpanProcessor) getTraces() []ptrace.Traces { + p.mux.Lock() + defer p.mux.Unlock() + return p.traces +} + func (p *mockSpanProcessor) getTenants() map[string]bool { p.mux.Lock() defer p.mux.Unlock() diff --git a/cmd/collector/app/handler/otlp_receiver.go b/cmd/collector/app/handler/otlp_receiver.go index 228ad80b666..8d4f599b169 100644 --- a/cmd/collector/app/handler/otlp_receiver.go +++ b/cmd/collector/app/handler/otlp_receiver.go @@ -23,7 +23,6 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/flags" "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/pkg/tenancy" - "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) var _ component.Host = (*otelHost)(nil) // API check @@ -72,9 +71,14 @@ func startOTLPReceiver( }, } - otlpConsumer := newConsumerDelegate(logger, spanProcessor, tm) - // the following two constructors never return errors given non-nil arguments, so we ignore errors - nextConsumer, err := newTraces(otlpConsumer.consume) + consumerHelper := &consumerHelper{ + batchConsumer: newBatchConsumer(logger, + spanProcessor, + processor.UnknownTransport, // could be gRPC or HTTP + processor.OTLPSpanFormat, + tm), + } + nextConsumer, err := newTraces(consumerHelper.consume) if err != nil { return nil, fmt.Errorf("could not create the OTLP consumer: %w", err) } @@ -93,29 +97,25 @@ func startOTLPReceiver( return otlpReceiver, nil } -func newConsumerDelegate(logger *zap.Logger, spanProcessor processor.SpanProcessor, tm *tenancy.Manager) *consumerDelegate { - return &consumerDelegate{ - batchConsumer: newBatchConsumer(logger, - spanProcessor, - processor.UnknownTransport, // could be gRPC or HTTP - processor.OTLPSpanFormat, - tm), - } -} - -type consumerDelegate struct { - batchConsumer batchConsumer +type consumerHelper struct { + batchConsumer } -func (c *consumerDelegate) consume(ctx context.Context, td ptrace.Traces) error { - batches := v1adapter.ProtoFromTraces(td) - for _, batch := range batches { - err := c.batchConsumer.consume(ctx, batch) - if err != nil { - return err - } +func (ch *consumerHelper) consume(ctx context.Context, td ptrace.Traces) error { + tenant, err := ch.validateTenant(ctx) + if err != nil { + ch.logger.Debug("rejecting spans (tenancy)", zap.Error(err)) + return err } - return nil + _, err = ch.spanProcessor.ProcessSpans(ctx, processor.SpansV2{ + Traces: td, + Details: processor.Details{ + InboundTransport: ch.spanOptions.InboundTransport, + SpanFormat: ch.spanOptions.SpanFormat, + Tenant: tenant, + }, + }) + return err } var _ componentstatus.Reporter = (*otelHost)(nil) diff --git a/cmd/collector/app/handler/otlp_receiver_test.go b/cmd/collector/app/handler/otlp_receiver_test.go index c0f80de60f2..a582d287df9 100644 --- a/cmd/collector/app/handler/otlp_receiver_test.go +++ b/cmd/collector/app/handler/otlp_receiver_test.go @@ -7,6 +7,7 @@ import ( "context" "errors" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -19,8 +20,10 @@ import ( "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/otlpreceiver" + "go.uber.org/zap/zaptest" "github.com/jaegertracing/jaeger/cmd/collector/app/flags" + "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/pkg/testutils" ) @@ -49,7 +52,7 @@ func optionsWithPorts(port string) *flags.CollectorOptions { func TestStartOtlpReceiver(t *testing.T) { spanProcessor := &mockSpanProcessor{} - logger, _ := testutils.NewLogger() + logger := zaptest.NewLogger(t) tm := &tenancy.Manager{} rec, err := StartOTLPReceiver(optionsWithPorts(":0"), logger, spanProcessor, tm) require.NoError(t, err) @@ -70,36 +73,9 @@ func makeTracesOneSpan() ptrace.Traces { return traces } -func TestConsumerDelegate(t *testing.T) { - testCases := []struct { - expectErr error - expectLog string - }{ - {}, // no errors - {expectErr: errors.New("test-error"), expectLog: "test-error"}, - } - for _, test := range testCases { - t.Run(test.expectLog, func(t *testing.T) { - logger, logBuf := testutils.NewLogger() - spanProcessor := &mockSpanProcessor{expectedError: test.expectErr} - consumer := newConsumerDelegate(logger, spanProcessor, &tenancy.Manager{}) - - err := consumer.consume(context.Background(), makeTracesOneSpan()) - - if test.expectErr != nil { - require.Equal(t, test.expectErr, err) - assert.Contains(t, logBuf.String(), test.expectLog) - } else { - require.NoError(t, err) - assert.Len(t, spanProcessor.getSpans(), 1) - } - }) - } -} - func TestStartOtlpReceiver_Error(t *testing.T) { spanProcessor := &mockSpanProcessor{} - logger, _ := testutils.NewLogger() + logger := zaptest.NewLogger(t) opts := optionsWithPorts(":-1") tm := &tenancy.Manager{} _, err := StartOTLPReceiver(opts, logger, spanProcessor, tm) @@ -139,3 +115,44 @@ func TestOtelHost(t *testing.T) { assert.Nil(t, host.GetExtensions()) assert.Nil(t, host.GetExporters()) } + +func TestConsumerHelper(t *testing.T) { + spanProcessor := &mockSpanProcessor{} + consumerHelper := &consumerHelper{ + batchConsumer: newBatchConsumer(zaptest.NewLogger(t), + spanProcessor, + processor.UnknownTransport, // could be gRPC or HTTP + processor.OTLPSpanFormat, + &tenancy.Manager{}), + } + err := consumerHelper.consume(context.Background(), makeTracesOneSpan()) + require.NoError(t, err) + assert.Eventually(t, func() bool { + return len(spanProcessor.getTraces()) == 1 + }, time.Second, time.Millisecond, "spanProcessor should have received one span") + assert.Empty(t, spanProcessor.getSpans()) +} + +func TestConsumerHelper_Consume_Error(t *testing.T) { + consumerHelper := &consumerHelper{ + batchConsumer: newBatchConsumer(zaptest.NewLogger(t), + &mockSpanProcessor{expectedError: assert.AnError}, + processor.UnknownTransport, // could be gRPC or HTTP + processor.OTLPSpanFormat, + &tenancy.Manager{}), + } + err := consumerHelper.consume(context.Background(), makeTracesOneSpan()) + require.ErrorIs(t, err, assert.AnError) +} + +func TestConsumerHelper_Consume_TenantError(t *testing.T) { + consumerHelper := &consumerHelper{ + batchConsumer: newBatchConsumer(zaptest.NewLogger(t), + &mockSpanProcessor{}, + processor.UnknownTransport, // could be gRPC or HTTP + processor.OTLPSpanFormat, + &tenancy.Manager{Enabled: true}), + } + err := consumerHelper.consume(context.Background(), makeTracesOneSpan()) + require.ErrorContains(t, err, "missing tenant header") +} diff --git a/cmd/collector/app/handler/zipkin_receiver.go b/cmd/collector/app/handler/zipkin_receiver.go index 5d5455ee7af..6546f08c29a 100644 --- a/cmd/collector/app/handler/zipkin_receiver.go +++ b/cmd/collector/app/handler/zipkin_receiver.go @@ -63,11 +63,15 @@ func startZipkinReceiver( }, } - consumerAdapter := newConsumerDelegate(logger, spanProcessor, tm) - // reset Zipkin spanFormat - consumerAdapter.batchConsumer.spanOptions.SpanFormat = processor.ZipkinSpanFormat + consumerHelper := &consumerHelper{ + batchConsumer: newBatchConsumer(logger, + spanProcessor, + processor.HTTPTransport, + processor.ZipkinSpanFormat, + tm), + } - nextConsumer, err := newTraces(consumerAdapter.consume) + nextConsumer, err := newTraces(consumerHelper.consume) if err != nil { return nil, fmt.Errorf("could not create Zipkin consumer: %w", err) }