Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch v1 receivers to use v2 write path #6532

Merged
merged 11 commits into from
Jan 15, 2025
11 changes: 9 additions & 2 deletions cmd/collector/app/handler/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type mockSpanProcessor struct {
expectedError error
mux sync.Mutex
spans []*model.Span
traces []ptrace.Traces
tenants map[string]bool
transport processor.InboundTransport
spanFormat processor.SpanFormat
Expand All @@ -41,8 +42,8 @@ func (p *mockSpanProcessor) ProcessSpans(_ context.Context, batch processor.Batc
defer p.mux.Unlock()
batch.GetSpans(func(spans []*model.Span) {
p.spans = append(p.spans, spans...)
}, func(_ ptrace.Traces) {
panic("not implemented")
}, func(td ptrace.Traces) {
p.traces = append(p.traces, td)
})
oks := make([]bool, len(p.spans))
if p.tenants == nil {
Expand All @@ -60,6 +61,12 @@ func (p *mockSpanProcessor) getSpans() []*model.Span {
return p.spans
}

func (p *mockSpanProcessor) getTraces() []ptrace.Traces {
p.mux.Lock()
defer p.mux.Unlock()
return p.traces
}

func (p *mockSpanProcessor) getTenants() map[string]bool {
p.mux.Lock()
defer p.mux.Unlock()
Expand Down
48 changes: 24 additions & 24 deletions cmd/collector/app/handler/otlp_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/jaegertracing/jaeger/cmd/collector/app/flags"
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

var _ component.Host = (*otelHost)(nil) // API check
Expand Down Expand Up @@ -72,9 +71,14 @@ func startOTLPReceiver(
},
}

otlpConsumer := newConsumerDelegate(logger, spanProcessor, tm)
// the following two constructors never return errors given non-nil arguments, so we ignore errors
nextConsumer, err := newTraces(otlpConsumer.consume)
consumerHelper := &consumerHelper{
batchConsumer: newBatchConsumer(logger,
spanProcessor,
processor.UnknownTransport, // could be gRPC or HTTP
processor.OTLPSpanFormat,
tm),
}
nextConsumer, err := newTraces(consumerHelper.consume)
if err != nil {
return nil, fmt.Errorf("could not create the OTLP consumer: %w", err)
}
Expand All @@ -93,29 +97,25 @@ func startOTLPReceiver(
return otlpReceiver, nil
}

func newConsumerDelegate(logger *zap.Logger, spanProcessor processor.SpanProcessor, tm *tenancy.Manager) *consumerDelegate {
return &consumerDelegate{
batchConsumer: newBatchConsumer(logger,
spanProcessor,
processor.UnknownTransport, // could be gRPC or HTTP
processor.OTLPSpanFormat,
tm),
}
}

type consumerDelegate struct {
batchConsumer batchConsumer
type consumerHelper struct {
batchConsumer
}

func (c *consumerDelegate) consume(ctx context.Context, td ptrace.Traces) error {
batches := v1adapter.ProtoFromTraces(td)
for _, batch := range batches {
err := c.batchConsumer.consume(ctx, batch)
if err != nil {
return err
}
func (ch *consumerHelper) consume(ctx context.Context, td ptrace.Traces) error {
tenant, err := ch.validateTenant(ctx)
if err != nil {
ch.logger.Debug("rejecting spans (tenancy)", zap.Error(err))
return err
}
return nil
_, err = ch.spanProcessor.ProcessSpans(ctx, processor.SpansV2{
Traces: td,
Details: processor.Details{
InboundTransport: ch.spanOptions.InboundTransport,
SpanFormat: ch.spanOptions.SpanFormat,
Tenant: tenant,
},
})
return err
}

var _ componentstatus.Reporter = (*otelHost)(nil)
Expand Down
75 changes: 46 additions & 29 deletions cmd/collector/app/handler/otlp_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -19,8 +20,10 @@ import (
"go.opentelemetry.io/collector/pipeline"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/otlpreceiver"
"go.uber.org/zap/zaptest"

"github.com/jaegertracing/jaeger/cmd/collector/app/flags"
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/pkg/testutils"
)
Expand Down Expand Up @@ -49,7 +52,7 @@ func optionsWithPorts(port string) *flags.CollectorOptions {

func TestStartOtlpReceiver(t *testing.T) {
spanProcessor := &mockSpanProcessor{}
logger, _ := testutils.NewLogger()
logger := zaptest.NewLogger(t)
tm := &tenancy.Manager{}
rec, err := StartOTLPReceiver(optionsWithPorts(":0"), logger, spanProcessor, tm)
require.NoError(t, err)
Expand All @@ -70,36 +73,9 @@ func makeTracesOneSpan() ptrace.Traces {
return traces
}

func TestConsumerDelegate(t *testing.T) {
testCases := []struct {
expectErr error
expectLog string
}{
{}, // no errors
{expectErr: errors.New("test-error"), expectLog: "test-error"},
}
for _, test := range testCases {
t.Run(test.expectLog, func(t *testing.T) {
logger, logBuf := testutils.NewLogger()
spanProcessor := &mockSpanProcessor{expectedError: test.expectErr}
consumer := newConsumerDelegate(logger, spanProcessor, &tenancy.Manager{})

err := consumer.consume(context.Background(), makeTracesOneSpan())

if test.expectErr != nil {
require.Equal(t, test.expectErr, err)
assert.Contains(t, logBuf.String(), test.expectLog)
} else {
require.NoError(t, err)
assert.Len(t, spanProcessor.getSpans(), 1)
}
})
}
}

func TestStartOtlpReceiver_Error(t *testing.T) {
spanProcessor := &mockSpanProcessor{}
logger, _ := testutils.NewLogger()
logger := zaptest.NewLogger(t)
opts := optionsWithPorts(":-1")
tm := &tenancy.Manager{}
_, err := StartOTLPReceiver(opts, logger, spanProcessor, tm)
Expand Down Expand Up @@ -139,3 +115,44 @@ func TestOtelHost(t *testing.T) {
assert.Nil(t, host.GetExtensions())
assert.Nil(t, host.GetExporters())
}

func TestConsumerHelper(t *testing.T) {
spanProcessor := &mockSpanProcessor{}
consumerHelper := &consumerHelper{
batchConsumer: newBatchConsumer(zaptest.NewLogger(t),
spanProcessor,
processor.UnknownTransport, // could be gRPC or HTTP
processor.OTLPSpanFormat,
&tenancy.Manager{}),
}
err := consumerHelper.consume(context.Background(), makeTracesOneSpan())
require.NoError(t, err)
assert.Eventually(t, func() bool {
return len(spanProcessor.getTraces()) == 1
}, time.Second, time.Millisecond, "spanProcessor should have received one span")
assert.Empty(t, spanProcessor.getSpans())
}

func TestConsumerHelper_Consume_Error(t *testing.T) {
consumerHelper := &consumerHelper{
batchConsumer: newBatchConsumer(zaptest.NewLogger(t),
&mockSpanProcessor{expectedError: errors.New("mock error")},
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can leverage assert.AnError here

processor.UnknownTransport, // could be gRPC or HTTP
processor.OTLPSpanFormat,
&tenancy.Manager{}),
}
err := consumerHelper.consume(context.Background(), makeTracesOneSpan())
require.ErrorContains(t, err, "mock error")
}

func TestConsumerHelper_Consume_TenantError(t *testing.T) {
consumerHelper := &consumerHelper{
batchConsumer: newBatchConsumer(zaptest.NewLogger(t),
&mockSpanProcessor{},
processor.UnknownTransport, // could be gRPC or HTTP
processor.OTLPSpanFormat,
&tenancy.Manager{Enabled: true}),
}
err := consumerHelper.consume(context.Background(), makeTracesOneSpan())
require.ErrorContains(t, err, "missing tenant header")
}
12 changes: 8 additions & 4 deletions cmd/collector/app/handler/zipkin_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,15 @@ func startZipkinReceiver(
},
}

consumerAdapter := newConsumerDelegate(logger, spanProcessor, tm)
// reset Zipkin spanFormat
consumerAdapter.batchConsumer.spanOptions.SpanFormat = processor.ZipkinSpanFormat
consumerHelper := &consumerHelper{
batchConsumer: newBatchConsumer(logger,
spanProcessor,
processor.HTTPTransport,
processor.ZipkinSpanFormat,
tm),
}

nextConsumer, err := newTraces(consumerAdapter.consume)
nextConsumer, err := newTraces(consumerHelper.consume)
if err != nil {
return nil, fmt.Errorf("could not create Zipkin consumer: %w", err)
}
Expand Down
Loading