Skip to content

Commit

Permalink
Switch v1 receivers to use v2 write path (#6532)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- Resolves #6474

## Description of the changes
- Hook up OTLP and Zipkin receivers into v2 write path in the
SpanProcessor.
- Add unit tests to validate that `ptrace.Traces` object is being
received by the SpanProcessor.

## How was this change tested?
- CI
- Created `good-first-issue` #6535 to add e2e tests after this

---------

Signed-off-by: Yuri Shkuro <[email protected]>
  • Loading branch information
yurishkuro authored Jan 15, 2025
1 parent cec5ae1 commit 0b5f8b1
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 59 deletions.
11 changes: 9 additions & 2 deletions cmd/collector/app/handler/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type mockSpanProcessor struct {
expectedError error
mux sync.Mutex
spans []*model.Span
traces []ptrace.Traces
tenants map[string]bool
transport processor.InboundTransport
spanFormat processor.SpanFormat
Expand All @@ -41,8 +42,8 @@ func (p *mockSpanProcessor) ProcessSpans(_ context.Context, batch processor.Batc
defer p.mux.Unlock()
batch.GetSpans(func(spans []*model.Span) {
p.spans = append(p.spans, spans...)
}, func(_ ptrace.Traces) {
panic("not implemented")
}, func(td ptrace.Traces) {
p.traces = append(p.traces, td)
})
oks := make([]bool, len(p.spans))
if p.tenants == nil {
Expand All @@ -60,6 +61,12 @@ func (p *mockSpanProcessor) getSpans() []*model.Span {
return p.spans
}

func (p *mockSpanProcessor) getTraces() []ptrace.Traces {
p.mux.Lock()
defer p.mux.Unlock()
return p.traces
}

func (p *mockSpanProcessor) getTenants() map[string]bool {
p.mux.Lock()
defer p.mux.Unlock()
Expand Down
48 changes: 24 additions & 24 deletions cmd/collector/app/handler/otlp_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/jaegertracing/jaeger/cmd/collector/app/flags"
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

var _ component.Host = (*otelHost)(nil) // API check
Expand Down Expand Up @@ -72,9 +71,14 @@ func startOTLPReceiver(
},
}

otlpConsumer := newConsumerDelegate(logger, spanProcessor, tm)
// the following two constructors never return errors given non-nil arguments, so we ignore errors
nextConsumer, err := newTraces(otlpConsumer.consume)
consumerHelper := &consumerHelper{
batchConsumer: newBatchConsumer(logger,
spanProcessor,
processor.UnknownTransport, // could be gRPC or HTTP
processor.OTLPSpanFormat,
tm),
}
nextConsumer, err := newTraces(consumerHelper.consume)
if err != nil {
return nil, fmt.Errorf("could not create the OTLP consumer: %w", err)
}
Expand All @@ -93,29 +97,25 @@ func startOTLPReceiver(
return otlpReceiver, nil
}

func newConsumerDelegate(logger *zap.Logger, spanProcessor processor.SpanProcessor, tm *tenancy.Manager) *consumerDelegate {
return &consumerDelegate{
batchConsumer: newBatchConsumer(logger,
spanProcessor,
processor.UnknownTransport, // could be gRPC or HTTP
processor.OTLPSpanFormat,
tm),
}
}

type consumerDelegate struct {
batchConsumer batchConsumer
type consumerHelper struct {
batchConsumer
}

func (c *consumerDelegate) consume(ctx context.Context, td ptrace.Traces) error {
batches := v1adapter.ProtoFromTraces(td)
for _, batch := range batches {
err := c.batchConsumer.consume(ctx, batch)
if err != nil {
return err
}
func (ch *consumerHelper) consume(ctx context.Context, td ptrace.Traces) error {
tenant, err := ch.validateTenant(ctx)
if err != nil {
ch.logger.Debug("rejecting spans (tenancy)", zap.Error(err))
return err
}
return nil
_, err = ch.spanProcessor.ProcessSpans(ctx, processor.SpansV2{
Traces: td,
Details: processor.Details{
InboundTransport: ch.spanOptions.InboundTransport,
SpanFormat: ch.spanOptions.SpanFormat,
Tenant: tenant,
},
})
return err
}

var _ componentstatus.Reporter = (*otelHost)(nil)
Expand Down
75 changes: 46 additions & 29 deletions cmd/collector/app/handler/otlp_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -19,8 +20,10 @@ import (
"go.opentelemetry.io/collector/pipeline"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/otlpreceiver"
"go.uber.org/zap/zaptest"

"github.com/jaegertracing/jaeger/cmd/collector/app/flags"
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/pkg/testutils"
)
Expand Down Expand Up @@ -49,7 +52,7 @@ func optionsWithPorts(port string) *flags.CollectorOptions {

func TestStartOtlpReceiver(t *testing.T) {
spanProcessor := &mockSpanProcessor{}
logger, _ := testutils.NewLogger()
logger := zaptest.NewLogger(t)
tm := &tenancy.Manager{}
rec, err := StartOTLPReceiver(optionsWithPorts(":0"), logger, spanProcessor, tm)
require.NoError(t, err)
Expand All @@ -70,36 +73,9 @@ func makeTracesOneSpan() ptrace.Traces {
return traces
}

func TestConsumerDelegate(t *testing.T) {
testCases := []struct {
expectErr error
expectLog string
}{
{}, // no errors
{expectErr: errors.New("test-error"), expectLog: "test-error"},
}
for _, test := range testCases {
t.Run(test.expectLog, func(t *testing.T) {
logger, logBuf := testutils.NewLogger()
spanProcessor := &mockSpanProcessor{expectedError: test.expectErr}
consumer := newConsumerDelegate(logger, spanProcessor, &tenancy.Manager{})

err := consumer.consume(context.Background(), makeTracesOneSpan())

if test.expectErr != nil {
require.Equal(t, test.expectErr, err)
assert.Contains(t, logBuf.String(), test.expectLog)
} else {
require.NoError(t, err)
assert.Len(t, spanProcessor.getSpans(), 1)
}
})
}
}

func TestStartOtlpReceiver_Error(t *testing.T) {
spanProcessor := &mockSpanProcessor{}
logger, _ := testutils.NewLogger()
logger := zaptest.NewLogger(t)
opts := optionsWithPorts(":-1")
tm := &tenancy.Manager{}
_, err := StartOTLPReceiver(opts, logger, spanProcessor, tm)
Expand Down Expand Up @@ -139,3 +115,44 @@ func TestOtelHost(t *testing.T) {
assert.Nil(t, host.GetExtensions())
assert.Nil(t, host.GetExporters())
}

func TestConsumerHelper(t *testing.T) {
spanProcessor := &mockSpanProcessor{}
consumerHelper := &consumerHelper{
batchConsumer: newBatchConsumer(zaptest.NewLogger(t),
spanProcessor,
processor.UnknownTransport, // could be gRPC or HTTP
processor.OTLPSpanFormat,
&tenancy.Manager{}),
}
err := consumerHelper.consume(context.Background(), makeTracesOneSpan())
require.NoError(t, err)
assert.Eventually(t, func() bool {
return len(spanProcessor.getTraces()) == 1
}, time.Second, time.Millisecond, "spanProcessor should have received one span")
assert.Empty(t, spanProcessor.getSpans())
}

func TestConsumerHelper_Consume_Error(t *testing.T) {
consumerHelper := &consumerHelper{
batchConsumer: newBatchConsumer(zaptest.NewLogger(t),
&mockSpanProcessor{expectedError: assert.AnError},
processor.UnknownTransport, // could be gRPC or HTTP
processor.OTLPSpanFormat,
&tenancy.Manager{}),
}
err := consumerHelper.consume(context.Background(), makeTracesOneSpan())
require.ErrorIs(t, err, assert.AnError)
}

func TestConsumerHelper_Consume_TenantError(t *testing.T) {
consumerHelper := &consumerHelper{
batchConsumer: newBatchConsumer(zaptest.NewLogger(t),
&mockSpanProcessor{},
processor.UnknownTransport, // could be gRPC or HTTP
processor.OTLPSpanFormat,
&tenancy.Manager{Enabled: true}),
}
err := consumerHelper.consume(context.Background(), makeTracesOneSpan())
require.ErrorContains(t, err, "missing tenant header")
}
12 changes: 8 additions & 4 deletions cmd/collector/app/handler/zipkin_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,15 @@ func startZipkinReceiver(
},
}

consumerAdapter := newConsumerDelegate(logger, spanProcessor, tm)
// reset Zipkin spanFormat
consumerAdapter.batchConsumer.spanOptions.SpanFormat = processor.ZipkinSpanFormat
consumerHelper := &consumerHelper{
batchConsumer: newBatchConsumer(logger,
spanProcessor,
processor.HTTPTransport,
processor.ZipkinSpanFormat,
tm),
}

nextConsumer, err := newTraces(consumerAdapter.consume)
nextConsumer, err := newTraces(consumerHelper.consume)
if err != nil {
return nil, fmt.Errorf("could not create Zipkin consumer: %w", err)
}
Expand Down

0 comments on commit 0b5f8b1

Please sign in to comment.