Skip to content

Commit

Permalink
Merge pull request #24 from doron-cohen/master
Browse files Browse the repository at this point in the history
Allow other tracing implementations
  • Loading branch information
m110 authored Jul 7, 2023
2 parents 090d971 + aa9296f commit 5a1ee85
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 14 deletions.
13 changes: 10 additions & 3 deletions pkg/kafka/publisher.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package kafka

import (
"go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama"
"time"

"github.com/Shopify/sarama"
Expand Down Expand Up @@ -39,8 +38,12 @@ func NewPublisher(
return nil, errors.Wrap(err, "cannot create Kafka producer")
}

if config.OTELEnabled {
producer = otelsarama.WrapSyncProducer(config.OverwriteSaramaConfig, producer)
if config.OTELEnabled && config.Tracer == nil {
config.Tracer = NewOTELSaramaTracer()
}

if config.Tracer != nil {
producer = config.Tracer.WrapSyncProducer(config.OverwriteSaramaConfig, producer)
}

return &Publisher{
Expand All @@ -62,6 +65,10 @@ type PublisherConfig struct {

// If true then each sent message will be wrapped with Opentelemetry tracing, provided by otelsarama.
OTELEnabled bool

// Tracer is used to trace Kafka messages.
// If nil, then no tracing will be used.
Tracer SaramaTracer
}

func (c *PublisherConfig) setDefaults() {
Expand Down
31 changes: 20 additions & 11 deletions pkg/kafka/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/Shopify/sarama"
"github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
"go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
Expand Down Expand Up @@ -40,6 +39,10 @@ func NewSubscriber(
logger = watermill.NopLogger{}
}

if config.OTELEnabled && config.Tracer == nil {
config.Tracer = NewOTELSaramaTracer()
}

logger = logger.With(watermill.LogFields{
"subscriber_uuid": watermill.NewShortUUID(),
})
Expand Down Expand Up @@ -75,7 +78,13 @@ type SubscriberConfig struct {
InitializeTopicDetails *sarama.TopicDetail

// If true then each consumed message will be wrapped with Opentelemetry tracing, provided by otelsarama.
//
// Deprecated: pass OTELSaramaTracer to Tracer field instead.
OTELEnabled bool

// Tracer is used to trace Kafka messages.
// If nil, then no tracing will be used.
Tracer SaramaTracer
}

// NoSleep can be set to SubscriberConfig.NackResendSleep and SubscriberConfig.ReconnectRetrySleep.
Expand Down Expand Up @@ -231,9 +240,9 @@ func (s *Subscriber) consumeMessages(
}()

if s.config.ConsumerGroup == "" {
consumeMessagesClosed, err = s.consumeWithoutConsumerGroups(ctx, client, topic, output, logFields, s.config.OTELEnabled)
consumeMessagesClosed, err = s.consumeWithoutConsumerGroups(ctx, client, topic, output, logFields, s.config.Tracer)
} else {
consumeMessagesClosed, err = s.consumeGroupMessages(ctx, client, topic, output, logFields, s.config.OTELEnabled)
consumeMessagesClosed, err = s.consumeGroupMessages(ctx, client, topic, output, logFields, s.config.Tracer)
}
if err != nil {
s.logger.Debug(
Expand Down Expand Up @@ -262,7 +271,7 @@ func (s *Subscriber) consumeGroupMessages(
topic string,
output chan *message.Message,
logFields watermill.LogFields,
otelEnabled bool,
tracer SaramaTracer,
) (chan struct{}, error) {
// Start a new consumer group
group, err := sarama.NewConsumerGroupFromClient(s.config.ConsumerGroup, client)
Expand All @@ -283,8 +292,8 @@ func (s *Subscriber) consumeGroupMessages(
messageLogFields: logFields,
}

if otelEnabled {
handler = otelsarama.WrapConsumerGroupHandler(handler)
if tracer != nil {
handler = tracer.WrapConsumerGroupHandler(handler)
}

go func() {
Expand Down Expand Up @@ -367,15 +376,15 @@ func (s *Subscriber) consumeWithoutConsumerGroups(
topic string,
output chan *message.Message,
logFields watermill.LogFields,
otelEnabled bool,
tracer SaramaTracer,
) (chan struct{}, error) {
consumer, err := sarama.NewConsumerFromClient(client)
if err != nil {
return nil, errors.Wrap(err, "cannot create client")
}

if otelEnabled {
consumer = otelsarama.WrapConsumer(consumer)
if tracer != nil {
consumer = tracer.WrapConsumer(consumer)
}

partitions, err := consumer.Partitions(topic)
Expand All @@ -396,8 +405,8 @@ func (s *Subscriber) consumeWithoutConsumerGroups(
return nil, errors.Wrap(err, "failed to start consumer for partition")
}

if otelEnabled {
partitionConsumer = otelsarama.WrapPartitionConsumer(partitionConsumer)
if tracer != nil {
partitionConsumer = tracer.WrapPartitionConsumer(partitionConsumer)
}

messageHandler := s.createMessagesHandler(output)
Expand Down
35 changes: 35 additions & 0 deletions pkg/kafka/tracer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package kafka

import (
"github.com/Shopify/sarama"
"go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama"
)

type SaramaTracer interface {
WrapConsumer(sarama.Consumer) sarama.Consumer
WrapPartitionConsumer(sarama.PartitionConsumer) sarama.PartitionConsumer
WrapConsumerGroupHandler(sarama.ConsumerGroupHandler) sarama.ConsumerGroupHandler
WrapSyncProducer(*sarama.Config, sarama.SyncProducer) sarama.SyncProducer
}

type OTELSaramaTracer struct{}

func NewOTELSaramaTracer() SaramaTracer {
return OTELSaramaTracer{}
}

func (t OTELSaramaTracer) WrapConsumer(c sarama.Consumer) sarama.Consumer {
return otelsarama.WrapConsumer(c)
}

func (t OTELSaramaTracer) WrapConsumerGroupHandler(h sarama.ConsumerGroupHandler) sarama.ConsumerGroupHandler {
return otelsarama.WrapConsumerGroupHandler(h)
}

func (t OTELSaramaTracer) WrapPartitionConsumer(pc sarama.PartitionConsumer) sarama.PartitionConsumer {
return otelsarama.WrapPartitionConsumer(pc)
}

func (t OTELSaramaTracer) WrapSyncProducer(cfg *sarama.Config, p sarama.SyncProducer) sarama.SyncProducer {
return otelsarama.WrapSyncProducer(cfg, p)
}

0 comments on commit 5a1ee85

Please sign in to comment.