-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathproducer.go
90 lines (77 loc) · 2.53 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package otelkafka
import (
"context"
"fmt"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/otel/trace"
)
// Producer wraps a kafka.Producer and adds OpenTelemetry tracing to Produce.
//
// Only Produce is traced; messages sent through the deprecated ProduceChannel
// are not. Methods that are not redefined on this type are promoted from the
// embedded kafka.Producer.
type Producer struct {
*kafka.Producer
// cfg is the tracing configuration built by newConfig in NewProducer;
// startSpan reads its Tracer, Propagators and bootstrapServers.
cfg config
}
// NewProducer creates a kafka.Producer from conf and wraps it in a tracing
// Producer configured by opts.
//
// withConfig(conf) is appended after the caller's options, so it is the last
// option handed to newConfig (presumably so that connection details such as
// the bootstrap servers are taken from conf — confirm against withConfig).
//
// If the underlying producer cannot be created, the error from
// kafka.NewProducer is returned unchanged and the Producer is nil.
func NewProducer(conf *kafka.ConfigMap, opts ...Option) (*Producer, error) {
	p, err := kafka.NewProducer(conf)
	if err != nil {
		return nil, err
	}

	// Cap the capacity at len(opts) so append must allocate a new backing
	// array. Without this, a caller that passed `someSlice[:n]...` with spare
	// capacity would have element n of its own slice overwritten.
	opts = append(opts[:len(opts):len(opts)], withConfig(conf))

	return &Producer{Producer: p, cfg: newConfig(opts...)}, nil
}
// Produce calls the underlying Producer.Produce and traces the request.
//
// A producer span is started for msg before it is enqueued. The span ends:
//   - immediately, if the underlying Produce returns an error; the error is
//     recorded on the span and the span status is set to Error;
//   - immediately, if deliveryChan is nil;
//   - otherwise when the delivery event arrives. A delivery error found in
//     the event's TopicPartition.Error is recorded on the span and sets its
//     status to Error. The event is then forwarded to deliveryChan.
//
// The returned error is the one returned by the underlying Producer.Produce.
func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error {
	span := p.startSpan(msg)

	// When the caller supplied a delivery channel, hand the underlying
	// producer an internal one instead so the delivery event can be inspected
	// before it is passed on. Only one event is ever read from it, so a
	// capacity of 1 keeps the sender from having to wait for the forwarding
	// goroutine below to be scheduled.
	wrapped := deliveryChan
	if deliveryChan != nil {
		wrapped = make(chan kafka.Event, 1)
	}

	if err := p.Producer.Produce(msg, wrapped); err != nil {
		// Enqueue failed. RecordError only adds an event to the span, so the
		// status has to be set explicitly for the span to count as failed.
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
		span.End()
		return err
	}

	// With no delivery channel there is nothing further to wait for.
	if deliveryChan == nil {
		span.End()
		return nil
	}

	// The forwarding goroutine is started only after a successful enqueue, so
	// a failed Produce does not leave a goroutine blocked forever on a
	// channel that nothing will ever send to.
	go func() {
		evt := <-wrapped
		if resMsg, ok := evt.(*kafka.Message); ok {
			// Delivery errors are reported via TopicPartition.Error.
			if deliveryErr := resMsg.TopicPartition.Error; deliveryErr != nil {
				span.RecordError(deliveryErr)
				span.SetStatus(codes.Error, deliveryErr.Error())
			}
		}
		span.End()
		deliveryChan <- evt
	}()

	return nil
}
// Close calls the underlying Producer.Close.
//
// It performs no additional cleanup of its own: the per-call channels created
// in Produce are not tracked or closed here.
func (p *Producer) Close() {
p.Producer.Close()
}
// startSpan starts a producer-kind span for msg and injects the new span's
// context back into the message through its carrier.
//
// Any span context the configured propagators can extract from msg becomes
// the parent of the new span. The span is named "<topic> publish" and carries
// the messaging operation type, messaging system, server address, destination
// name, message key and body size attributes.
//
// If msg has no topic set, the span is named "publish" and the destination
// name attribute is omitted rather than dereferencing a nil pointer, so an
// invalid message can still reach the underlying producer and be rejected
// there.
//
// The caller is responsible for ending the returned span.
func (p *Producer) startSpan(msg *kafka.Message) trace.Span {
	// If there's a span context in the message, use that as the parent context.
	carrier := NewMessageCarrier(msg)
	ctx := p.cfg.Propagators.Extract(context.Background(), carrier)

	spanName := "publish"
	attrs := make([]attribute.KeyValue, 0, 6)
	attrs = append(attrs,
		semconv.MessagingOperationTypePublish,
		semconv.MessagingSystemKafka,
		semconv.ServerAddress(p.cfg.bootstrapServers),
	)
	// The destination attribute sits in its original position (after the
	// server address) so the attribute order is unchanged for valid messages.
	if topic := msg.TopicPartition.Topic; topic != nil {
		attrs = append(attrs, semconv.MessagingDestinationName(*topic))
		spanName = fmt.Sprintf("%s publish", *topic)
	}
	attrs = append(attrs,
		semconv.MessagingKafkaMessageKey(string(msg.Key)),
		semconv.MessagingMessageBodySize(getMsgSize(msg)),
	)

	ctx, span := p.cfg.Tracer.Start(ctx, spanName,
		trace.WithAttributes(attrs...),
		trace.WithSpanKind(trace.SpanKindProducer),
	)

	// Propagate the new span's context via the carrier (presumably into the
	// message headers — confirm against NewMessageCarrier) so consumers can
	// continue the trace.
	p.cfg.Propagators.Inject(ctx, carrier)
	return span
}